Skip to content

Commit

Permalink
Refactor Elasticsearch repository and make it wait until the server r…
Browse files Browse the repository at this point in the history
…eady
  • Loading branch information
aecio committed Jan 7, 2021
1 parent 096ea83 commit 7577abc
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Reorganized gradle module directory structure
- Rename root package to 'achecrawler'
- Use multi-stage build to reduce Docker image size
- Refactor Elasticsearch repository and make it wait until the server is ready


## Version 0.12.0 (2020-01-18)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,17 @@

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import focusedCrawler.target.repository.elasticsearch.ElasticSearchClientFactory;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -50,7 +43,7 @@ public class ElasticSearchRestTargetRepository implements TargetRepository {
/**
 * Creates a repository backed by the Elasticsearch REST API.
 *
 * @param config holds the index name, type name, and REST client settings
 */
public ElasticSearchRestTargetRepository(ElasticSearchConfig config) {
    this.indexName = config.getIndexName();
    this.typeName = config.getTypeName();
    // NOTE(review): the factory appears to retry until the server is reachable
    // (see ElasticSearchClientFactory.createClient) — so construction may block
    this.client = ElasticSearchClientFactory.createClient(config);
    // Ensure the index mapping exists before any documents are written
    this.createIndexMapping(indexName);
}

Expand Down Expand Up @@ -186,38 +179,6 @@ private String serializeAsJson(Object model) {
return targetAsJson;
}

/**
 * Creates the Elasticsearch low-level REST client configured from the given settings.
 *
 * Also assigns the {@code client} field as a side effect so that {@link #close()}
 * can release it later.
 *
 * @param config holds the REST API hosts, connect/socket timeouts, and the
 *               maximum retry timeout
 * @return the initialized {@link RestClient}
 * @throws RuntimeException if any configured host is not a valid URL
 */
public RestClient createRestClient(ElasticSearchConfig config) {

    List<HttpHost> hosts = new ArrayList<>();
    for (String host : config.getRestApiHosts()) {
        try {
            URL url = new URL(host);
            hosts.add(new HttpHost(url.getHost(), url.getPort()));
        } catch (MalformedURLException e) {
            throw new RuntimeException("Failed to initialize Elasticsearch REST client. "
                    + "Invalid host: " + host, e);
        }
    }

    // toArray(new HttpHost[0]) already returns HttpHost[]; the explicit cast
    // in the previous version was redundant
    HttpHost[] httpHostsArray = hosts.toArray(new HttpHost[0]);

    // Lambda form of RequestConfigCallback, consistent with the style used
    // elsewhere in the code base
    client = RestClient.builder(httpHostsArray)
            .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(config.getRestConnectTimeout())
                    .setSocketTimeout(config.getRestSocketTimeout()))
            .setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis())
            .build();

    // Parameterized SLF4J logging instead of string concatenation
    logger.info("Initialized Elasticsearch REST client for hosts: {}",
            Arrays.toString(httpHostsArray));
    return client;
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package achecrawler.target.repository.elasticsearch;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,21 +23,58 @@ public static RestClient createClient(ElasticSearchConfig config) {

HttpHost[] httpHosts = parseHostAddresses(config.getRestApiHosts());

RestClient client = RestClient.builder(httpHosts)
.setRequestConfigCallback(new RequestConfigCallback() {
@Override
public Builder customizeRequestConfig(
Builder requestConfigBuilder) {
return requestConfigBuilder
.setConnectTimeout(config.getRestConnectTimeout())
.setSocketTimeout(config.getRestSocketTimeout());
}
})
.setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis())
.build();

logger.info("Initialized Elasticsearch REST client for: " + Arrays.toString(httpHosts));
return client;
final RestClientBuilder builder = RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(config.getRestConnectTimeout())
.setSocketTimeout(config.getRestSocketTimeout())
)
.setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis());

final int connectionTimeout = config.getRestClientInitialConnectionTimeout();
final long start = System.currentTimeMillis();
int attempts = 0;
RestClient client = null;
while(true) {
try {
attempts++;
if (client == null) {
client = builder.build();
logger.info("Initialized Elasticsearch REST client for hosts: " + Arrays.toString(httpHosts));
}
if (client != null) {
checkRestApi(client);
}
return client;
} catch (Exception e) {
long elapsed = System.currentTimeMillis() - start;
if (elapsed > connectionTimeout) {
String msg = String.format("Failed to connect to Elasticsearch server after %d retries", attempts);
throw new IllegalStateException(msg, e);
}
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException ex) {
throw new IllegalStateException("Interrupted while trying to connect to Elasticsearch server.", ex);
}
if (attempts % 10 == 0) {
logger.info("Failed to connect to Elasticsearch server (failed attempts: {}). Retrying...", attempts);
}
}
}
}

/**
 * Probes the cluster root endpoint ("/") to verify that the Elasticsearch
 * REST API is up and responding with HTTP 200.
 *
 * @param client the low-level REST client to probe
 * @throws IllegalStateException if the request fails or the cluster returns
 *                               a non-200 status code
 */
private static void checkRestApi(RestClient client) {
    String rootEndpoint = "/";
    try {
        Response response = client.performRequest("GET", rootEndpoint);
        final int statusCode = response.getStatusLine().getStatusCode();
        // Log the status line rather than the entity object: HttpEntity#toString()
        // prints only the object identity (e.g. "HttpEntity@1a2b3c"), not the body
        logger.info("Elasticsearch REST API responded with status: {}",
                response.getStatusLine());
        if (statusCode != 200) {
            throw new IllegalStateException("Cluster returned non-OK status code: " + statusCode);
        }
    } catch (IOException e) {
        throw new IllegalStateException("Failed to issue request to Elasticsearch REST API.", e);
    }
}

private static HttpHost[] parseHostAddresses(List<String> esHosts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ public class ElasticSearchConfig {
@JsonProperty("target_storage.data_format.elasticsearch.rest.max_retry_timeout_millis")
private int restMaxRetryTimeoutMillis = 60000;

@JsonProperty("target_storage.data_format.elasticsearch.rest.initial_connection_timeout")
private int restClientInitialConnectionTimeout = 30000;

//
// Index and type parameters
//
Expand Down Expand Up @@ -68,4 +71,11 @@ public void setTypeName(String typeName) {
this.typeName = typeName;
}

/**
 * Returns the maximum time (in milliseconds) to keep retrying the initial
 * connection to the Elasticsearch server before giving up.
 */
public int getRestClientInitialConnectionTimeout() {
    return restClientInitialConnectionTimeout;
}

/**
 * Sets the maximum time (in milliseconds) to keep retrying the initial
 * connection to the Elasticsearch server before giving up.
 */
public void setRestClientInitialConnectionTimeout(int timeoutMillis) {
    this.restClientInitialConnectionTimeout = timeoutMillis;
}
}
2 changes: 1 addition & 1 deletion config/config_bipartite/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2'
services:
ache:
image: vidanyu/ache
entrypoint: sh -c 'sleep 10 && /ache/bin/ache startCrawl -c /config/ -s /config/seeds.txt -o /data -e crawl-data'
command: startCrawl -c /config/ -s /config/seeds.txt -o /data -e crawl-data
ports:
- "8080:8080"
volumes:
Expand Down
2 changes: 1 addition & 1 deletion config/config_docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2'
services:
ache:
image: vidanyu/ache
entrypoint: sh -c 'sleep 10 && /ache/bin/ache startCrawl -c /config/ -s /config/docker.seeds -o /data -e crawl-data'
command: startCrawl -c /config/ -s /config/docker.seeds -o /data -e crawl-data
ports:
- "8080:8080"
volumes:
Expand Down
2 changes: 1 addition & 1 deletion config/config_docker_tor/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2'
services:
ache:
image: vidanyu/ache
entrypoint: sh -c 'sleep 10 && /ache/bin/ache startCrawl -c /config/ -s /config/tor.seeds -o /data -e tor'
command: startCrawl -c /config/ -s /config/tor.seeds -o /data -e tor
ports:
- "8080:8080"
volumes:
Expand Down
2 changes: 1 addition & 1 deletion config/config_kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2'
services:
ache:
image: vidanyu/ache
entrypoint: sh -c '/ache/bin/ache startCrawl -c /config/ -s /config/seeds.txt -o /data'
command: startCrawl -c /config/ -s /config/seeds.txt -o /data
ports:
- "8080:8080"
volumes:
Expand Down
2 changes: 1 addition & 1 deletion config/config_nginx/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2'
services:
ache:
image: vidanyu/ache
entrypoint: sh -c 'sleep 10 && /ache/bin/ache startServer -c /config/ -d /data'
command: startServer -c /config/ -d /data
ports:
- "8080:8080"
volumes:
Expand Down
10 changes: 5 additions & 5 deletions config/config_server/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: '2'
services:
ache:
image: vidanyu/ache
entrypoint: sh -c 'sleep 10 && /ache/bin/ache startServer -c /config -d /data'
command: startServer -c /config -d /data
ports:
- "8080:8080"
- "8080:8080"
volumes:
- ./data-ache/:/data
- ./:/config
- ./data-ache/:/data
- ./:/config
links:
- elasticsearch
- elasticsearch
elasticsearch:
image: elasticsearch:2.4.5
environment:
Expand Down

0 comments on commit 7577abc

Please sign in to comment.