Skip to content

Commit

Permalink
Merge pull request #434 from open4good/opendata-fix
Browse files Browse the repository at this point in the history
Opendata Refactor
  • Loading branch information
GoulvenF authored Oct 18, 2024
2 parents 74137c4 + b0f0107 commit 3fc196b
Show file tree
Hide file tree
Showing 13 changed files with 946 additions and 372 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.open4goods.model.BarcodeType;
import org.open4goods.commons.config.yml.IndexationConfig;
import org.open4goods.commons.config.yml.ui.VerticalConfig;
import org.open4goods.commons.exceptions.ResourceNotFoundException;
Expand Down Expand Up @@ -63,10 +64,10 @@ public class ProductRepository {

// The file queue implementation for Full products (no partial updates)
private BlockingQueue<Product> fullProductQueue;

// The file queue implementation for Full products (no partial updates)
private BlockingQueue<ProductPartialUpdateHolder> partialProductQueue;



/**
Expand All @@ -83,32 +84,32 @@ public class ProductRepository {
private @Autowired ElasticsearchOperations elasticsearchOperations;

private @Autowired SerialisationService serialisationService;

// private @Autowired RedisProductRepository redisRepository;

// private @Autowired RedisOperations<String, Product> redisRepo;

public ProductRepository() {


}

// private ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

//TODO(p3,perf) : Virtual threads, but ko with visualVM profiling
public ProductRepository(IndexationConfig indexationConfig) {

this.fullProductQueue = new LinkedBlockingQueue<>(indexationConfig.getProductsQueueMaxSize());
this.partialProductQueue = new LinkedBlockingQueue<>(indexationConfig.getPartialProductsQueueMaxSize());

for (int i = 0; i < indexationConfig.getProductWorkers(); i++) {
new Thread((new FullProductIndexationWorker(this, indexationConfig.getProductsbulkPageSize(), indexationConfig.getPauseDuration(),"full-products-worker-"+i))).start();
}

for (int i = 0; i < indexationConfig.getPartialProductWorkers(); i++) {
new Thread((new PartialProductIndexationWorker(this, indexationConfig.getPartialProductsbulkPageSize(), indexationConfig.getPauseDuration(),"partial-products-worker-"+i))).start();
}

}

/**
Expand Down Expand Up @@ -149,19 +150,33 @@ public Stream<Product> exportAll() {
.map(SearchHit::getContent);
}



public Stream<Product> exportAll(String vertical) {

Criteria c = new Criteria("vertical").is(vertical);

final NativeQuery initialQuery = new NativeQueryBuilder()
.withQuery(new CriteriaQuery(c)).build();

return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream()
.map(SearchHit::getContent);
}

/**
* Export all aggregated data, corresponding to the given Barcodes
*
* @return
*/
public Stream<Product> exportAll(BarcodeType... barcodeTypes) {

Criteria criteria = new Criteria("gtinInfos.upcType").in((Object[]) barcodeTypes);
CriteriaQuery query = new CriteriaQuery(criteria);

return elasticsearchOperations.searchForStream(query, Product.class, current_index).stream()
.map(SearchHit::getContent);
}


public Stream<Product> searchInValidPrices(String query, final String indexName, int from, int to) {

Expand Down Expand Up @@ -267,24 +282,24 @@ public Stream<Product> exportVerticalWithValidDateOrderByEcoscore(String vertica
return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);
}


public Stream<Product> getAllHavingVertical() {
Criteria c = new Criteria("vertical").exists()
;


NativeQueryBuilder initialQueryBuilder = new NativeQueryBuilder().withQuery(new CriteriaQuery(c));
initialQueryBuilder = initialQueryBuilder.withSort(Sort.by(org.springframework.data.domain.Sort.Order.desc("scores.ECOSCORE.value")));

initialQueryBuilder = initialQueryBuilder.withSort(Sort.by(org.springframework.data.domain.Sort.Order.desc("scores.ECOSCORE.value")));

NativeQuery initialQuery = initialQueryBuilder.build();

return elasticsearchOperations.searchForStream(initialQuery, Product.class, current_index).stream().map(SearchHit::getContent);

}



/**
* Export all aggregateddatas for a vertical, ordered by ecoscore descending
*
Expand All @@ -306,9 +321,9 @@ public SearchHits<Product> search(Query query, final String indexName) {

}




// /**
// * Index an Product
// *
Expand Down Expand Up @@ -370,15 +385,15 @@ public void index(final Product p) {
public void addToFullindexationQueue(Collection<Product> data) {

logger.info("Queuing {} products", data.size());

data.forEach(e -> {

try {
fullProductQueue.put(e) ;
} catch (Exception e1) {
logger.error("!!!! exception, cannot enqueue product {}",e);
}

});
}

Expand All @@ -399,7 +414,7 @@ public void addToPartialIndexationQueue(Collection<ProductPartialUpdateHolder> d

public void store(Collection<Product> data) {
logger.info("Indexing {} products", data.size());

List<IndexQuery> indexQueries = data.stream()
.map(p -> new IndexQueryBuilder()
.withId(String.valueOf(p.getId()))
Expand All @@ -410,8 +425,8 @@ public void store(Collection<Product> data) {
elasticsearchOperations.bulkIndex(indexQueries, current_index);
}



public void forceIndex(Product data) {
logger.info("Indexing product {}", data.gtin());

Expand Down Expand Up @@ -447,9 +462,9 @@ public Product getById(final Long productId) throws ResourceNotFoundException {
// try {
// result = redisRepository.findById(productId).orElseThrow(ResourceNotFoundException::new);
// } catch (ResourceNotFoundException e) {
//
//
// result = null;
//
//
// } catch (Exception e) {
// logger.error("Error getting product {} from redis", productId, e);
// result = null;
Expand Down Expand Up @@ -522,7 +537,7 @@ public Map<String, Product> multiGetById( final Collection<Long> ids)
// ret.put(e.gtin(), e);
// }
// });
//
//
// Getting the one we don't have in redis from elastic
Set<String> missingIds = ids.stream().filter(e -> !ret.containsKey(e)).map(e-> String.valueOf(e)) .collect(Collectors.toSet());
logger.info("redis hits : {}, missing : {}, queue size : {}",ret.size(), missingIds.size(),fullProductQueue.size());
Expand Down Expand Up @@ -576,6 +591,12 @@ public Long countMainIndexHavingRecentUpdate() {
return elasticsearchOperations.count(query, current_index);
}

@Cacheable(cacheNames = CacheConstants.ONE_DAY_LOCAL_CACHE_NAME)
public long countItemsByBarcodeType(BarcodeType... barcodeTypes) {
Criteria criteria = new Criteria("gtinInfos.upcType").in((Object[]) barcodeTypes);
CriteriaQuery query = new CriteriaQuery(criteria);
return elasticsearchOperations.count(query, current_index);
}

@Cacheable(keyGenerator = CacheConstants.KEY_GENERATOR, cacheNames = CacheConstants.ONE_HOUR_LOCAL_CACHE_NAME)
public Map<Integer, Long> byTaxonomy() {
Expand Down Expand Up @@ -630,7 +651,7 @@ public void bulkUpdateDocument(Collection<ProductPartialUpdateHolder> partialIte
// Perform the bulk update
elasticsearchOperations.bulkUpdate(updateQueries, current_index);
}

// /**
// * Bulk update, using script
// * @param partialItemsResults
Expand Down Expand Up @@ -658,7 +679,7 @@ public void bulkUpdateDocument(Collection<ProductPartialUpdateHolder> partialIte
// // Perform the bulk update
// elasticsearchOperations.bulkUpdate(updateQueries, current_index);
// }
//
//


/**
Expand All @@ -667,7 +688,7 @@ public void bulkUpdateDocument(Collection<ProductPartialUpdateHolder> partialIte
*/
public Criteria getRecentPriceQuery() {
return getRecentProducts().and(new Criteria("offersCount").greaterThan(0));

}

/**
Expand All @@ -677,7 +698,7 @@ public Criteria getRecentPriceQuery() {
public Criteria getRecentProducts() {
return new Criteria("lastChange").greaterThan(expirationClause());
}

/**
*
* @return Criteria representing the valid dates
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.open4goods.commons.model;

public enum BarcodeType {
ISBN_13, GTIN_13, GTIN_8, GTIN_12, GTIN_14, UNKNOWN,
ISBN_13,
GTIN_13,
GTIN_8,
GTIN_12,
GTIN_14,
UNKNOWN,


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class AggregatedAttributes {
private Map<ReferentielKey, String> referentielAttributes = new HashMap<>();

//TODO: rename

private Map<String,AggregatedAttribute> aggregatedAttributes = new HashMap<>();


Expand Down Expand Up @@ -96,4 +96,13 @@ public Map<ReferentielKey, String> getReferentielAttributes() {
public void setReferentielAttributes(Map<ReferentielKey, String> referentielAttributes) {
this.referentielAttributes = referentielAttributes;
}









}
12 changes: 12 additions & 0 deletions commons/src/main/java/org/open4goods/model/BarcodeType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.open4goods.model;

public enum BarcodeType {
ISBN_13,
GTIN_13,
GTIN_8,
GTIN_12,
GTIN_14,
UNKNOWN,


}
8 changes: 4 additions & 4 deletions ui/src/main/java/org/open4goods/ui/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ IcecatService icecatFeatureService(UiConfig properties, RemoteFileCachingService
// TODO : xmlMapper not injected because corruct the springdoc used one. Should use a @Primary derivation
return new IcecatService(new XmlMapper(), properties.getIcecatFeatureConfig(), fileCachingService, properties.getRemoteCachingFolder(), brandService, verticalConfigService);
}


@Bean
OpenDataService openDataService(@Autowired ProductRepository aggregatedDataRepository, @Autowired UiConfig props) {
return new OpenDataService(aggregatedDataRepository, props);
OpenDataService openDataService(@Autowired ProductRepository aggregatedDataRepository, @Autowired UiConfig props, @Autowired OpenDataConfig openDataConfig) {
return new OpenDataService(aggregatedDataRepository, props, openDataConfig);
}


Expand Down
28 changes: 28 additions & 0 deletions ui/src/main/java/org/open4goods/ui/config/OpenDataConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.open4goods.ui.config;

import org.springframework.context.annotation.Configuration;

@Configuration
public class OpenDataConfig {

private int downloadSpeedKb;
private int concurrentDownloads;

public int getDownloadSpeedKb() {
return downloadSpeedKb;
}

public void setDownloadSpeedKb(int downloadSpeedKb) {
this.downloadSpeedKb = downloadSpeedKb;
}

public int getConcurrentDownloads() {
return concurrentDownloads;
}

public void setConcurrentDownloads(int concurrentDownloads) {
this.concurrentDownloads = concurrentDownloads;
}


}
15 changes: 13 additions & 2 deletions ui/src/main/java/org/open4goods/ui/config/yml/UiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,21 @@ public File openDataFile() {
return new File(rootFolder + File.separator+"opendata"+File.separator+"full.zip");
}

public File tmpOpenDataFile() {
return new File(rootFolder + File.separator+"opendata"+File.separator+"full-tmp.zip");
public File isbnZipFile() {
return new File(rootFolder + File.separator + "opendata" + File.separator + "isbn.zip");
}

public File tmpIsbnZipFile() {
return new File(rootFolder + File.separator + "opendata" + File.separator + "isbn-tmp.zip");
}

public File gtinZipFile() {
return new File(rootFolder + File.separator + "opendata" + File.separator + "gtin.zip");
}

public File tmpGtinZipFile() {
return new File(rootFolder + File.separator + "opendata" + File.separator + "gtin-tmp.zip");
}

public String getRootFolder() {
return rootFolder;
Expand Down
Loading

0 comments on commit 3fc196b

Please sign in to comment.