diff --git a/api/src/main/java/org/open4goods/api/services/store/DataFragmentStoreService.java b/api/src/main/java/org/open4goods/api/services/store/DataFragmentStoreService.java index 58e448ef4..d3686da38 100644 --- a/api/src/main/java/org/open4goods/api/services/store/DataFragmentStoreService.java +++ b/api/src/main/java/org/open4goods/api/services/store/DataFragmentStoreService.java @@ -174,7 +174,6 @@ public void aggregateAndstore(Collection buffer) { try { // Retrieving datafragments Map aggDatas = aggregatedDataRepository.multiGetById( - buffer.stream() .map(e -> Long.valueOf(e.gtin())) .toList()); @@ -234,13 +233,14 @@ public void aggregateAndstore(Collection buffer) { // Saving the result if (fullItemsResults.size() > 0) { + logger.warn("Will submit {} full products for indexation (datafragment queue size is now {})", fullItemsResults.size(),queue.size()); aggregatedDataRepository.addToFullindexationQueue(fullItemsResults); logger.warn("Submitted {} full products for indexation (datafragment queue size is now {})", fullItemsResults.size(),queue.size()); } if (partialItemsResults.size() > 0) { - // TODO(p1,perf) : Should use a thread pool + logger.warn("Will submit {} partial products for indexation (datafragment queue size is now {})", partialItemsResults.size(), queue.size()); aggregatedDataRepository.addToPartialIndexationQueue(partialItemsResults); logger.warn("Submitted {} partial products for indexation (datafragment queue size is now {})", partialItemsResults.size(), queue.size()); } diff --git a/commons/src/main/java/org/open4goods/commons/store/repository/FullProductIndexationWorker.java b/commons/src/main/java/org/open4goods/commons/store/repository/FullProductIndexationWorker.java index 9e60b8d98..ffd11fc33 100644 --- a/commons/src/main/java/org/open4goods/commons/store/repository/FullProductIndexationWorker.java +++ b/commons/src/main/java/org/open4goods/commons/store/repository/FullProductIndexationWorker.java @@ -1,7 +1,7 @@ package org.open4goods.commons.store.repository; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; import org.open4goods.commons.dao.ProductRepository; import org.open4goods.commons.model.product.Product; @@ -58,22 +58,15 @@ public void run() { if (itemsToTake > 0) { // There is data to consume and queue consummation is enabled - // A map to deduplicate --> MEANS WE CAN SOMETIMES LOOSE DATAFRAMENTS IF 2 ENTRIES ARE IN THE SAME BAG (no because we put back in queue) - final Map buffer = new HashMap<>(); + final Set buffer = new HashSet<>(); // Dequeuing for (int i = 0; i < itemsToTake; i++) { Product item = service.getFullProductQueue().take(); - - if (buffer.containsKey(item.gtin())) { - logger.info("Putting back in queue : {}", item.gtin() ); - service.getFullProductQueue().put(item); - } else { - buffer.put(item.gtin(),item); - } + buffer.add(item); } - service.store(buffer.values()); + service.store(buffer); logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getFullProductQueue().size()); diff --git a/commons/src/main/java/org/open4goods/commons/store/repository/PartialProductIndexationWorker.java b/commons/src/main/java/org/open4goods/commons/store/repository/PartialProductIndexationWorker.java index e7c9b0f3a..86a0d9db6 100644 --- a/commons/src/main/java/org/open4goods/commons/store/repository/PartialProductIndexationWorker.java +++ b/commons/src/main/java/org/open4goods/commons/store/repository/PartialProductIndexationWorker.java @@ -1,10 +1,9 @@ package org.open4goods.commons.store.repository; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; import org.open4goods.commons.dao.ProductRepository; -import org.open4goods.commons.model.product.Product; import org.open4goods.commons.model.product.ProductPartialUpdateHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,21 +59,15 @@ public void run() { if (itemsToTake > 0) { // There is data to consume and queue consummation is enabled // A map to deduplicate --> MEANS WE CAN SOMETIMES LOOSE DATAFRAMENTS IF 2 ENTRIES ARE IN THE SAME BAG (no because we put back in queue) - final Map buffer = new HashMap<>(); + final Set buffer = new HashSet<>(); // Dequeuing for (int i = 0; i < itemsToTake; i++) { ProductPartialUpdateHolder item = service.getPartialProductQueue().take(); - - if (buffer.containsKey(item.getProductId())) { - logger.info("Putting back in queue : {}", item.getProductId() ); - service.getPartialProductQueue().put(item); - } else { - buffer.put(item.getProductId(),item); - } + buffer.add(item); } - service.bulkUpdateDocument(buffer.values()); + service.bulkUpdateDocument(buffer); logger.warn ("{} has indexed {} products. {} Remaining in queue",workerName, buffer.size(), service.getPartialProductQueue().size());