Skip to content

Commit

Permalink
Removing dedup (probable cause of deadlocks)
Browse files Browse the repository at this point in the history
  • Loading branch information
goulven authored and goulven committed Oct 10, 2024
1 parent e7648bf commit 55d0c6c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public void aggregateAndstore(Collection<DataFragment> buffer) {
try {
// Retrieving datafragments
Map<String, Product> aggDatas = aggregatedDataRepository.multiGetById(

buffer.stream()
.map(e -> Long.valueOf(e.gtin()))
.toList());
Expand Down Expand Up @@ -234,13 +233,14 @@ public void aggregateAndstore(Collection<DataFragment> buffer) {
// Saving the result

if (fullItemsResults.size() > 0) {
logger.warn("Will submit {} full products for indexation (datafragment queue size is now {})", fullItemsResults.size(),queue.size());
aggregatedDataRepository.addToFullindexationQueue(fullItemsResults);
logger.warn("Submitted {} full products for indexation (datafragment queue size is now {})", fullItemsResults.size(),queue.size());
}


if (partialItemsResults.size() > 0) {
// TODO(p1,perf) : Should use a thread pool
logger.warn("Will submit {} partial products for indexation (datafragment queue size is now {})", partialItemsResults.size(), queue.size());
aggregatedDataRepository.addToPartialIndexationQueue(partialItemsResults);
logger.warn("Submitted {} partial products for indexation (datafragment queue size is now {})", partialItemsResults.size(), queue.size());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.open4goods.commons.store.repository;

import java.util.HashMap;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;

import org.open4goods.commons.dao.ProductRepository;
import org.open4goods.commons.model.product.Product;
Expand Down Expand Up @@ -58,22 +58,15 @@ public void run() {

if (itemsToTake > 0) {
// There is data to consume and queue consummation is enabled
// A map to deduplicate --> MEANS WE CAN SOMETIMES LOOSE DATAFRAMENTS IF 2 ENTRIES ARE IN THE SAME BAG (no because we put back in queue)
final Map<String,Product> buffer = new HashMap<>();
final Set<Product> buffer = new HashSet<>();

// Dequeuing
for (int i = 0; i < itemsToTake; i++) {
Product item = service.getFullProductQueue().take();

if (buffer.containsKey(item.gtin())) {
logger.info("Putting back in queue : {}", item.gtin() );
service.getFullProductQueue().put(item);
} else {
buffer.put(item.gtin(),item);
}
buffer.add(item);
}

service.store(buffer.values());
service.store(buffer);

logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getFullProductQueue().size());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.open4goods.commons.store.repository;

import java.util.HashMap;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;

import org.open4goods.commons.dao.ProductRepository;
import org.open4goods.commons.model.product.Product;
import org.open4goods.commons.model.product.ProductPartialUpdateHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,21 +59,15 @@ public void run() {
if (itemsToTake > 0) {
// There is data to consume and queue consummation is enabled
// A map to deduplicate --> MEANS WE CAN SOMETIMES LOOSE DATAFRAMENTS IF 2 ENTRIES ARE IN THE SAME BAG (no because we put back in queue)
final Map<Long,ProductPartialUpdateHolder> buffer = new HashMap<>();
final Set<ProductPartialUpdateHolder> buffer = new HashSet<>();

// Dequeuing
for (int i = 0; i < itemsToTake; i++) {
ProductPartialUpdateHolder item = service.getPartialProductQueue().take();

if (buffer.containsKey(item.getProductId())) {
logger.info("Putting back in queue : {}", item.getProductId() );
service.getPartialProductQueue().put(item);
} else {
buffer.put(item.getProductId(),item);
}
buffer.add(item);
}

service.bulkUpdateDocument(buffer.values());
service.bulkUpdateDocument(buffer);

logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getPartialProductQueue().size());

Expand Down

0 comments on commit 55d0c6c

Please sign in to comment.