Commit

Import multi threaded
goulven authored and goulven committed Sep 16, 2024
1 parent 710d57d commit 3fe85f0
Showing 2 changed files with 60 additions and 30 deletions.
@@ -29,8 +29,13 @@ public class BackupConfig {
/**
* Size of the bulk to use for products importation
*/
private int importBulkSize = 200;
private int importBulkSize = 100;

/**
* Number of concurrent threads running the import (1 thread per file, so it should be less than or equal to productsExportThreads)
*/
private int productImportThreads = 4;

/**
* Number of threads (and of files) to operate export with
*/
@@ -134,6 +139,14 @@ public long getMaxProductsBackupAgeInHours() {
public void setMaxProductsBackupAgeInHours(long maxProductsBackupAgeInHours) {
this.maxProductsBackupAgeInHours = maxProductsBackupAgeInHours;
}

public int getProductImportThreads() {
return productImportThreads;
}

public void setProductImportThreads(int productImportThreads) {
this.productImportThreads = productImportThreads;
}


}
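The new Javadoc states that productImportThreads should stay less than or equal to productsExportThreads, since each import thread consumes exactly one export file. A minimal sketch of how a caller might enforce that constraint when sizing the import pool (getProductsExportThreads() is an assumed accessor name; only the productsExportThreads field's Javadoc appears in this diff):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ImportPoolSizing {

    /**
     * Sketch only: clamp the configured import parallelism to the export thread count,
     * since one import thread handles exactly one exported file.
     * getProductsExportThreads() is an assumed accessor, not shown in this commit.
     */
    static ExecutorService buildImportPool(BackupConfig backupConfig) {
        int threads = Math.min(backupConfig.getProductImportThreads(),
                backupConfig.getProductsExportThreads());
        return Executors.newFixedThreadPool(Math.max(1, threads));
    }
}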
@@ -17,7 +17,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
@@ -164,40 +163,58 @@ public void importProducts() {

File importFolder = new File(backupConfig.getImportProductPath());
if (!importFolder.exists() || !importFolder.isDirectory()) {
logger.error("Import file does not exists or is not a folder : {}", importFolder.getAbsolutePath());
logger.error("Import file does not exist or is not a folder : {}", importFolder.getAbsolutePath());
return;
}

AtomicLong counter = new AtomicLong(0);
ExecutorService executorService = Executors.newFixedThreadPool(backupConfig.getProductImportThreads());

for (File importFile : importFolder.listFiles()) {
logger.info("Importing file started : {}", importFile.getAbsolutePath());
try (InputStream inputStream = new FileInputStream(importFile);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {

List<Product> group = new ArrayList<>();

bufferedReader.lines().forEach(line -> {
try {
group.add(serialisationService.fromJson(line, Product.class));
} catch (IOException e) {
logger.error("Error occurs in data deserialisation", e);
executorService.submit(() -> {
logger.info("Importing file started : {}", importFile.getAbsolutePath());
try (InputStream inputStream = new FileInputStream(importFile);
GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream);
InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {

List<Product> group = new ArrayList<>();
bufferedReader.lines().forEach(line -> {
try {
group.add(serialisationService.fromJson(line, Product.class));
counter.incrementAndGet();
if (counter.get() % 1000 == 0) {
logger.warn("Imported items so : {}", counter.get());
}
if (group.size() == backupConfig.getImportBulkSize()) {
productRepo.storeNoCache(group); // Index the current group
group.clear(); // Clear the group for the next batch
}
} catch (Exception e) {
logger.error("Error occurs in data import", e);
}

});

if (!group.isEmpty()) {
productRepo.storeNoCache(group);
}
logger.info("Importing file finished : {}", importFile.getAbsolutePath());
} catch (Exception e) {
logger.error("Error occurs in data file processing", e);
}
if (group.size() == backupConfig.getImportBulkSize()) {
productRepo.storeNoCache(group); // Index the current group
group.clear(); // Clear the group for the next batch
}
});

// Index the remaining lines if any
if (!group.isEmpty()) {
productRepo.storeNoCache(group);
}
logger.info("Importing file finished : {}", importFile.getAbsolutePath());
} catch (Exception e) {
logger.error("Error occurs in data file import", e);
}
});
}

executorService.shutdown();
while (!executorService.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("Error while sleeping",e);
}
}

logger.info("Product import : finished");
}

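The replacement shutdown sequence polls isTerminated() in a one-second sleep loop. An alternative wait with the same effect (a sketch, not part of this commit) is ExecutorService.awaitTermination, which would also reintroduce the java.util.concurrent.TimeUnit import removed above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ImportShutdown {

    /**
     * Sketch only: block until every submitted per-file import task has finished,
     * instead of polling isTerminated() between sleeps.
     */
    static void awaitImportCompletion(ExecutorService executorService) throws InterruptedException {
        executorService.shutdown();
        // A finite timeout could be used instead if a stuck import should abort the run.
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    }
}

A note on the shared progress counter: because incrementAndGet() and the following get() are separate operations, a milestone such as 1000 can be skipped or logged twice when several file tasks increment concurrently; testing the value returned by incrementAndGet() directly would log each milestone exactly once.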
