Helpers
Streaming Bulk
- elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, flush_after_seconds=None, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), retry_on_status=(429, ), span_name='helpers.streaming_bulk', *args, **kwargs)
Streaming bulk consumes actions from the iterable passed in and yields results per action. For non-streaming use cases use bulk(), which is a wrapper around streaming bulk that returns summary information about the bulk operation once the entire input is consumed and sent.
If you specify max_retries it will also retry any documents that were rejected with a 429 status code. Use retry_on_status to configure which status codes will be retried. To do this it will wait (by calling time.sleep, which blocks) for initial_backoff seconds and then, on every subsequent rejection of the same chunk, for double that time, up to max_backoff seconds.
- Parameters:
client (Elasticsearch) – instance of Elasticsearch to use
actions (Iterable[bytes | str | Dict[str, Any] | BulkMeta]) – iterable containing the actions to be executed
chunk_size (int) – number of docs in one chunk sent to es (default: 500)
max_chunk_bytes (int) – the maximum size of the request in bytes (default: 100MB)
flush_after_seconds (float | None) – time in seconds after which a chunk is written even if it hasn’t reached chunk_size or max_chunk_bytes. Set to 0 to not use a timeout-based flush. (default: None)
raise_on_error (bool) – raise BulkIndexError containing errors (as .errors) from the execution of the last chunk when some occur. By default we raise.
raise_on_exception (bool) – if False then don’t propagate exceptions from the call to bulk and just report the items that failed as failed.
expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – callback executed on each action passed in; should return a tuple containing the action line and the data line (None if the data line should be omitted).
retry_on_status (int | Collection[int]) – HTTP status code(s) that will trigger a retry. (If None is specified, only status 429 will be retried.)
max_retries (int) – maximum number of times a document will be retried when a status in retry_on_status (defaulting to 429) is received; set to 0 (default) for no retries
initial_backoff (float) – number of seconds we should wait before the first retry. Any subsequent retry will wait initial_backoff * 2**retry_number seconds
max_backoff (float) – maximum number of seconds a retry will wait
yield_ok (bool) – if set to False will skip successful documents in the output
ignore_status (int | Collection[int]) – list of HTTP status codes that you want to ignore
span_name (str)
args (Any)
kwargs (Any)
- Return type:
Iterable[Tuple[bool, Any]]
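A minimal usage sketch (connection details, index name, and document fields are placeholders, not part of the API): the helper yields an (ok, result) tuple for every action consumed from the iterable.

    from elasticsearch import Elasticsearch, helpers

    client = Elasticsearch("http://localhost:9200")  # placeholder connection details

    def generate_actions():
        # Any iterable of action dicts works; index name and fields are illustrative.
        for i in range(100):
            yield {"_index": "my-index", "_id": i, "value": i}

    for ok, item in helpers.streaming_bulk(client, generate_actions(), max_retries=3):
        if not ok:
            print("failed:", item)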
Parallel Bulk
- elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=104857600, flush_after_seconds=None, queue_size=4, expand_action_callback=<function expand_action>, ignore_status=(), *args, **kwargs)
Parallel version of the bulk helper run in multiple threads at once.
- Parameters:
client (Elasticsearch) – instance of Elasticsearch to use
actions (Iterable[bytes | str | Dict[str, Any]]) – iterator containing the actions
thread_count (int) – size of the threadpool to use for the bulk requests
chunk_size (int) – number of docs in one chunk sent to es (default: 500)
max_chunk_bytes (int) – the maximum size of the request in bytes (default: 100MB)
flush_after_seconds (float | None) – time in seconds after which a chunk is written even if it hasn’t reached chunk_size or max_chunk_bytes. Set to 0 to not use a timeout-based flush. (default: None)
raise_on_error – raise BulkIndexError containing errors (as .errors) from the execution of the last chunk when some occur. By default we raise.
raise_on_exception – if False then don’t propagate exceptions from the call to bulk and just report the items that failed as failed.
expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – callback executed on each action passed in; should return a tuple containing the action line and the data line (None if the data line should be omitted).
queue_size (int) – size of the task queue between the main thread (producing chunks to send) and the processing threads.
ignore_status (int | Collection[int]) – list of HTTP status codes that you want to ignore
args (Any)
kwargs (Any)
- Return type:
Iterable[Tuple[bool, Any]]
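A minimal sketch under the same placeholder assumptions as above; note that parallel_bulk returns a lazy generator, so its results must be iterated for any indexing to actually happen.

    from elasticsearch import Elasticsearch, helpers

    client = Elasticsearch("http://localhost:9200")  # placeholder connection details
    actions = ({"_index": "my-index", "_id": i, "value": i} for i in range(10000))

    # Nothing is sent until the generator is consumed.
    for success, info in helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500):
        if not success:
            print("A document failed:", info)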
Bulk
- elasticsearch.helpers.bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)
Helper for the bulk() api that provides a more human friendly interface - it consumes an iterator of actions and sends them to elasticsearch in chunks. It returns a tuple with summary information - the number of successfully executed actions and either a list of errors or the number of errors if stats_only is set to True. Note that by default we raise a BulkIndexError when we encounter an error, so options like stats_only only apply when raise_on_error is set to False.
When errors are being collected, the original document data is included in the error dictionary, which can lead to extra high memory usage. If you need to process a lot of data and want to ignore/collect errors, please consider using the streaming_bulk() helper, which will just return the errors and not store them in memory.
- Parameters:
client (Elasticsearch) – instance of Elasticsearch to use
actions (Iterable[bytes | str | Dict[str, Any]]) – iterator containing the actions
stats_only (bool) – if True only report the number of successful/failed operations instead of the number of successful operations and a list of error responses
ignore_status (int | Collection[int]) – list of HTTP status codes that you want to ignore
args (Any)
kwargs (Any)
- Return type:
Tuple[int, int | List[Any]]
Any additional keyword arguments will be passed to streaming_bulk(), which is used to execute the operation; see streaming_bulk() for more accepted parameters.
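A minimal sketch (placeholder cluster and documents) showing the stats_only / raise_on_error interplay described above:

    from elasticsearch import Elasticsearch, helpers

    client = Elasticsearch("http://localhost:9200")  # placeholder connection details
    docs = ({"_index": "my-index", "_id": i, "title": "doc %d" % i} for i in range(1000))

    # With raise_on_error=False and stats_only=True the result is (successes, failures).
    success, failed = helpers.bulk(client, docs, stats_only=True, raise_on_error=False)
    print("indexed %d documents, %d failures" % (success, failed))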
Scan
- elasticsearch.helpers.scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)
Simple abstraction on top of the scroll() api - a simple iterator that yields all hits as returned by the underlying scroll requests.
By default scan does not return results in any pre-determined order. To have a standard order in the returned documents (either by score or explicit sort definition) when scrolling, use preserve_order=True. This may be an expensive operation and will negate the performance benefits of using scan.
- Parameters:
client (Elasticsearch) – instance of Elasticsearch to use
query (Any | None) – body for the search() api
scroll (str) – Specify how long a consistent view of the index should be maintained for scrolled search
raise_on_error (bool) – raises an exception (ScanError) if an error is encountered (some shards fail to execute). By default we raise.
preserve_order (bool) – don’t set the search_type to scan - this will cause the scroll to paginate while preserving the order. Note that this can be an extremely expensive operation and can easily lead to unpredictable results, use with caution.
size (int) – size (per shard) of the batch sent at each iteration.
request_timeout (float | None) – explicit timeout for each call to scan
clear_scroll (bool) – explicitly calls delete on the scroll id via the clear scroll API at the end of the method on completion or error, defaults to true.
scroll_kwargs (MutableMapping[str, Any] | None) – additional kwargs to be passed to scroll()
kwargs (Any)
- Return type:
Iterable[Any]
Any additional keyword arguments will be passed to the initial search() call:
scan(client, query={"query": {"match": {"title": "python"}}}, index="orders-*", doc_type="books")
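Since scan yields raw hit dictionaries, a typical consumption loop looks like this (cluster address, index pattern, and query are placeholders):

    from elasticsearch import Elasticsearch, helpers

    client = Elasticsearch("http://localhost:9200")  # placeholder connection details

    for hit in helpers.scan(
        client,
        query={"query": {"match": {"title": "python"}}},
        index="orders-*",
        size=1000,
    ):
        print(hit["_id"], hit["_source"])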
Reindex
- elasticsearch.helpers.reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})
Reindex all documents from one index that satisfy a given query to another, potentially (if target_client is specified) on a different cluster. If you don’t specify the query you will reindex all the documents.
Since 2.3 a reindex() api is available as part of elasticsearch itself. It is recommended to use the api instead of this helper wherever possible. The helper is here mostly for backwards compatibility and for situations where more flexibility is needed.
Note
This helper doesn’t transfer mappings, just the data.
- Parameters:
client (Elasticsearch) – instance of Elasticsearch to use (for read if target_client is specified as well)
source_index (str | Collection[str]) – index (or list of indices) to read documents from
target_index (str) – name of the index in the target cluster to populate
query (Any | None) – body for the search() api
target_client (Elasticsearch | None) – optional, if specified will be used for writing (thus enabling reindex between clusters)
chunk_size (int) – number of docs in one chunk sent to es (default: 500)
scroll (str) – Specify how long a consistent view of the index should be maintained for scrolled search
op_type (str | None) – Explicit operation type. Defaults to ‘_index’. Data streams must be set to ‘create’. If not specified, will auto-detect if target_index is a data stream.
scan_kwargs (MutableMapping[str, Any]) – additional kwargs to be passed to scan()
bulk_kwargs (MutableMapping[str, Any]) – additional kwargs to be passed to bulk()
- Return type:
Tuple[int, int | List[Any]]
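A minimal sketch of copying matching documents to a second cluster (hosts, index names, and the query are placeholders):

    from elasticsearch import Elasticsearch, helpers

    source = Elasticsearch("http://localhost:9200")   # placeholder source cluster
    target = Elasticsearch("http://other-host:9200")  # placeholder target cluster

    helpers.reindex(
        source,
        source_index="old-index",
        target_index="new-index",
        query={"query": {"term": {"published": True}}},
        target_client=target,
        chunk_size=500,
    )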