
ConsumeElasticsearch

Description

A processor that repeatedly runs a paginated query against a field, using a Range query to consume new documents from an Elasticsearch index/query. The processor retrieves multiple pages of results until either no more results are available or the Pagination Keep Alive expiration is reached. After that, the Range query automatically updates the field constraint based on the field value of the last retrieved document.
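The consume cycle described above can be sketched as follows. This is a minimal Python simulation under stated assumptions, not the processor's implementation: an in-memory list of dicts stands in for the index, `run_once` is a hypothetical helper, ascending sort order is assumed, the Pagination Keep Alive expiry is not modelled, and the tracked value is read from the tracked field of the last hit.

```python
from typing import Any, Dict, List, Optional, Tuple

Doc = Dict[str, Any]


def run_once(
    index: List[Doc], field: str, tracked: Optional[Any], size: int
) -> Tuple[List[List[Doc]], Optional[Any]]:
    """Simulate one invocation against an in-memory stand-in for an index.

    Selects documents whose `field` is strictly greater than `tracked`
    (the "gt" bound), sorts them ascending, splits them into pages of
    `size` hits, and returns the pages plus the updated tracked value.
    """
    # No tracked value yet (first run, no Initial Value): no range bound.
    matching = sorted(
        (d for d in index if tracked is None or d[field] > tracked),
        key=lambda d: d[field],
    )
    # Page through the results until none are left.
    pages = [matching[i:i + size] for i in range(0, len(matching), size)]
    # The next run's range bound comes from the last retrieved document.
    new_tracked = matching[-1][field] if matching else tracked
    return pages, new_tracked
```

Calling `run_once` a second time with the returned tracked value only yields documents added since the previous call. Because the bound is a strict "gt", a new document sharing the exact tracked value of the last retrieved document would not match in this sketch, so it behaves best with a field whose values are unique and increasing.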

Tags

elasticsearch, elasticsearch5, elasticsearch6, elasticsearch7, elasticsearch8, json, page, query, scroll, search

Properties

In the list below, required properties are shown with an asterisk (*). Other properties are optional. The list also indicates any default values, and whether a property supports the NiFi Expression Language.

Each property is listed by Display Name, followed by its API Name, Default Value and Allowable Values (where applicable), and Description.
Range Query Field *
API Name: es-rest-range-field
Field to be tracked as part of an Elasticsearch Range query using a "gt" bound match. This field must exist within the Elasticsearch document for it to be retrieved.
Sort Order *
API Name: es-rest-sort-order
Default Value: asc
Allowable Values:
  • asc
  • desc
The order in which to sort the "Range Query Field". A "sort" clause for the "Range Query Field" field will be prepended to any provided "Sort" clauses. If a "sort" clause already exists for the "Range Query Field" field, it will not be updated.
Initial Value
API Name: es-rest-range-initial-value
The initial value to use for the query if the processor has not run previously. If the processor has run previously and stored a value in its state, this property is ignored. If no value is provided and the processor has not previously run, no Range query bounds are used, i.e. all documents are retrieved in the specified "Sort Order".
Initial Value Date Format
API Name: es-rest-range-format
If the "Range Query Field" is a Date field, convert the "Initial Value" to a date with this format. If not specified, Elasticsearch will use the date format provided by the "Range Query Field"'s mapping. For valid syntax, see https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html

This property is only considered if:
  • the property Initial Value has a value specified
Initial Value Date Time Zone
API Name: es-rest-range-time-zone
If the "Range Query Field" is a Date field, convert the "Initial Value" to UTC with this time zone. Valid values are ISO 8601 UTC offsets, such as "+01:00" or "-08:00", and IANA time zone IDs, such as "Europe/London".

This property is only considered if:
  • the property Initial Value has a value specified
Additional Filters
API Name: es-rest-additional-filters
One or more query filters in JSON syntax, not Lucene syntax. Ex: [{"match":{"somefield":"somevalue"}}, {"match":{"anotherfield":"anothervalue"}}]. These filters will be used as part of a Bool query's filter.
Size
API Name: es-rest-size
The maximum number of documents to retrieve in the query. If the query is paginated, this "size" applies to each page of the query, not to the entire result set.

Supports Expression Language, using FlowFile attributes and Environment variables.
Sort
API Name: es-rest-query-sort
Sort results by one or more fields, in JSON syntax. Ex: [{"price" : {"order" : "asc", "mode" : "avg"}}, {"post_date" : {"format": "strict_date_optional_time_nanos"}}]

Supports Expression Language, using FlowFile attributes and Environment variables.
Aggregations
API Name: es-rest-query-aggs
One or more query aggregations (or "aggs"), in JSON syntax. Ex: {"items": {"terms": {"field": "product", "size": 10}}}

Supports Expression Language, using FlowFile attributes and Environment variables.
Fields
API Name: es-rest-query-fields
Fields of indexed documents to be retrieved, in JSON syntax. Ex: ["user.id", "http.response.*", {"field": "@timestamp", "format": "epoch_millis"}]

Supports Expression Language, using FlowFile attributes and Environment variables.
Script Fields
API Name: es-rest-query-script-fields
Fields to be created using script evaluation at query runtime, in JSON syntax. Ex: {"test1": {"script": {"lang": "painless", "source": "doc['price'].value * 2"}}, "test2": {"script": {"lang": "painless", "source": "doc['price'].value * params.factor", "params": {"factor": 2.0}}}}

Supports Expression Language, using FlowFile attributes and Environment variables.
Query Attribute
API Name: el-query-attribute
If set, the executed query will be set on each result flowfile in the specified attribute.

Supports Expression Language, using FlowFile attributes and Environment variables.
Index *
API Name: el-rest-fetch-index
The name of the index to use.

Supports Expression Language, using FlowFile attributes and Environment variables.
Type
API Name: el-rest-type
The type of this document (used by Elasticsearch for indexing and searching).

Supports Expression Language, using FlowFile attributes and Environment variables.
Max JSON Field String Length *
API Name: Max JSON Field String Length
Default Value: 20 MB
The maximum allowed length of a string value when parsing a JSON document or attribute.
Client Service *
API Name: el-rest-client-service
Allowable Values:
  Controller Service: ElasticSearchClientService
  Implementations: ElasticSearchClientServiceImpl
An Elasticsearch client service to use for running queries.
Search Results Split *
API Name: el-rest-split-up-hits
Default Value: PER_RESPONSE
Allowable Values:
  • PER_HIT
  • PER_RESPONSE
  • PER_QUERY
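How hits are split into output flowfiles: PER_RESPONSE outputs one flowfile containing all hits from each response, PER_HIT outputs one flowfile for each individual hit, and PER_QUERY outputs one flowfile containing all hits from all paged responses.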
Search Results Format *
API Name: el-rest-format-hits
Default Value: FULL
Allowable Values:
  • FULL
  • SOURCE_ONLY
  • METADATA_ONLY
Format of Hits output.
Aggregation Results Split *
API Name: el-rest-split-up-aggregations
Default Value: PER_RESPONSE
Allowable Values:
  • PER_HIT
  • PER_RESPONSE
How aggregations are split into output flowfiles: PER_RESPONSE outputs one flowfile containing all aggregations, and PER_HIT outputs one flowfile for each individual aggregation.
Aggregation Results Format *
API Name: el-rest-format-aggregations
Default Value: FULL
Allowable Values:
  • FULL
  • BUCKETS_ONLY
  • METADATA_ONLY
Format of Aggregation output.
Output No Hits *
API Name: el-rest-output-no-hits
Default Value: false
Allowable Values:
  • true
  • false
Output a "hits" flowfile even if no hits are found for the query. If true, an empty "hits" flowfile will be output even if "aggregations" are output.
Pagination Type *
API Name: el-rest-pagination-type
Default Value: SCROLL
Allowable Values:
  • SCROLL
  • SEARCH_AFTER
  • POINT_IN_TIME
Pagination method to use. Not all types are available for all Elasticsearch versions; check the Elasticsearch docs to confirm which are applicable and recommended for your service.
Pagination Keep Alive *
API Name: el-rest-pagination-keep-alive
Default Value: 10 mins
Pagination "keep_alive" period: the period for which Elasticsearch keeps the scroll/PIT cursor alive between requests. This is not the time expected for all pages to be returned, but the maximum allowed time between successive page retrievals.
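How the query-related properties could combine into a single search request body is sketched below. This is an illustrative Python sketch assembled from the property descriptions, not the processor's code: `range_clause`, `build_body`, `_sort_field` and `_as_list` are hypothetical names, and placing the range clause in the same Bool `filter` list as the Additional Filters is an assumption. The body keys used (`query`, `bool`, `filter`, `range` with `gt`/`format`/`time_zone`, `sort`, `size`, `aggs`, `fields`, `script_fields`) are standard Elasticsearch search request fields.

```python
import json
from typing import Any, Dict, List, Optional


def range_clause(
    field: str,
    stored_value: Optional[str],
    initial_value: Optional[str],
    date_format: Optional[str] = None,
    time_zone: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Return the "gt" range clause for this run, or None for no bound."""
    if stored_value is not None:
        # A value stored in state wins; Initial Value and its format/zone are ignored.
        return {"range": {field: {"gt": stored_value}}}
    if initial_value is None:
        return None  # no bound: all documents are retrieved
    bounds: Dict[str, Any] = {"gt": initial_value}
    if date_format:
        bounds["format"] = date_format
    if time_zone:
        bounds["time_zone"] = time_zone
    return {"range": {field: bounds}}


def _sort_field(clause: Any) -> str:
    # A sort clause is either a bare field name or {"field": {...options}}.
    return clause if isinstance(clause, str) else next(iter(clause), "")


def _as_list(raw: Optional[str]) -> List[Any]:
    # Accept either a JSON array or a single JSON object.
    if not raw:
        return []
    parsed = json.loads(raw)
    return parsed if isinstance(parsed, list) else [parsed]


def build_body(
    field: str,
    sort_order: str,
    stored_value: Optional[str] = None,
    initial_value: Optional[str] = None,
    date_format: Optional[str] = None,
    time_zone: Optional[str] = None,
    additional_filters: Optional[str] = None,
    size: Optional[int] = None,
    sort: Optional[str] = None,
    aggs: Optional[str] = None,
    fields: Optional[str] = None,
    script_fields: Optional[str] = None,
) -> Dict[str, Any]:
    filters: List[Any] = []
    clause = range_clause(field, stored_value, initial_value, date_format, time_zone)
    if clause is not None:
        filters.append(clause)
    filters.extend(_as_list(additional_filters))
    body: Dict[str, Any] = {"query": {"bool": {"filter": filters}}}

    # Prepend a sort on the range field unless the user already sorts on it.
    user_sort = _as_list(sort)
    if not any(_sort_field(c) == field for c in user_sort):
        user_sort.insert(0, {field: {"order": sort_order}})
    body["sort"] = user_sort

    if size is not None:
        body["size"] = size  # applies per page, not to the whole result set
    for key, raw in (("aggs", aggs), ("fields", fields), ("script_fields", script_fields)):
        if raw:
            body[key] = json.loads(raw)
    return body
```

An empty `filter` list leaves the Bool query with no clauses, which Elasticsearch treats as matching all documents; that mirrors the "no Range query bounds" case described for Initial Value.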

Dynamic Properties

Name: The name of a URL query parameter to add
Value: The value of the URL query parameter
Description: Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters override any matching parameters in the query request body. For SCROLL type queries, these parameters are only used in the initial (first page) query, as the Elasticsearch Scroll API does not support the same query parameters for subsequent pages of data.

Supports Expression Language: Yes, evaluated using Environment variables.
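A hypothetical sketch of where dynamic properties end up in the request URL, assuming an index-scoped `_search` endpoint for every non-scroll request and the `_search/scroll` endpoint for later SCROLL pages. `request_url` is an illustrative name; POINT_IN_TIME requests (which are not index-scoped) and the override of matching request body parameters are not modelled.

```python
from typing import Dict
from urllib.parse import urlencode


def request_url(
    base: str, index: str, params: Dict[str, str], pagination_type: str, first_page: bool
) -> str:
    """Build the URL for one page request (illustrative only)."""
    if pagination_type == "SCROLL" and not first_page:
        # Later scroll pages go to the Scroll API; dynamic params are not sent.
        return f"{base}/_search/scroll"
    # First SCROLL page, or any other pagination type: append the params.
    suffix = f"?{urlencode(params)}" if params else ""
    return f"{base}/{index}/_search{suffix}"
```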

Relationships

aggregations: Aggregations are routed to this relationship.
hits: Search hits are routed to this relationship.

Reads Attributes

This processor does not read attributes.

Writes Attributes

elasticsearch.query.error: The error message provided by Elasticsearch if there is an error querying the index.
hit.count: The number of hits in the output flowfile.
mime.type: application/json
page.number: The number of the page (request), starting from 1, in which the results in the output flowfile were returned.
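How the Search Results Split setting might map pages of hits to output flowfiles carrying the hit.count, mime.type and page.number attributes is sketched below. The `split_hits` and `_attrs` helpers are hypothetical, and three choices are assumptions rather than documented behavior: every PER_HIT flowfile gets a hit.count of 1, empty pages produce no flowfile (as if Output No Hits were false), and PER_QUERY output carries no page.number.

```python
from typing import Any, Dict, List, Optional, Tuple

# (attributes, hits) pair standing in for an output flowfile.
FlowFile = Tuple[Dict[str, str], List[Any]]


def _attrs(hit_count: int, page_number: Optional[int]) -> Dict[str, str]:
    # Attribute values are strings, as with NiFi flowfile attributes.
    attrs = {"mime.type": "application/json", "hit.count": str(hit_count)}
    if page_number is not None:
        attrs["page.number"] = str(page_number)
    return attrs


def split_hits(pages: List[List[Any]], mode: str) -> List[FlowFile]:
    """Group hits into output flowfiles; pages[0] is page.number 1."""
    out: List[FlowFile] = []
    if mode == "PER_HIT":
        for number, page in enumerate(pages, start=1):
            for hit in page:
                out.append((_attrs(1, number), [hit]))
    elif mode == "PER_RESPONSE":
        for number, page in enumerate(pages, start=1):
            if page:  # empty pages produce nothing in this sketch
                out.append((_attrs(len(page), number), list(page)))
    elif mode == "PER_QUERY":
        combined = [hit for page in pages for hit in page]
        if combined:
            out.append((_attrs(len(combined), None), combined))
    else:
        raise ValueError(f"unknown Search Results Split value: {mode}")
    return out
```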

State Management

Scope: CLUSTER
The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp, trackingRangeValue) is retained in between invocations of this processor until the Scroll/PIT has expired (when the current time is later than the last query execution plus the Pagination Keep Alive interval).
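The expiry rule above (current time later than the last query execution plus the Pagination Keep Alive interval) can be expressed as a small check. This sketch assumes epoch-millisecond timestamps and a state map keyed by the documented `pageExpirationTimestamp` name with string values, since NiFi state maps store strings; both helper functions are hypothetical.

```python
from typing import Dict, Optional


def page_expiration_timestamp(last_query_ms: int, keep_alive_ms: int) -> int:
    """Expiry moment: last query execution plus the keep-alive interval."""
    return last_query_ms + keep_alive_ms


def should_start_new_query(state: Optional[Dict[str, str]], now_ms: int) -> bool:
    """True when there is no pagination state or it has expired."""
    if not state or "pageExpirationTimestamp" not in state:
        return True
    # Expired only when the current time is strictly later than the expiry.
    return now_ms > int(state["pageExpirationTimestamp"])
```

With the default keep-alive of 10 mins (600,000 ms), a query executed at t = 1,000 ms expires after t = 601,000 ms.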

Restricted

This component is not restricted.

Input Requirement

This component does not allow an incoming relationship.

System Resource Considerations

Scope: MEMORY
Care should be taken with the size of each page, because each response from Elasticsearch is loaded into memory all at once and converted into the resulting flowfiles.

See Also

PaginatedJsonQueryElasticsearch, SearchElasticsearch