Skip to content

Commit

Permalink
Tuning conf for indexation threads
Browse files Browse the repository at this point in the history
Test by skipping addAttribute (Datafragment)
  • Loading branch information
goulven authored and goulven committed Oct 7, 2024
1 parent 458b12f commit a2af1db
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public DataFragmentAggregationWorker(final DataFragmentStoreService dataFragment
@Override
public void run() {

// TODO : exit thread condition
while (true) {
try {
if (!service.getQueue().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public DataFragmentStoreService(StandardiserService standardiserService, Aggrega
this.queue = new LinkedBlockingQueue<>(indexationConfig.getDatafragmentQueueMaxSize());

for (int i = 0; i < indexationConfig.getDataFragmentworkers(); i++) {
logger.info("Starting file queue consumer thread {}, with bulk page size of {} items",i, indexationConfig.getBulkPageSize() );
logger.info("Starting file queue consumer thread {}, with bulk page size of {} items",i, indexationConfig.getDataFragmentBulkPageSize() );
//TODO(p3,perf) : Virtual threads, but ko with visualVM profiling
new Thread(new DataFragmentAggregationWorker(this, indexationConfig.getBulkPageSize(), indexationConfig.getPauseDuration(),"dequeue-worker-"+i)).start();;
new Thread(new DataFragmentAggregationWorker(this, indexationConfig.getDataFragmentBulkPageSize(), indexationConfig.getPauseDuration(),"aggreg-worker-"+i)).start();;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,33 @@ public class IndexationConfig {
/**
* Max size of the blocking queue for products
*/
int productsQueueMaxSize = 15000;
int productsQueueMaxSize = 10000;

/**
* Max size of the blocking queue for products
*/
int datafragmentQueueMaxSize = 50000;



int datafragmentQueueMaxSize = 10000;

/**
* Bulk size (applied for update, means on fetching and on insertion in elastic cluster)
* Bulk size (applied for update of datafragments, means on fetching and on insertion in elastic cluster)
*/

int bulkPageSize = 200;
int dataFragmentbulkPageSize = 200;

int productsbulkPageSize = 200;



/**
* Number of concurrent workers
* Number of concurrent workers for products (only saves in ES)
*/
int productWorkers = 2;

int dataFragmentworkers = 3;

/**
* Number of concurrent workers for datafragments aggregation (means retrieve from cluster, update and save back)
*/
int dataFragmentworkers = 2;


/**
Expand All @@ -35,11 +40,11 @@ public class IndexationConfig {
int pauseDuration = 4000;


public int getBulkPageSize() {
return bulkPageSize;
public int getDataFragmentBulkPageSize() {
return dataFragmentbulkPageSize;
}
public void setBulkPageSize(int dequeueSize) {
this.bulkPageSize = dequeueSize;
public void setDataFragmentBulkPageSize(int dequeueSize) {
this.dataFragmentbulkPageSize = dequeueSize;
}
public int getProductWorkers() {
return productWorkers;
Expand Down Expand Up @@ -71,6 +76,15 @@ public int getDatafragmentQueueMaxSize() {
public void setDatafragmentQueueMaxSize(int datafragmentQueueMaxSize) {
this.datafragmentQueueMaxSize = datafragmentQueueMaxSize;
}
public int getProductsbulkPageSize() {
return productsbulkPageSize;
}
public void setProductsbulkPageSize(int productsbulkPageSize) {
this.productsbulkPageSize = productsbulkPageSize;
}
public int getDataFragmentbulkPageSize() {
return dataFragmentbulkPageSize;
}



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,12 @@ public ProductRepository() {
// private ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

public ProductRepository(IndexationConfig indexationConfig) {
logger.info("Starting file queue consumer thread, with bulk page size of {} items", indexationConfig.getBulkPageSize() );

for (int i = 0; i < indexationConfig.getProductWorkers(); i++) {
//TODO(p3,perf) : Virtual threads, but ko with visualVM profiling
new Thread((new ProductIndexationWorker(this, indexationConfig.getBulkPageSize(), indexationConfig.getPauseDuration(),"dequeue-worker-"+i))).start();
}

logger.info("Starting file queue consumer thread, with bulk page size of {} items", indexationConfig.getProductsbulkPageSize() );

for (int i = 0; i < indexationConfig.getProductWorkers(); i++) {
//TODO(p3,perf) : Virtual threads, but ko with visualVM profiling
new Thread((new ProductIndexationWorker(this, indexationConfig.getProductsbulkPageSize(), indexationConfig.getPauseDuration(),"products-worker-"+i))).start();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,40 +540,40 @@ public void addReferentielAttribute(final String key, final String value) {
* @return
*/
/**
 * Sanitizes and adds a raw attribute to this data fragment.
 *
 * <p>Blank/null values and oversized values are rejected (logged and
 * ignored). The attribute name is normalized via
 * {@link IdHelper#normalizeAttributeName(String)} and a trailing ':' is
 * stripped. The sanitized attribute is then delegated to
 * {@code addAttribute(Attribute, Set)} for multi-value splitting.
 *
 * <p>NOTE(review): the commit that introduced this version commented out
 * the whole body as a temporary indexation test, silently dropping every
 * attribute. The dead commented-out copy has been removed and the live
 * implementation restored; if skipping is needed for benchmarking, gate it
 * behind a configuration flag instead of commented-out code.
 *
 * @param name                 raw attribute name (will be normalized)
 * @param value                raw attribute value; null or blank is ignored
 * @param language             language code stored on the attribute
 * @param ignoreCariageReturns if true, collapse CR/LF runs into a single space
 * @param multivalueSeparators separators used downstream to split multi-values
 */
public void addAttribute(final String name, final String value, final String language, final Boolean ignoreCariageReturns, final Set<String> multivalueSeparators) {
	if (null == value || value.isBlank()) {
		logger.debug("Cannot add null or empty values for attribute " + name);
		return;
	}

	// Evicting too big values
	// TODO(p3,conf) : From config
	if (value.length() > 10000) {
		logger.warn("Evicting a too big value for attribute {}:{}", name, value);
		return;
	}

	final Attribute attr = new Attribute();

	// Attribute name normalisation
	String sanitizedName = IdHelper.normalizeAttributeName(name);
	// TODO(p2,conf) : From conf, could have the =, ..
	if (sanitizedName.endsWith(":")) {
		sanitizedName = sanitizedName.substring(0, sanitizedName.length() - 1).trim();
	}

	attr.setName(sanitizedName);

	if (ignoreCariageReturns.booleanValue()) {
		// Collapse any run of carriage returns / newlines into one space
		attr.setRawValue(value.trim().replaceAll("[\\r\\n]+", " "));
	} else {
		attr.setRawValue(value);
	}

	attr.setLanguage(language);

	addAttribute(attr, multivalueSeparators);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void run() {

service.store(buffer.values());

logger.info("{} has indexed {} DataFragments. {} Remaining in queue",workerName, buffer.size(), service.getQueue().size());
logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getQueue().size());

} else {
try {
Expand Down

0 comments on commit a2af1db

Please sign in to comment.