Bulk updates strategy
goulven authored and goulven committed Oct 8, 2024
1 parent e8f4f53 commit eefe7f1
Showing 6 changed files with 69 additions and 71 deletions.
@@ -138,7 +138,7 @@ public void score(VerticalConfig vertical) {
logger.error("Skipping product during batched scoring : ",e);
}
logger.info("Score batching : indexing {} products", productBag.size());
- dataRepository.index(productBag);
+ dataRepository.addToFullindexationQueue(productBag);

}
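
Note: after this change the scoring pass no longer writes to Elasticsearch inline; it hands the whole batch to the repository's indexation queue, which dedicated workers drain and bulk-index. A minimal sketch of the assumed contract behind addToFullindexationQueue (the queue field and exact types are assumptions; only the method name and log message come from this commit):

    // Hedged sketch: enqueue is cheap for the caller, indexing happens asynchronously.
    private final java.util.concurrent.BlockingQueue<Product> fullProductQueue =
            new java.util.concurrent.LinkedBlockingQueue<>();

    public void addToFullindexationQueue(Collection<Product> data) {
        logger.info("Queuing {} products", data.size());
        data.forEach(fullProductQueue::add); // no Elasticsearch round-trip here
    }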

@@ -244,7 +244,7 @@ public void importProducts() {
logger.warn("Imported items so : {}", counter.get());
}
if (group.size() == backupConfig.getImportBulkSize()) {
- productRepo.storeNoCache(group); // Index the current group
+ productRepo.store(group); // Index the current group
group.clear(); // Clear the group for the next batch
}
} catch (Exception e) {
@@ -254,7 +254,7 @@ public void importProducts() {
});

if (!group.isEmpty()) {
- productRepo.storeNoCache(group);
+ productRepo.store(group);
}
logger.info("Importing file finished : {}", importFile.getAbsolutePath());
} catch (Exception e) {
@@ -58,7 +58,7 @@ public void run() {
// Aggregating
service.aggregateAndstore(buffer);

logger.info("{} has indexed {} DataFragments. {} Remaining in queue",workerName, buffer.size(), service.getQueue().size());
logger.warn("{} has handled {} DataFragments. {} Remaining in queue",workerName, buffer.size(), service.getQueue().size());

} else {
try {
@@ -79,7 +79,7 @@ public DataFragmentStoreService(StandardiserService standardiserService, Aggrega
for (int i = 0; i < indexationConfig.getDataFragmentworkers(); i++) {
logger.info("Starting file queue consumer thread {}, with bulk page size of {} items",i, indexationConfig.getDataFragmentBulkPageSize() );
//TODO(p3,perf) : Virtual threads, but ko with visualVM profiling
- new Thread(new DataFragmentAggregationWorker(this, indexationConfig.getDataFragmentBulkPageSize(), indexationConfig.getPauseDuration(),"aggreg-worker-"+i)).start();;
+ new Thread(new DataFragmentAggregationWorker(this, indexationConfig.getDataFragmentBulkPageSize(), indexationConfig.getPauseDuration(),"datafragment-worker-"+i)).start();;
}
}
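
Note: each of these threads consumes the shared DataFragment queue in pages. The worker class itself is not part of this diff; the following is a hedged sketch of its assumed run loop (service.getQueue(), aggregateAndstore() and the pause duration appear in this commit, the rest is assumption):

    // Hedged sketch of a DataFragmentAggregationWorker loop: drain up to one bulk
    // page of fragments, aggregate and store them; if the queue is empty, pause.
    public void run() {
        while (true) {
            List<DataFragment> buffer = new ArrayList<>();
            service.getQueue().drainTo(buffer, bulkPageSize); // non-blocking batch take
            if (!buffer.isEmpty()) {
                service.aggregateAndstore(buffer);
            } else {
                try {
                    Thread.sleep(pauseDuration);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }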

@@ -234,15 +234,15 @@ public void aggregateAndstore(Collection<DataFragment> buffer) {
// Saving the result

if (fullItemsResults.size() > 0) {
- aggregatedDataRepository.index(fullItemsResults);
- logger.warn("Added {} products in queue (size is now {})", fullItemsResults.size(),queue.size());
+ aggregatedDataRepository.addToFullindexationQueue(fullItemsResults);
+ logger.warn("Submitted {} full products for indexation (datafragment queue size is now {})", fullItemsResults.size(),queue.size());
}


if (partialItemsResults.size() > 0) {
// TODO(p1,perf) : Should use a thread pool
- aggregatedDataRepository.indexPartial(partialItemsResults);
- logger.warn("Added {} partial products in queue (size is now {})", partialItemsResults.size(), queue.size());
+ aggregatedDataRepository.addToPartialIndexationQueue(partialItemsResults);
+ logger.warn("Submitted {} partial products for indexation (datafragment queue size is now {})", partialItemsResults.size(), queue.size());
}

} catch (final Exception e) {
@@ -1,5 +1,6 @@
package org.open4goods.commons.dao;

+ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -37,6 +38,8 @@
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Criteria;
import org.springframework.data.elasticsearch.core.query.CriteriaQuery;
+ import org.springframework.data.elasticsearch.core.query.IndexQuery;
+ import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.elasticsearch.core.query.UpdateQuery;

@@ -77,7 +80,7 @@ public class ProductRepository {

public IndexCoordinates current_index = IndexCoordinates.of(MAIN_INDEX_NAME);

- private @Autowired ElasticsearchOperations elasticsearchTemplate;
+ private @Autowired ElasticsearchOperations elasticsearchOperations;

private @Autowired SerialisationService serialisationService;

@@ -125,7 +128,7 @@ public Stream<Product> getProductsMatchingCategoriesOrVerticalId(VerticalConfig

final NativeQuery initialQuery = new NativeQueryBuilder().withQuery(new CriteriaQuery(c)).build();

- return elasticsearchTemplate.searchForStream(initialQuery, Product.class, current_index).stream()
+ return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream()
.map(SearchHit::getContent);

}
@@ -139,7 +142,7 @@ public Stream<Product> exportAll() {
Query query = Query.findAll();
// TODO : From conf, apply to other
query.setPageable(PageRequest.of(0, 10000)); // Fetch larger batches
- return elasticsearchTemplate.searchForStream(query, Product.class, current_index)
+ return elasticsearchOperations.searchForStream(query, Product.class, current_index)
.stream()
// TODO : Check CPU usage
.parallel()
@@ -155,7 +158,7 @@ public Stream<Product> exportAll(String vertical) {
final NativeQuery initialQuery = new NativeQueryBuilder()
.withQuery(new CriteriaQuery(c)).build();

- return elasticsearchTemplate.searchForStream(initialQuery, Product.class, current_index).stream()
+ return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream()
.map(SearchHit::getContent);
}

@@ -167,7 +170,7 @@ public Stream<Product> searchInValidPrices(String query, final String indexName,
final NativeQuery initialQuery = new NativeQueryBuilder().withQuery(new CriteriaQuery(c))
.withPageable(PageRequest.of(from, to)).build();

- return elasticsearchTemplate.search(initialQuery, Product.class, current_index).stream()
+ return elasticsearchOperations.search(initialQuery, Product.class, current_index).stream()
.map(SearchHit::getContent);

}
@@ -197,7 +200,7 @@ public Stream<Product> exportVerticalWithValidDate(VerticalConfig vertical, bool

final NativeQuery initialQuery = new NativeQueryBuilder()
.withQuery(new CriteriaQuery(c)).build();
- return elasticsearchTemplate.searchForStream(initialQuery, Product.class, current_index).stream()
+ return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream()
.map(SearchHit::getContent);
}

@@ -224,7 +227,7 @@ public Stream<Product> exportAllVerticalizedProductsWithGenAiSinceEpoch(Long epo

NativeQuery initialQuery = initialQueryBuilder.build();

- return elasticsearchTemplate.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);
+ return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);
}


@@ -261,7 +264,7 @@ public Stream<Product> exportVerticalWithValidDateOrderByEcoscore(String vertica

NativeQuery initialQuery = initialQueryBuilder.build();

- return elasticsearchTemplate.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);
+ return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);
}


@@ -276,7 +279,7 @@ public Stream<Product> getAllHavingVertical() {

NativeQuery initialQuery = initialQueryBuilder.build();

- return elasticsearchTemplate.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);
+ return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);

}

@@ -299,7 +302,7 @@ public Stream<Product> exportVerticalWithValidDateOrderByEcoscore(String vertica


public SearchHits<Product> search(Query query, final String indexName) {
- return elasticsearchTemplate.search(query, Product.class, IndexCoordinates.of(indexName));
+ return elasticsearchOperations.search(query, Product.class, IndexCoordinates.of(indexName));

}

@@ -364,7 +367,7 @@ public void index(final Product p) {
*
* @param p
*/
- public void index(Collection<Product> data) {
+ public void addToFullindexationQueue(Collection<Product> data) {

logger.info("Queuing {} products", data.size());

@@ -379,7 +382,7 @@ public void index(Collection<Product> data) {
});
}

- public void indexPartial(Collection<ProductPartialUpdateHolder> data) {
+ public void addToPartialIndexationQueue(Collection<ProductPartialUpdateHolder> data) {

logger.info("Queuing {} products", data.size());

@@ -396,29 +399,24 @@ public void indexPartial(Collection<ProductPartialUpdateHolder> data) {

public void store(Collection<Product> data) {
logger.info("Indexing {} products", data.size());

// executor.submit(() -> {
elasticsearchTemplate.save(data, current_index);
// });

// executor.submit(() -> {
// redisRepository.saveAll(data);
// });
List<IndexQuery> indexQueries = new ArrayList<>();
for (Product p : data) {
IndexQuery query = new IndexQueryBuilder()
.withId(String.valueOf(p.getId()))
.withObject(p)
.build();
indexQueries.add(query);
}
elasticsearchOperations.bulkIndex(indexQueries,current_index);
}

- public void storeNoCache(Collection<Product> data) {
- logger.info("Indexing without caching {} products", data.size());
-
- elasticsearchTemplate.save(data, current_index);
-
- }


public void forceIndex(Product data) {
logger.info("Indexing product {}", data.gtin());

// executor.submit(() -> {
- elasticsearchTemplate.save(data, current_index);
+ elasticsearchOperations.save(data, current_index);
// });

// executor.submit(() -> {
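
Note: store() now builds one IndexQuery per product and issues a single bulkIndex() call instead of delegating to save(). The visible gain is explicit control: each document id is derived from p.getId(), and the whole batch leaves as one bulk request. A hedged usage sketch mirroring the new body:

    // Index a batch with explicit ids in one bulk request.
    List<IndexQuery> queries = products.stream()
            .map(p -> new IndexQueryBuilder()
                    .withId(String.valueOf(p.getId()))
                    .withObject(p)
                    .build())
            .collect(Collectors.toList());
    elasticsearchOperations.bulkIndex(queries, current_index);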
@@ -460,7 +458,7 @@ public Product getById(final Long productId) throws ResourceNotFoundException {
if (null == result) {
// Fail, getting from elastic
logger.info("Cache miss, getting product {} from elastic", productId);
- result = elasticsearchTemplate.get(String.valueOf(productId), Product.class);
+ result = elasticsearchOperations.get(String.valueOf(productId), Product.class);

if (null == result) {
throw new ResourceNotFoundException("Product '" + productId + "' does not exists");
@@ -535,7 +533,7 @@ public Map<String, Product> multiGetById( final Collection<Long> ids)

NativeQuery query = new NativeQueryBuilder().withIds(missingIds).build();

- elasticsearchTemplate.multiGet(query, Product.class,current_index )
+ elasticsearchOperations.multiGet(query, Product.class,current_index )
.stream().map(MultiGetItem::getItem)
.filter(Objects::nonNull)
.forEach(e -> ret.put(e.gtin(), e));
@@ -563,19 +561,19 @@ public Map<String, Product> multiGetById( final Collection<Long> ids)

@Cacheable(keyGenerator = CacheConstants.KEY_GENERATOR, cacheNames = CacheConstants.ONE_HOUR_LOCAL_CACHE_NAME)
public Long countMainIndex() {
- return elasticsearchTemplate.count(Query.findAll(), current_index);
+ return elasticsearchOperations.count(Query.findAll(), current_index);
}

@Cacheable(keyGenerator = CacheConstants.KEY_GENERATOR, cacheNames = CacheConstants.ONE_HOUR_LOCAL_CACHE_NAME)
public Long countMainIndexHavingRecentPrices() {
CriteriaQuery query = new CriteriaQuery(getRecentPriceQuery());
- return elasticsearchTemplate.count(query, current_index);
+ return elasticsearchOperations.count(query, current_index);
}

@Cacheable(keyGenerator = CacheConstants.KEY_GENERATOR, cacheNames = CacheConstants.ONE_HOUR_LOCAL_CACHE_NAME)
public Long countMainIndexHavingRecentUpdate() {
CriteriaQuery query = new CriteriaQuery(getRecentPriceQuery());
- return elasticsearchTemplate.count(query, current_index);
+ return elasticsearchOperations.count(query, current_index);
}


@@ -630,37 +628,37 @@ public void bulkUpdateDocument(Collection<ProductPartialUpdateHolder> partialIte
.collect(Collectors.toList());

// Perform the bulk update
- elasticsearchTemplate.bulkUpdate(updateQueries, current_index);
- }
-
- /**
- * Bulk update, using script
- * @param partialItemsResults
- */
- public void bulkUpdateScript(Set<ProductPartialUpdateHolder> partialItemsResults) {
- // Prepare a list to hold the update queries
- List<UpdateQuery> updateQueries = partialItemsResults.stream()
- .map(product -> {
- // Script to iterate over the map and update the fields in _source
- String script = "for (entry in params.fieldsToUpdate.entrySet()) { ctx._source[entry.getKey()] = entry.getValue(); }";
-
- // Pass the updated fields as parameters
- Map<String, Object> params = new HashMap<>();
- params.put("fieldsToUpdate", product.getChanges());
-
- // Create and return the UpdateQuery with the script and parameters
- return UpdateQuery.builder(String.valueOf(product.getProductId()))
- .withScript(script)
- .withParams(params)
- .withIndex(current_index.getIndexName()) // Use the current index
- .build();
- })
- .collect(Collectors.toList());
-
- // Perform the bulk update
- elasticsearchTemplate.bulkUpdate(updateQueries, current_index);
+ elasticsearchOperations.bulkUpdate(updateQueries, current_index);
}

+ // /**
+ // * Bulk update, using script
+ // * @param partialItemsResults
+ // */
+ // public void bulkUpdateScript(Set<ProductPartialUpdateHolder> partialItemsResults) {
+ // // Prepare a list to hold the update queries
+ // List<UpdateQuery> updateQueries = partialItemsResults.stream()
+ // .map(product -> {
+ // // Script to iterate over the map and update the fields in _source
+ // String script = "for (entry in params.fieldsToUpdate.entrySet()) { ctx._source[entry.getKey()] = entry.getValue(); }";
+ //
+ // // Pass the updated fields as parameters
+ // Map<String, Object> params = new HashMap<>();
+ // params.put("fieldsToUpdate", product.getChanges());
+ //
+ // // Create and return the UpdateQuery with the script and parameters
+ // return UpdateQuery.builder(String.valueOf(product.getProductId()))
+ // .withScript(script)
+ // .withParams(params)
+ // .withIndex(current_index.getIndexName()) // Use the current index
+ // .build();
+ // })
+ // .collect(Collectors.toList());
+ //
+ // // Perform the bulk update
+ // elasticsearchOperations.bulkUpdate(updateQueries, current_index);
+ // }
+ //
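
Note: this hunk is the "bulk updates strategy" of the commit title: the scripted variant (a painless script looping over params.fieldsToUpdate) is kept only as commented-out reference, and partial updates go through bulkUpdateDocument() instead. Its body sits outside this hunk; below is a hedged sketch of the document-based approach it presumably uses, where Elasticsearch merges a partial document server-side:

    // Hedged sketch: one UpdateQuery per product, carrying only the changed fields.
    // Document is org.springframework.data.elasticsearch.core.document.Document.
    UpdateQuery uq = UpdateQuery.builder(String.valueOf(holder.getProductId()))
            .withDocument(Document.from(holder.getChanges()))
            .withIndex(current_index.getIndexName())
            .build();
    elasticsearchOperations.bulkUpdate(List.of(uq), current_index);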


/**
@@ -76,7 +76,7 @@ public void run() {

service.bulkUpdateDocument(buffer.values());

logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getFullProductQueue().size());
logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getPartialProductQueue().size());

} else {
try {
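
Note: the corrected log line now reports the partial-update queue this worker actually drains, rather than the full-product queue. A hedged sketch of the assumed drain step, using a map so the latest change per product wins within a batch (buffer.values() above suggests exactly that shape):

    // Hedged sketch: dedupe drained partial updates by product id before the bulk call.
    List<ProductPartialUpdateHolder> drained = new ArrayList<>();
    service.getPartialProductQueue().drainTo(drained, bulkPageSize);
    Map<Long, ProductPartialUpdateHolder> buffer = new HashMap<>();
    drained.forEach(h -> buffer.put(h.getProductId(), h)); // last write wins
    if (!buffer.isEmpty()) {
        service.bulkUpdateDocument(buffer.values());
    }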
