In Spring Batch, is there any support for an Elasticsearch ItemReader that uses the scan and scroll functionality? I do see this extension, but it is based on the normal Spring Data search queries. It would be nice to have one based on scan and scroll, since a batch job usually needs to process large volumes of data. Thanks.

3 Answers

While there is no "native" ItemReader implementation for Elasticsearch, Spring Batch does provide a RepositoryItemReader that wraps a Spring Data PagingAndSortingRepository. With it, you can use an Elasticsearch repository definition provided by the Spring Data Elasticsearch project.
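A minimal sketch of that approach, assuming Spring Batch 4.x and Spring Data Elasticsearch 4.x. `Article`, `ArticleRepository`, `ArticleReaderFactory`, the `articles` index, the `createdAt` sort field and the page size of 500 are all hypothetical. Keep in mind that RepositoryItemReader calls `findAll(Pageable)` page by page, which Elasticsearch serves with from/size paging rather than scroll, so deep pages hit `index.max_result_window` (10,000 hits by default).

```java
import java.util.Collections;

import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.data.annotation.Id;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

// Hypothetical document type, used only for this illustration.
@Document(indexName = "articles")
class Article {
    @Id
    private String id;

    // Hypothetical sortable field (e.g. mapped as date or keyword).
    private String createdAt;
}

// Spring Data generates the implementation; findAll(Pageable) is inherited
// from PagingAndSortingRepository.
interface ArticleRepository extends ElasticsearchRepository<Article, String> {
}

class ArticleReaderFactory {

    static RepositoryItemReader<Article> articleReader(ArticleRepository repository) {
        return new RepositoryItemReaderBuilder<Article>()
                .name("articleReader")                 // key prefix for restart state
                .repository(repository)
                .methodName("findAll")                 // invoked as findAll(Pageable)
                .sorts(Collections.singletonMap("createdAt", Sort.Direction.ASC))
                .pageSize(500)                         // documents per page request
                .build();
    }
}
```

A stable sort is required by RepositoryItemReader so that consecutive pages do not overlap or skip documents.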

You can read more about the RepositoryItemReader in the Spring Batch documentation here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/item/data/RepositoryItemReader.html.

You can read more about the Spring Data Elasticsearch project here: http://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/

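import java.util.Iterator;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.util.ClassUtils;

// Targets older Spring Data Elasticsearch releases, where scan()/scroll() exist on ElasticsearchOperations.
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> {
    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery searchQuery;
    private final Class<T> type;
    // How long Elasticsearch keeps the scroll context alive between two scroll requests.
    private final long scrollTimeInMillis = 60000;
    private String scrollId;

    public ElasticsearchItemReader(
        final ElasticsearchOperations elasticsearchOperations,
        final SearchQuery searchQuery,
        final Class<T> type
    ) {
        // Spring Batch needs a name to build the ExecutionContext keys used for restart.
        setName(ClassUtils.getShortName(getClass()));
        this.elasticsearchOperations = elasticsearchOperations;
        this.searchQuery = searchQuery;
        this.type = type;
    }

    @Override
    protected void doOpen() throws Exception {
        // Opens the scan/scroll context and returns its id; noFields = false so the _source is fetched.
        scrollId = elasticsearchOperations.scan(searchQuery, scrollTimeInMillis, false);
    }

    @Override
    protected Iterator<T> doPageRead() {
        // Fetches the next batch; an empty page makes read() return null and ends the step.
        return elasticsearchOperations.scroll(scrollId, scrollTimeInMillis, type).iterator();
    }
}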
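Both readers in this thread rely on the same contract of AbstractPaginatedDataItemReader: `read()` asks `doPageRead()` for a new iterator whenever the current one is exhausted, and returns null (end of input) as soon as a freshly fetched page is empty. The dependency-free model below illustrates that loop; it is a simplified sketch, not Spring Batch's actual source, and `PagedReader` plus the in-memory list of pages (standing in for successive scroll responses) are hypothetical.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified model of a paginated reader; PagedReader is hypothetical.
class PagedReader<T> {
    // Stands in for successive scroll responses from Elasticsearch.
    private final Iterator<List<T>> pages;
    private Iterator<T> current;

    PagedReader(List<List<T>> pages) {
        this.pages = pages.iterator();
    }

    // Plays the role of doPageRead(): next page, or an empty one once the scroll is exhausted.
    private Iterator<T> nextPage() {
        return pages.hasNext() ? pages.next().iterator() : Collections.<T>emptyIterator();
    }

    // Plays the role of read(): refills from the next page, null signals end of input.
    T read() {
        if (current == null || !current.hasNext()) {
            current = nextPage();
            if (!current.hasNext()) {
                return null;
            }
        }
        return current.next();
    }
}
```

This is why neither reader needs an explicit "last page" flag: the scroll simply returns no more hits and the step finishes.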

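Here is how to write it for newer versions (Spring Boot 2.4.5, which brings in Spring Data Elasticsearch 4.1), using the scroll methods of ElasticsearchRestTemplate.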

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
   <version>2.4.5</version>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
   <version>2.4.5</version>
</dependency>

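import java.util.Collections;
import java.util.Iterator;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.util.ClassUtils;

import static org.springframework.util.Assert.state;

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    // How long Elasticsearch keeps the scroll context alive between two scroll requests.
    private static final long SCROLL_TIME_IN_MILLIS = 60000;

    private final ElasticsearchRestTemplate elasticsearchRestTemplate;

    private final Query query;

    private final Class<T> type;

    private final String index;

    // Id of the current scroll context; null until the first page has been fetched.
    private String scrollId;

    public ElasticsearchItemReader(ElasticsearchRestTemplate elasticsearchRestTemplate,
                                   Query query,
                                   Class<T> type,
                                   String index) {
        // Spring Batch needs a name to build the ExecutionContext keys used for restart.
        setName(ClassUtils.getShortName(getClass()));
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
        this.query = query;
        this.type = type;
        this.index = index;
    }

    @Override
    protected Iterator<T> doPageRead() {
        SearchScrollHits<T> searchHits;
        if (scrollId == null) {
            // First page: open the scroll context and fetch the first batch of hits.
            searchHits = elasticsearchRestTemplate.searchScrollStart(
                    SCROLL_TIME_IN_MILLIS, query, type, IndexCoordinates.of(index));
        } else {
            // Later pages: continue the existing scroll.
            searchHits = elasticsearchRestTemplate.searchScrollContinue(
                    scrollId, SCROLL_TIME_IN_MILLIS, type, IndexCoordinates.of(index));
        }
        // Always keep the most recent scroll id returned by Elasticsearch.
        scrollId = searchHits.getScrollId();
        // Unwrap SearchHit<T> into T; an empty iterator ends the read.
        return searchHits.getSearchHits().stream()
                .map(SearchHit::getContent)
                .iterator();
    }

    @Override
    protected void doClose() throws Exception {
        // Release the scroll context on the cluster instead of waiting for it to time out.
        if (scrollId != null) {
            elasticsearchRestTemplate.searchScrollClear(Collections.singletonList(scrollId));
            scrollId = null;
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        state(elasticsearchRestTemplate != null, "An ElasticsearchRestTemplate is required.");
        state(query != null, "A query is required.");
        state(type != null, "A target type to convert the input into is required.");
        state(index != null, "An index name is required.");
    }
}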

And the reader bean:

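import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.batch.item.ItemReader;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

// T.class is not valid Java, so the target type is passed in explicitly.
public <T> ItemReader<T> reader(Class<T> type) {

    // A bool query without clauses matches every document; add filters to narrow it.
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

    NativeSearchQuery query = new NativeSearchQueryBuilder()
            .withQuery(boolQueryBuilder)
            .build();

    // "*" scrolls over all indices; pass a concrete index name to read a single index.
    return new ElasticsearchItemReader<>(elasticsearchRestTemplate, query, type, "*");
}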
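With the 4.x reader above, each scroll request returns as many hits as the query's page size, which I believe defaults to 10 in Spring Data Elasticsearch 4.x; the `pageSize` inherited from AbstractPaginatedDataItemReader is not used by `doPageRead()`. A sketch that sets the scroll batch size explicitly; `ScrollQueries`, the batch size of 1000 and the `updatedAt` field are hypothetical.

```java
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

class ScrollQueries {

    // Hypothetical batch size: hits returned by each scroll request.
    static final int SCROLL_BATCH_SIZE = 1000;

    static NativeSearchQuery recentlyUpdatedQuery() {
        return new NativeSearchQueryBuilder()
                // Hypothetical "updatedAt" date field: only documents changed in the last day.
                .withQuery(QueryBuilders.boolQuery()
                        .filter(QueryBuilders.rangeQuery("updatedAt").gte("now-1d")))
                // First page with SCROLL_BATCH_SIZE hits; the scroll keeps this batch size.
                .withPageable(PageRequest.of(0, SCROLL_BATCH_SIZE))
                .build();
    }
}
```

The query would then be passed as `new ElasticsearchItemReader<>(elasticsearchRestTemplate, ScrollQueries.recentlyUpdatedQuery(), MyDocument.class, "my-index")`, where `MyDocument` and `my-index` are again placeholders. A larger batch means fewer round trips but more memory per page.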
