
I would like to ask about the Elasticsearch Bulk API.

This is my code for using the Bulk API:

public void bulkInsert(String index, ArrayList<String> jsonList) throws IOException {
    BulkRequest request = new BulkRequest();

    // Add one index operation per non-empty JSON document
    for (String json : jsonList) {
        if (json != null && !json.isEmpty()) {
            request.add(new IndexRequest(index)
                    .source(json, XContentType.JSON));
        }
    }

    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
    for (BulkItemResponse bulkItemResponse : bulkResponse) {
        DocWriteResponse itemResponse = bulkItemResponse.getResponse();

        switch (bulkItemResponse.getOpType()) {
        case INDEX:
        case CREATE:
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            break;
        case UPDATE:
            UpdateResponse updateResponse = (UpdateResponse) itemResponse;
            break;
        case DELETE:
            DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
            break;
        }
    }

    // Report any per-item failures
    if (bulkResponse.hasFailures()) {
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                System.out.println("failed: " + failure.getId());
            }
        }
    }
}

I encounter a timeout exception now that my data has grown to 800k records: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]

I tried breaking up the jsonList that I pass in, but the same error still occurs sometimes.
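The batching approach mentioned above can be sketched as follows. This is a minimal, self-contained example of splitting the list into fixed-size chunks; the sendBatch call is hypothetical and stands in for invoking bulkInsert once per chunk:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {

    // Split the full list into fixed-size chunks so each bulk request stays small.
    public static List<List<String>> partition(List<String> jsonList, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < jsonList.size(); i += batchSize) {
            batches.add(jsonList.subList(i, Math.min(i + batchSize, jsonList.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 12000; i++) {
            docs.add("{\"id\":" + i + "}");
        }

        for (List<String> batch : partition(docs, 5000)) {
            // sendBatch(index, batch);  // hypothetical: call bulkInsert per batch
            System.out.println("batch size: " + batch.size());
        }
    }
}
```

Smaller batches keep each HTTP request short, which makes it less likely that any single request exceeds the client's socket timeout.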

I am currently using Elasticsearch 7.6.2.

The exception trace:

java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:808)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
    at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:497)
    at com.ESUtil.bulkInsert(ESUtil.java:110)
    at org.download.App1.main(App1.java:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.eclipse.jdt.internal.jarinjarloader.JarRsrcLoader.main(JarRsrcLoader.java:58)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-16 [ACTIVE]
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
    at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:502)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    at java.lang.Thread.run(Unknown Source)

  • Can you provide the entire exception trace? Commented Apr 28, 2020 at 10:03
  • stackoverflow.com/questions/60518260/… has more details about the timeout and the ES code. Your case is different, but the bottom line is that Elasticsearch is not able to cope with your huge data volume. Commented Apr 28, 2020 at 10:15
  • Can you tell us how many primary shards, replicas, and data nodes you have in your cluster? Commented Apr 28, 2020 at 10:16
  • I tried breaking the records into batches of 5000 and sending each batch inside a while loop, with a Thread.sleep after every batch. I am not sure whether the next 5000 records come in before the first 5000 have finished. Is there a way to check that all 5000 records have been successfully added using the Bulk API? Commented Apr 28, 2020 at 10:21
  • Hi, sorry for the late reply. I tried request.timeout(TimeValue.timeValueMinutes(120)); but it still shows the error message. I do notice that when I am not running any other programs or using the laptop, the timeout does not occur and all the data is inserted into Elasticsearch. If I can't increase the computer's performance, is there any other way to improve the code, perhaps by doing the indexing more slowly? Commented Apr 30, 2020 at 10:26

1 Answer


You are using the Bulk API to send a huge amount of data to Elasticsearch. The default connection timeout is 30 seconds, and Elasticsearch cannot finish such a huge bulk operation within that time, hence the exception.

This timeout is normal for huge bulk requests. In your (indexing-specific) case, you can do the following:

Upgrade the infrastructure and improve indexing speed

Scale your cluster, i.e. add more CPU, memory, and faster disks, and disable refresh_interval (default is 1 s) to speed up bulk indexing.
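For instance, disabling refresh_interval during a bulk load (and restoring it afterwards) can be done through the index settings API. A sketch, where my-index and localhost:9200 are placeholders for your index and cluster address:

```shell
# Disable refreshes while bulk indexing (my-index is a placeholder)
curl -X PUT "localhost:9200/my-index/_settings" -H 'Content-Type: application/json' -d'
{ "index": { "refresh_interval": "-1" } }'

# Restore the default once the bulk load is done
curl -X PUT "localhost:9200/my-index/_settings" -H 'Content-Type: application/json' -d'
{ "index": { "refresh_interval": "1s" } }'
```

With refreshes disabled, documents are not made searchable during the load, which reduces segment churn and speeds up indexing.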

Increase the Bulk API timeout duration

As mentioned in the official ES doc:

request.timeout(TimeValue.timeValueMinutes(2));   // 2-minute timeout
request.timeout("2m");                            // the same 2-minute timeout as a string
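Note that the 30,000 ms SocketTimeoutException in the trace is raised by the low-level REST client, whose socket timeout is configured separately from request.timeout(). A sketch of raising it when building the client, assuming a single node at localhost:9200 (both placeholders):

```java
// Sketch: raise the low-level REST client's socket timeout, which is where
// the 30,000 ms SocketTimeoutException comes from. Host, port, and the
// chosen timeout values are assumptions for illustration.
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http"))
                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(5_000)       // ms
                        .setSocketTimeout(120_000)));   // ms; the default is 30_000
```

Without this, increasing request.timeout() alone may not help, because the HTTP connection can still time out on the client side after 30 seconds.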

Edit: As asked in the comments, you can use the synchronous execution of the Bulk API if you want to check the response of your bulk request immediately. Below is quoted from the same doc:

Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
