In Spring Batch, is there an Elasticsearch ItemReader that uses the scan and scroll functionality? I have seen this extension, but it is based on the normal Spring Data search queries. It would be nice to have one based on the scan and scroll feature, since a batch job mostly needs to process large volumes of data. Thanks.
-
Maybe it is helpful to use chunk processing: docs.spring.io/spring-batch/trunk/reference/html/… – sven.kwiotek, May 7, 2015 at 8:09
-
Does anyone from the Spring team have a suggestion? – heyu, May 7, 2015 at 17:07
3 Answers
While there is no "native" ItemReader implementation for Elasticsearch, Spring Batch does provide a RepositoryItemReader that wraps a Spring Data PagingAndSortingRepository. With this, you can use a repository definition for Elasticsearch provided by the Spring Data Elasticsearch project. Note that this reader fetches data page by page through the repository; it does not use scan and scroll.
You can read more about the RepositoryItemReader in the Spring Batch documentation here: http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/item/data/RepositoryItemReader.html.
You can read more about the Spring Data Elasticsearch project here: http://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/
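To make the page-based approach concrete, here is a minimal, library-free sketch of what reading page by page boils down to: request page 0, 1, 2, ... of a fixed size until a page comes back empty. This is not the RepositoryItemReader implementation; `PagingSketch`, `readAllPages` and `slice` are hypothetical names, and the in-memory list only stands in for a paging repository.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration only; none of these names are Spring Batch API.
class PagingSketch {

    // Requests page 0, 1, 2, ... from findPage until an empty page is returned.
    static <T> List<T> readAllPages(BiFunction<Integer, Integer, List<T>> findPage, int pageSize) {
        List<T> all = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<T> items = findPage.apply(page, pageSize);
            if (items.isEmpty()) {
                break; // no more data
            }
            all.addAll(items);
        }
        return all;
    }

    // Stand-in for a paging repository: returns the requested slice of an in-memory list.
    static <T> List<T> slice(List<T> data, int page, int size) {
        int from = Math.min(page * size, data.size());
        int to = Math.min(from + size, data.size());
        return new ArrayList<>(data.subList(from, to));
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> all = readAllPages((page, size) -> slice(data, page, size), 2);
        System.out.println(all); // prints [1, 2, 3, 4, 5]
    }
}
```

Every request here is addressed by page number, so the data source has to locate the right offset each time. A scroll keeps a cursor on the server instead, which is why it tends to suit very large result sets better.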
    import java.util.Iterator;

    import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
    import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
    import org.springframework.data.elasticsearch.core.query.SearchQuery;
    import org.springframework.util.ClassUtils;

    // Reads items through the scan and scroll API of Spring Data Elasticsearch.
    public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> {

        private final ElasticsearchOperations elasticsearchOperations;
        private final SearchQuery searchQuery;
        private final Class<T> type;
        private final int scrollTimeinMillis = 60000;
        private String scrollId;

        public ElasticsearchItemReader(
                final ElasticsearchOperations elasticsearchOperations,
                final SearchQuery searchQuery,
                final Class<T> type
        ) {
            // A name is required so the reader can store its state in the ExecutionContext.
            setName(ClassUtils.getShortName(ElasticsearchItemReader.class));
            this.elasticsearchOperations = elasticsearchOperations;
            this.searchQuery = searchQuery;
            this.type = type;
        }

        @Override
        protected void doOpen() throws Exception {
            // Start the scan once; the returned ID identifies the scroll in later calls.
            scrollId = elasticsearchOperations.scan(searchQuery, scrollTimeinMillis, false);
        }

        @Override
        protected Iterator<T> doPageRead() {
            // Each call fetches the next batch; an empty page ends the read.
            return elasticsearchOperations.scroll(scrollId, scrollTimeinMillis, type).iterator();
        }
    }
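The open-once, scroll-until-empty flow can also be tried without a cluster. Below is a minimal sketch under stated assumptions: `InMemoryScrollSource` and `ScrollReaderSketch` are hypothetical names, the in-memory source only imitates a scroll API (a cursor ID plus successive batches), and `read()` returns null at the end in the same way an ItemReader does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a scroll API: open() returns a cursor ID, and
// next(id) returns the following batch, or an empty list once the cursor is drained.
class InMemoryScrollSource<T> {
    private final List<T> data;
    private final int batchSize;
    private final Map<String, Integer> offsets = new HashMap<>();
    private int nextId = 0;

    InMemoryScrollSource(List<T> data, int batchSize) {
        this.data = data;
        this.batchSize = batchSize;
    }

    String open() {
        String id = "scroll-" + nextId++;
        offsets.put(id, 0);
        return id;
    }

    List<T> next(String scrollId) {
        int from = offsets.get(scrollId);
        int to = Math.min(from + batchSize, data.size());
        offsets.put(scrollId, to); // the cursor position lives on the "server" side
        return new ArrayList<>(data.subList(from, to));
    }
}

// Reader that opens the scroll once and then pulls batches until one is empty.
class ScrollReaderSketch<T> {
    private final InMemoryScrollSource<T> source;
    private String scrollId;
    private Iterator<T> current = Collections.emptyIterator();
    private boolean exhausted = false;

    ScrollReaderSketch(InMemoryScrollSource<T> source) {
        this.source = source;
    }

    void open() {
        scrollId = source.open();
    }

    // Returns the next item, or null when the scroll has no more data.
    T read() {
        if (!current.hasNext() && !exhausted) {
            List<T> batch = source.next(scrollId);
            exhausted = batch.isEmpty();
            current = batch.iterator();
        }
        return current.hasNext() ? current.next() : null;
    }
}
```

The reader only holds the cursor ID and the current batch; it never tracks a page number or offset itself.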
Here is how to write it for a newer version of Spring Data Elasticsearch, using these dependencies:
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        <version>2.4.5</version>
    </dependency>
    import static org.springframework.util.Assert.state;
    import static org.springframework.util.ClassUtils.getShortName;

    import java.util.Iterator;

    import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
    import org.springframework.data.elasticsearch.core.SearchHit;
    import org.springframework.data.elasticsearch.core.SearchScrollHits;
    import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
    import org.springframework.data.elasticsearch.core.query.Query;

    // The reader emits SearchHit<T> items, so it is declared with that item type
    // instead of casting the hit iterator to Iterator<T>.
    public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<SearchHit<T>> implements InitializingBean {

        private final ElasticsearchRestTemplate elasticsearchRestTemplate;
        private final Query query;
        private final int scrollTimeinMillis = 60000;
        private final Class<T> type;
        private final String index;
        private String scrollId;
        private boolean isFirstCall = true;

        public ElasticsearchItemReader(ElasticsearchRestTemplate elasticsearchRestTemplate,
                                       Query query,
                                       Class<T> type,
                                       String index) {
            setName(getShortName(getClass()));
            this.elasticsearchRestTemplate = elasticsearchRestTemplate;
            this.query = query;
            this.type = type;
            this.index = index;
        }

        @Override
        protected Iterator<SearchHit<T>> doPageRead() {
            SearchScrollHits<T> searchHits;
            if (isFirstCall) {
                // The first call opens the scroll and returns the first batch.
                searchHits = elasticsearchRestTemplate.searchScrollStart(scrollTimeinMillis, query, type, IndexCoordinates.of(index));
                isFirstCall = false;
            } else {
                // Later calls continue the scroll with the most recent scroll ID.
                searchHits = elasticsearchRestTemplate.searchScrollContinue(scrollId, scrollTimeinMillis, type, IndexCoordinates.of(index));
            }
            scrollId = searchHits.getScrollId();
            return searchHits.getSearchHits().iterator();
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            state(elasticsearchRestTemplate != null, "An ElasticsearchOperations implementation is required.");
            state(query != null, "A query is required.");
            state(type != null, "A target type to convert the input into is required.");
        }
    }

And the item reader bean (MyDocument is a placeholder for your own document class):

    public ItemReader<SearchHit<MyDocument>> reader() {
        // An empty bool query matches all documents.
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .build();
        // "*" searches every index; replace it with your own index name.
        return new ElasticsearchItemReader<>(elasticsearchRestTemplate, query, MyDocument.class, "*");
    }