
ConsumeAzureEventHub

Description

Receives messages from Microsoft Azure Event Hubs with checkpointing to ensure consistent event processing. Checkpoint tracking avoids consuming a message multiple times and lets processing resume reliably after intermittent network failures. Checkpoint tracking requires external storage and is the preferred approach to consuming messages from Azure Event Hubs. In a clustered environment, ConsumeAzureEventHub processor instances form a consumer group, and messages are distributed among the cluster nodes so that each message is processed on only one node.
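
One way to picture "each message is processed on only one node" is that every partition has exactly one owner within the consumer group at a time. The sketch below uses a plain round-robin assignment over a fixed node list. This is an assumption for illustration only: the processor relies on the Azure Event Hubs client library, which claims and rebalances partition ownership through the checkpoint store. The assign_partitions helper and the node names are hypothetical.

```python
from collections import defaultdict


def assign_partitions(partition_ids, node_ids):
    """Give every partition exactly one owner node (simplified round-robin)."""
    if not node_ids:
        raise ValueError("at least one node is required")
    ownership = defaultdict(list)
    # Event Hubs partition IDs are numeric strings, so sort them numerically.
    for index, partition_id in enumerate(sorted(partition_ids, key=int)):
        owner = node_ids[index % len(node_ids)]
        ownership[owner].append(partition_id)
    return dict(ownership)


# Four partitions shared by a three-node NiFi cluster (node names are illustrative).
assignment = assign_partitions(["0", "1", "2", "3"], ["node-a", "node-b", "node-c"])
print(assignment)  # {'node-a': ['0', '3'], 'node-b': ['1'], 'node-c': ['2']}
```

Because no partition appears under two nodes, no message is delivered to two nodes at the same time.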

Tags

azure, cloud, eventhub, events, microsoft, streaming, streams

Properties

In the list below, required properties are marked with an asterisk (*); all other properties are optional. Each entry also shows the API name, any default value and allowable values, and whether the property supports the NiFi Expression Language.

Event Hub Namespace * (API name: event-hub-namespace)
The namespace that the Azure event hub is assigned to. This is generally equal to <Event Hub Name>-ns.

Supports Expression Language, using Environment variables.
Event Hub Name * (API name: event-hub-name)
The name of the event hub to pull messages from.

Supports Expression Language, using Environment variables.
Service Bus Endpoint * (API name: Service Bus Endpoint)
Default value: Azure
Allowable values:
  • Azure
  • Azure China
  • Azure Germany
  • Azure US Government
The Azure cloud environment that hosts the namespace. Select a value other than Azure to support namespaces outside the default windows.net domain.
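
The Service Bus Endpoint value determines the DNS suffix appended to the Event Hub Namespace. The sketch below assumes the standard public Event Hubs DNS suffix of each Azure cloud (for example, .servicebus.windows.net for Azure). The SERVICE_BUS_ENDPOINTS mapping and the fully_qualified_namespace helper are illustrative, not the processor's own code.

```python
# Assumed mapping from Service Bus Endpoint display values to DNS suffixes.
SERVICE_BUS_ENDPOINTS = {
    "Azure": ".servicebus.windows.net",
    "Azure China": ".servicebus.chinacloudapi.cn",
    "Azure Germany": ".servicebus.cloudapi.de",
    "Azure US Government": ".servicebus.usgovcloudapi.net",
}


def fully_qualified_namespace(namespace: str, endpoint: str = "Azure") -> str:
    """Combine the Event Hub Namespace property with the selected endpoint suffix."""
    try:
        suffix = SERVICE_BUS_ENDPOINTS[endpoint]
    except KeyError:
        raise ValueError(f"unsupported Service Bus Endpoint: {endpoint!r}") from None
    return namespace + suffix


print(fully_qualified_namespace("orders-ns"))  # orders-ns.servicebus.windows.net
print(fully_qualified_namespace("orders-ns", "Azure China"))  # orders-ns.servicebus.chinacloudapi.cn
```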
Transport Type * (API name: Transport Type)
Default value: AMQP
Allowable values:
  • AMQP
  • AMQP_WEB_SOCKETS
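The transport type used for Advanced Message Queuing Protocol (AMQP) communication with Azure Event Hubs. AMQP_WEB_SOCKETS runs AMQP over WebSockets; the Proxy Configuration Service is only considered with this transport type.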
Shared Access Policy Name (API name: event-hub-shared-access-policy-name)
The name of the shared access policy. This policy must have Listen claims.

Supports Expression Language, using Environment variables.
Shared Access Policy Key (API name: event-hub-shared-access-policy-primary-key)
The key of the shared access policy. Either the primary or the secondary key can be used.
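
The shared access policy name and key are the same values that appear in an Event Hubs connection string. That can be handy when checking the credentials with another client. The sketch below assembles such a string from the processor properties. The build_connection_string helper, the namespace, hub, and policy names are hypothetical, and the key is a placeholder.

```python
def build_connection_string(fq_namespace, event_hub, policy_name, policy_key):
    """Assemble an Event Hubs connection string from shared access policy values."""
    if not policy_name or not policy_key:
        raise ValueError("both the policy name and the policy key are required")
    return (
        f"Endpoint=sb://{fq_namespace}/;"
        f"SharedAccessKeyName={policy_name};"
        f"SharedAccessKey={policy_key};"
        f"EntityPath={event_hub}"
    )


conn = build_connection_string(
    "orders-ns.servicebus.windows.net", "orders", "nifi-listen", "<policy-key>"
)
print(conn)
```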
Use Azure Managed Identity * (API name: use-managed-identity)
Default value: false
Allowable values:
  • true
  • false
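Whether to authenticate with the managed identity of the Azure virtual machine or virtual machine scale set (VM/VMSS) running NiFi, as an alternative to a shared access policy.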
Consumer Group * (API name: event-hub-consumer-group)
Default value: $Default
The name of the consumer group to use.

Supports Expression Language, using Environment variables.
Record Reader (API name: record-reader)
Controller Service: RecordReaderFactory

Implementations:
AvroReader
CEFReader
CSVReader
ExcelReader
GrokReader
JsonPathReader
JsonTreeReader
ReaderLookup
ScriptedReader
Syslog5424Reader
SyslogReader
WindowsEventLogReader
XMLReader
YamlTreeReader
The Record Reader to use for reading received messages. The event hub name can be referenced with the Expression Language '${eventhub.name}' to access a schema.
Record Writer (API name: record-writer)
Controller Service: RecordSetWriterFactory

Implementations:
AvroRecordSetWriter
CSVRecordSetWriter
FreeFormTextRecordSetWriter
JsonRecordSetWriter
RecordSetWriterLookup
ScriptedRecordSetWriter
XMLRecordSetWriter
The Record Writer to use for serializing records to an output FlowFile. The event hub name can be referenced with the Expression Language '${eventhub.name}' to access a schema. If not specified, each message creates a separate FlowFile.
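
The effect of leaving the Record Writer unset can be modeled as a difference in how many FlowFiles one batch of messages produces. The sketch below assumes that, with a Record Writer, all records from one batch on one partition are written to a single FlowFile. That grouping, the FlowFile class, and the to_flowfiles helper are simplifications for illustration, not NiFi's API.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    content: list  # messages or records carried by this FlowFile
    attributes: dict = field(default_factory=dict)


def to_flowfiles(messages, partition_id, use_record_writer):
    """Simplified model of how one batch of received messages becomes FlowFiles."""
    attributes = {"eventhub.partition": partition_id}
    if not use_record_writer:
        # No Record Writer configured: one FlowFile per message.
        return [FlowFile([message], dict(attributes)) for message in messages]
    # Record Writer configured (assumed behavior): the whole batch goes into one FlowFile.
    return [FlowFile(list(messages), dict(attributes))] if messages else []


batch = ['{"id": 1}', '{"id": 2}', '{"id": 3}']
per_message = to_flowfiles(batch, "0", use_record_writer=False)
per_batch = to_flowfiles(batch, "0", use_record_writer=True)
print(len(per_message), len(per_batch))  # 3 1
```

Fewer, larger FlowFiles generally reduce per-FlowFile overhead in the flow, which is the usual reason to configure a Record Writer.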
Initial Offset * (API name: event-hub-initial-offset)
Default value: End of stream
Allowable values:
  • Start of stream
  • End of stream
Specifies where to start receiving messages when no offset is stored in the checkpoint store yet.
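
The rule above can be modeled as a lookup that consults the checkpoint store first and falls back to Initial Offset only for partitions with no stored position. In this sketch the checkpoint store is a plain dictionary of hypothetical partition-to-sequence-number entries. The starting_position helper and its returned tuples are illustrative labels, not Azure SDK types.

```python
def starting_position(checkpoints, partition_id, initial_offset="End of stream"):
    """Decide where a partition is read from; a stored checkpoint always wins."""
    if partition_id in checkpoints:
        # Resume from the stored checkpoint; Initial Offset is ignored.
        return ("checkpoint", checkpoints[partition_id])
    if initial_offset == "Start of stream":
        return ("earliest", None)
    if initial_offset == "End of stream":
        return ("latest", None)
    raise ValueError(f"unknown Initial Offset: {initial_offset!r}")


checkpoints = {"0": 1042}  # hypothetical sequence number stored for partition 0
print(starting_position(checkpoints, "0", "Start of stream"))  # ('checkpoint', 1042)
print(starting_position(checkpoints, "1", "Start of stream"))  # ('earliest', None)
print(starting_position(checkpoints, "1"))  # ('latest', None)
```

Changing Initial Offset therefore has no effect on partitions that already have a checkpoint.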
Prefetch Count * (API name: event-hub-prefetch-count)
Default value: 300

Supports Expression Language, using Environment variables.
Batch Size * (API name: event-hub-batch-size)
Default value: 10
The number of messages to process within a NiFi session. This parameter affects both throughput and consistency: NiFi commits its session and creates an Event Hubs checkpoint after processing this number of messages. If the NiFi session is committed but the Event Hubs checkpoint cannot be created, the same messages may be received again. A higher value increases throughput, but more messages may be received again when a checkpoint fails.

Supports Expression Language, using Environment variables.
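
The trade-off above can be simulated for a single partition: each batch is committed to NiFi first and checkpointed second, and a failed checkpoint makes reading resume from the last stored position. The consume helper and its failing_attempts fault injection are hypothetical, and the model ignores everything except the commit-then-checkpoint order.

```python
def consume(events, batch_size, failing_attempts=frozenset()):
    """Simulate commit-then-checkpoint processing of a single partition.

    failing_attempts holds the (0-based) batch attempts whose checkpoint
    write fails; this fault injection is purely hypothetical.
    Returns the messages delivered downstream and the final checkpoint position.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    delivered = []  # messages in committed NiFi sessions
    checkpoint = 0  # index of the next message to read after a restart
    position = 0
    attempt = 0
    while position < len(events):
        batch = events[position:position + batch_size]
        delivered.extend(batch)  # 1. the NiFi session is committed
        if attempt in failing_attempts:
            # 2a. Checkpoint failed: reading resumes from the last stored
            #     checkpoint, so this batch is received again.
            position = checkpoint
        else:
            # 2b. Checkpoint stored: the batch will not be read again.
            checkpoint = position + len(batch)
            position = checkpoint
        attempt += 1
    return delivered, checkpoint


# Each failed checkpoint re-delivers one batch, so larger batches produce more duplicates.
for size in (2, 4):
    delivered, _ = consume(list(range(5)), size, failing_attempts={0})
    print(size, len(delivered) - 5)  # prints "2 2", then "4 4"
```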
Message Receive Timeout * (API name: event-hub-message-receive-timeout)
Default value: 1 min
The maximum amount of time this consumer waits to receive Batch Size messages before returning.

Supports Expression Language, using Environment variables.
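
Batch Size and Message Receive Timeout together bound each receive call: it returns as soon as a full batch is available, or with a partial (possibly empty) batch once the timeout expires. The sketch below models this with an in-memory queue standing in for the event hub. The receive_batch helper is hypothetical; the real processor receives events through the Azure client library.

```python
import queue
import time


def receive_batch(source: queue.Queue, batch_size: int, timeout_seconds: float) -> list:
    """Collect up to batch_size messages, returning early when the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    batch = []
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # timeout reached with a partial (possibly empty) batch
    return batch


q = queue.Queue()
for i in range(3):
    q.put(f"msg-{i}")
# Only 3 of the requested 10 messages arrive, so the call returns after the timeout.
print(receive_batch(q, batch_size=10, timeout_seconds=0.1))  # ['msg-0', 'msg-1', 'msg-2']
```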
Checkpoint Strategy * (API name: checkpoint-strategy)
Default value: Azure Blob Storage
Allowable values:
  • Azure Blob Storage
  • Component State
Specifies which strategy to use for storing and retrieving partition ownership and checkpoint information for each partition.
Storage Account Name * (API name: storage-account-name)
The name of the Azure Storage account used to store the event hub consumer group state.

Supports Expression Language, using Environment variables.

This property is only considered if:
  • the property Checkpoint Strategy has a value of AZURE_BLOB_STORAGE
Storage Account Key (API name: storage-account-key)
The Azure Storage account key used to store the event hub consumer group state.

Supports Expression Language, using Environment variables.

This property is only considered if:
  • the property Checkpoint Strategy has a value of AZURE_BLOB_STORAGE
Storage SAS Token (API name: storage-sas-token)
The Azure Storage SAS token used to store the event hub consumer group state. The token always starts with a ? character.

Supports Expression Language, using Environment variables.

This property is only considered if:
  • the property Checkpoint Strategy has a value of AZURE_BLOB_STORAGE
Storage Container Name (API name: storage-container-name)
The name of the Azure Storage container used to store the event hub consumer group state. If not specified, the event hub name is used.

Supports Expression Language, using Environment variables.

This property is only considered if:
  • the property Checkpoint Strategy has a value of AZURE_BLOB_STORAGE
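
The storage properties above can be checked together before the processor starts. The sketch applies the documented rules: storage properties only matter for the Azure Blob Storage strategy, the SAS token starts with ?, and the container name defaults to the event hub name. It also assumes that exactly one of Storage Account Key and Storage SAS Token must be set. That rule, the blob_checkpoint_settings helper, and the returned dictionary are assumptions for illustration, not the processor's validation code.

```python
def blob_checkpoint_settings(props: dict) -> dict:
    """Resolve Azure Blob Storage checkpoint settings from property values."""
    if props.get("Checkpoint Strategy", "Azure Blob Storage") != "Azure Blob Storage":
        return {}  # storage properties are not considered for Component State
    account = props.get("Storage Account Name")
    if not account:
        raise ValueError("Storage Account Name is required")
    key = props.get("Storage Account Key")
    sas = props.get("Storage SAS Token")
    # Assumption: exactly one credential type must be configured.
    if bool(key) == bool(sas):
        raise ValueError("set exactly one of Storage Account Key or Storage SAS Token")
    if sas and not sas.startswith("?"):
        raise ValueError("Storage SAS Token must start with '?'")
    return {
        "account": account,
        # Documented default: fall back to the event hub name.
        "container": props.get("Storage Container Name") or props["Event Hub Name"],
        "credential": "key" if key else "sas",
    }


settings = blob_checkpoint_settings({
    "Event Hub Name": "orders",
    "Storage Account Name": "nifistate",
    "Storage SAS Token": "?sv=<placeholder>",
})
print(settings)  # {'account': 'nifistate', 'container': 'orders', 'credential': 'sas'}
```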
Proxy Configuration Service (API name: proxy-configuration-service)
Controller Service: ProxyConfigurationService

Implementations:
StandardProxyConfigurationService
Specifies the Proxy Configuration Controller Service used to proxy network requests. Supported proxies: HTTP with authentication (HTTP + AuthN).

This property is only considered if:
  • the property Transport Type has a value of AMQP_WEB_SOCKETS

Dynamic Properties

This component does not support dynamic properties.

Relationships

success: FlowFiles received from Event Hub.

Reads Attributes

This processor does not read attributes.

Writes Attributes

eventhub.enqueued.timestamp: The time (in milliseconds since the epoch, UTC) at which the message was enqueued in the event hub
eventhub.name: The name of the event hub from which the message was pulled
eventhub.offset: The offset into the partition at which the message was stored
eventhub.partition: The name of the partition from which the message was pulled
eventhub.property.*: The application properties of the message. For example, an application property named 'application' is written to the attribute 'eventhub.property.application'.
eventhub.sequence: The sequence number associated with the message
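
The attribute list above can be read as a mapping from event metadata to FlowFile attributes. Because NiFi attribute values are strings, the sketch converts numbers with str(). The input dictionary layout (enqueued_time_ms, offset, sequence_number, properties) and the flowfile_attributes helper are hypothetical stand-ins for an Event Hubs event, not the Azure SDK's event type.

```python
def flowfile_attributes(event_hub_name, partition_id, event):
    """Map event metadata to the eventhub.* attribute names (event fields are hypothetical)."""
    attributes = {
        "eventhub.enqueued.timestamp": str(event["enqueued_time_ms"]),
        "eventhub.name": event_hub_name,
        "eventhub.offset": str(event["offset"]),
        "eventhub.partition": partition_id,
        "eventhub.sequence": str(event["sequence_number"]),
    }
    # Each application property becomes its own eventhub.property.<name> attribute.
    for name, value in event.get("properties", {}).items():
        attributes[f"eventhub.property.{name}"] = str(value)
    return attributes


attrs = flowfile_attributes("orders", "3", {
    "enqueued_time_ms": 1700000000000,
    "offset": 4096,
    "sequence_number": 57,
    "properties": {"application": "checkout"},
})
print(attrs["eventhub.property.application"])  # checkout
```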

State Management

Scope: CLUSTER, LOCAL
Local state is used to store the client ID. Cluster state is used to store partition ownership and checkpoint information when Component State is configured as the Checkpoint Strategy.

Restricted

This component is not restricted.

Input Requirement

This component does not allow an incoming relationship.

System Resource Considerations

This component does not specify system resource considerations.
