
ConsumeKinesisStream

Description

Reads data from the specified AWS Kinesis stream and outputs either one FlowFile per processed Kinesis Record (raw) or, if a Record Reader and Record Writer are configured, one FlowFile per batch of processed records. The processor provides at-least-once delivery of all Kinesis Records within the stream while it is running. The AWS Kinesis Client Library can take several seconds to initialise before it starts fetching data. DynamoDB is used for checkpointing and, optionally, CloudWatch for metrics. Ensure that the credentials provided have access to Kinesis, DynamoDB and (if metrics are reported) CloudWatch.

Tags

amazon, aws, consume, kinesis, stream

Properties

In the list below, required properties are marked with an asterisk (*). All other properties are optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Amazon Kinesis Stream Name * | kinesis-stream-name | | | The name of the Kinesis stream.
Application Name * | amazon-kinesis-stream-application-name | | | The Kinesis stream reader application name.
Record Reader | amazon-kinesis-stream-record-reader | | Controller Service:
RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
The Record Reader to use for reading received messages. The Kinesis Stream name can be referred to by Expression Language '${kinesis.name}' to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
Record Writer | amazon-kinesis-stream-record-writer | | Controller Service:
RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
The Record Writer to use for serializing Records to an output FlowFile. The Kinesis Stream name can be referred to by Expression Language '${kinesis.name}' to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
Region * | Region | US West (Oregon)
  • AWS GovCloud (US-East)
  • AWS GovCloud (US-West)
  • Africa (Cape Town)
  • Asia Pacific (Hong Kong)
  • Asia Pacific (Hyderabad)
  • Asia Pacific (Jakarta)
  • Asia Pacific (Malaysia)
  • Asia Pacific (Melbourne)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Osaka)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Asia Pacific (Tokyo)
  • Canada (Central)
  • Canada West (Calgary)
  • China (Beijing)
  • China (Ningxia)
  • EU ISOE West
  • Europe (Frankfurt)
  • Europe (Ireland)
  • Europe (London)
  • Europe (Milan)
  • Europe (Paris)
  • Europe (Spain)
  • Europe (Stockholm)
  • Europe (Zurich)
  • Israel (Tel Aviv)
  • Middle East (Bahrain)
  • Middle East (UAE)
  • South America (Sao Paulo)
  • US East (N. Virginia)
  • US East (Ohio)
  • US ISO East
  • US ISO WEST
  • US ISOB East (Ohio)
  • US West (N. California)
  • US West (Oregon)
  • aws-cn-global
  • aws-global
  • aws-iso-b-global
  • aws-iso-global
  • aws-us-gov-global
Endpoint Override URL | Endpoint Override URL | | | Endpoint URL to use instead of the AWS default, including scheme, host, port, and path. The AWS libraries select an endpoint URL based on the AWS region, but this property overrides the selected endpoint URL, allowing use with other Kinesis-compatible endpoints.

Supports Expression Language, using Environment variables.
DynamoDB Override | amazon-kinesis-stream-dynamodb-override | | | DynamoDB endpoint override, for use with non-AWS deployments.

Supports Expression Language, using Environment variables.
Initial Stream Position * | amazon-kinesis-stream-initial-position | LATEST
  • LATEST
  • TRIM_HORIZON
  • AT_TIMESTAMP
Initial position from which to read the Kinesis stream.
Stream Position Timestamp | amazon-kinesis-stream-position-timestamp | | | Timestamp position in the stream from which to start reading Kinesis Records. Required if Initial Stream Position is AT_TIMESTAMP. Uses the Timestamp Format to parse the value into a Date.

This property is only considered if:
  • the property Initial Stream Position has a value of AT_TIMESTAMP
Timestamp Format * | amazon-kinesis-stream-timestamp-format | yyyy-MM-dd HH:mm:ss | | Format to use for parsing the Stream Position Timestamp into a Date and converting the Kinesis Record's Approximate Arrival Timestamp into a FlowFile attribute.

Supports Expression Language, using Environment variables.
Failover Timeout * | amazon-kinesis-stream-failover-timeout | 30 secs | | Kinesis Client Library failover timeout
Graceful Shutdown Timeout * | amazon-kinesis-stream-graceful-shutdown-timeout | 20 secs | | Kinesis Client Library graceful shutdown timeout
Checkpoint Interval * | amazon-kinesis-stream-checkpoint-interval | 3 secs | | Interval between Kinesis checkpoints
Retry Count * | amazon-kinesis-stream-retry-count | 10 | | Number of times to retry a Kinesis operation (process record, checkpoint, shutdown)
Retry Wait * | amazon-kinesis-stream-retry-wait | 1 sec | | Interval between Kinesis operation retries (process record, checkpoint, shutdown)
Report Metrics to CloudWatch * | amazon-kinesis-stream-cloudwatch-flag | false
  • true
  • false
Whether to report Kinesis usage metrics to CloudWatch.
Communications Timeout * | Communications Timeout | 30 secs
AWS Credentials Provider Service * | AWS Credentials Provider service | | Controller Service:
AWSCredentialsProviderService

Implementations:
AWSCredentialsProviderControllerService
The Controller Service that is used to obtain the AWS credentials provider.
Proxy Configuration Service | proxy-configuration-service | | Controller Service:
ProxyConfigurationService

Implementations:
StandardProxyConfigurationService
Specifies the Proxy Configuration Controller Service to proxy network requests. Supported proxies: HTTP + AuthN
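
The dependency between Initial Stream Position, Stream Position Timestamp and Timestamp Format can be illustrated with a small sketch. It is written in Python for brevity and is not the processor's implementation: the helper name `resolve_initial_position`, the dictionary of property values, and the hand-translated `strptime` equivalent of the default Java-style pattern `yyyy-MM-dd HH:mm:ss` are all assumptions made for illustration.

```python
from datetime import datetime

# Assumption: hand-translated strptime equivalent of the default
# Timestamp Format "yyyy-MM-dd HH:mm:ss" (a Java-style pattern).
DEFAULT_FORMAT = "%Y-%m-%d %H:%M:%S"

ALLOWED_POSITIONS = {"LATEST", "TRIM_HORIZON", "AT_TIMESTAMP"}


def resolve_initial_position(props):
    """Return (position, timestamp) for a dict of property values.

    Hypothetical helper: the timestamp is only considered when the
    Initial Stream Position is AT_TIMESTAMP, as described in the table.
    """
    position = props.get("Initial Stream Position", "LATEST")
    if position not in ALLOWED_POSITIONS:
        raise ValueError(f"Unsupported Initial Stream Position: {position}")

    if position != "AT_TIMESTAMP":
        # Stream Position Timestamp is ignored for LATEST and TRIM_HORIZON.
        return position, None

    raw = props.get("Stream Position Timestamp")
    if not raw:
        raise ValueError("Stream Position Timestamp is required for AT_TIMESTAMP")

    fmt = props.get("Timestamp Format", DEFAULT_FORMAT)
    # strptime raises ValueError if the value does not match the format.
    return position, datetime.strptime(raw, fmt)
```

With these assumptions, a timestamp such as `2024-01-31 12:00:00` is accepted only in combination with `AT_TIMESTAMP`; for `LATEST` and `TRIM_HORIZON` any timestamp value is simply ignored.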

Dynamic Properties

Name | Value | Description
Kinesis Client Library (KCL) Configuration property name | Value to set in the KCL Configuration property | Override default KCL Configuration ConfigsBuilder properties with required values. Supports setting of values directly on the ConfigsBuilder, such as 'namespace', as well as properties on nested builders. For example, to set configsBuilder.retrievalConfig().maxListShardsRetryAttempts(value), name the property as 'retrievalConfig.maxListShardsRetryAttempts'. Only supports setting of simple property values, e.g. String, int, long and boolean. Does not allow override of KCL Configuration settings handled by non-dynamic processor properties.

Supports Expression Language: No
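
The naming rule for dynamic properties (a bare setter name, or a nested builder name followed by a setter name) can be sketched as follows. This is a Python illustration of the dotted-name lookup only: `ConfigsBuilder` and `RetrievalConfig` below are simplified stand-ins that borrow the names used in the description, not the real KCL classes, and `apply_dynamic_property` is a hypothetical helper rather than the processor's code.

```python
class RetrievalConfig:
    """Stand-in for a nested KCL builder; not the real KCL class."""

    def __init__(self):
        self.values = {}

    def maxListShardsRetryAttempts(self, value):
        self.values["maxListShardsRetryAttempts"] = value
        return self


class ConfigsBuilder:
    """Stand-in for the top-level builder; not the real KCL class."""

    def __init__(self):
        self.values = {}
        self._retrieval = RetrievalConfig()

    def namespace(self, value):
        self.values["namespace"] = value
        return self

    def retrievalConfig(self):
        return self._retrieval


# Only simple values are accepted (Python's int also covers Java's long).
SIMPLE_TYPES = (str, int, bool)


def apply_dynamic_property(builder, name, value):
    """Apply 'setter' or 'nestedConfig.setter' to the given builder."""
    if not isinstance(value, SIMPLE_TYPES):
        raise TypeError("only simple property values are supported")
    *path, setter = name.split(".")
    target = builder
    for accessor in path:
        # Each leading segment names a no-argument accessor,
        # e.g. retrievalConfig() on the top-level builder.
        target = getattr(target, accessor)()
    getattr(target, setter)(value)
```

Under these assumptions, the property name `namespace` calls the setter on the top-level builder, while `retrievalConfig.maxListShardsRetryAttempts` first resolves the nested builder and then calls its setter.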

Relationships

Name | Description
success | FlowFiles are routed to success relationship

Reads Attributes

This processor does not read attributes.

Writes Attributes

Name | Description
aws.kinesis.approximate.arrival.timestamp | Approximate arrival timestamp of the (last) Kinesis Record read from the stream
aws.kinesis.partition.key | Partition key of the (last) Kinesis Record read from the Shard
aws.kinesis.sequence.number | The unique identifier of the (last) Kinesis Record within its Shard
aws.kinesis.shard.id | Shard ID from which the Kinesis Record was read
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured)
record.count | Number of records written to the FlowFiles by the Record Writer (if configured)
record.error.message | On failure, the error message encountered by the Record Reader or Record Writer (if configured)
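
Because delivery is at-least-once, a downstream consumer may receive the same Kinesis Record more than once. For FlowFiles produced without a Record Reader/Writer (one FlowFile per Kinesis Record), the shard ID and sequence number attributes together identify a record, so they can serve as a de-duplication key. The following Python sketch shows the idea on plain attribute dictionaries; `dedupe_key` and `drop_duplicates` are hypothetical helpers, and the in-memory set is an assumption that would not survive a restart.

```python
def dedupe_key(attributes):
    """Build a key from the shard ID and the sequence number within that shard."""
    return (
        attributes["aws.kinesis.shard.id"],
        attributes["aws.kinesis.sequence.number"],
    )


def drop_duplicates(flowfile_attributes):
    """Yield only the attribute dicts whose key has not been seen before."""
    # In-memory only; a real flow would need bounded or persistent storage.
    seen = set()
    for attributes in flowfile_attributes:
        key = dedupe_key(attributes)
        if key in seen:
            continue
        seen.add(key)
        yield attributes
```

The sequence number alone is not used as the key because it is only described as unique within its Shard.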

State Management

This component does not store state.

Restricted

This component is not restricted.

Input Requirement

This component does not allow an incoming relationship.

System Resource Considerations

Scope | Description
CPU | Kinesis Client Library is used to create a Worker thread for consumption of Kinesis Records. The Worker is initialised and started when this Processor has been triggered. It runs continually, spawning Kinesis Record Processors as required to fetch Kinesis Records. The Worker Thread (and any child Record Processor threads) are not controlled by the normal NiFi scheduler as part of the Concurrent Thread pool and are not released until this processor is stopped.
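
The lifecycle described above (a worker started on the first trigger, running continually outside the scheduler, and released only on stop) can be sketched with a plain background thread. This Python example is a conceptual illustration only: `LazyWorker`, `on_trigger`, `stop` and the polling loop are hypothetical names and do not reflect the NiFi or KCL APIs.

```python
import threading


class LazyWorker:
    """Hypothetical stand-in for a worker that outlives individual triggers."""

    def __init__(self, poll_interval=0.01):
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        self._thread = None
        self._poll_interval = poll_interval
        self.polls = 0

    def _run(self):
        # Runs continually until stop() is called, independent of triggers.
        while not self._stop_event.is_set():
            self.polls += 1  # placeholder for fetching records
            self._stop_event.wait(self._poll_interval)

    def on_trigger(self):
        """Start the worker on the first trigger; later triggers are no-ops."""
        with self._lock:
            if self._thread is None:
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()

    def stop(self):
        """Signal the worker to finish and wait for its thread to exit."""
        with self._lock:
            if self._thread is not None:
                self._stop_event.set()
                self._thread.join()
                self._thread = None
                self._stop_event.clear()

    def is_running(self):
        thread = self._thread
        return thread is not None and thread.is_alive()
```

The point of the sketch is that the thread is held for the whole time between the first trigger and `stop()`, which is why it counts against CPU resources even when no trigger is in progress.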

See Also

PutKinesisStream