
Amazon MSK

Cribl Stream supports receiving data records from an Amazon Managed Streaming for Apache Kafka (MSK) cluster. This Source automatically detects compressed data in Gzip, Snappy, or LZ4 format.

Type: Pull | TLS Support: Yes | Event Breaker Support: No

Kafka uses a binary protocol over TCP. It does not support HTTP proxies, so Cribl Stream must connect directly to the Kafka brokers. You might need to adjust your firewall rules to allow this traffic.

Configuring Cribl Stream to Receive Data from Kafka Topics

From the top nav, click Manage, then select a Worker Group to configure. Next, you have two options:

In the QuickConnect UI: Click Add Source. From the resulting drawer’s tiles, select [Pull > ] Amazon MSK. Next, click either Add New or (if displayed) Select Existing. The resulting drawer will provide the options below.

Or, to configure via the Routing UI, click Data > Sources. From the resulting page’s tiles or left nav, select [Pull > ] Amazon MSK. Next, click New Source to open a New Source modal that provides the options below.

General Settings

Input ID: Enter a unique name to identify this Source definition.

Brokers: List of Kafka brokers to connect to. Specify each broker’s hostname and port (e.g., mykafkabroker:9092), or just the hostname, in which case Cribl Stream will assign port 9092.
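To make the hostname-only rule concrete, here is a minimal Python sketch of how a broker entry could be split into host and port. It is an illustration, not Cribl Stream’s own parser: the function name normalize_broker is hypothetical, and IPv6 literals are deliberately not handled.

```python
from typing import Tuple

# Port assigned when an entry contains only a hostname (per the Brokers setting).
DEFAULT_KAFKA_PORT = 9092


def normalize_broker(entry: str) -> Tuple[str, int]:
    """Hypothetical helper: split a 'host[:port]' broker entry into (host, port)."""
    entry = entry.strip()
    if not entry:
        raise ValueError("empty broker entry")
    host, sep, port = entry.rpartition(":")
    if not sep:
        # No colon: the whole entry is the hostname, so fall back to the default port.
        return entry, DEFAULT_KAFKA_PORT
    if not host or not port.isdigit():
        raise ValueError(f"invalid broker entry: {entry!r}")
    return host, int(port)


print([normalize_broker(b) for b in ["mykafkabroker:9092", "mykafkabroker"]])
# → [('mykafkabroker', 9092), ('mykafkabroker', 9092)]
```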

Topics: Enter the name(s) of topics to subscribe to. Press Enter/Return between multiple entries.

To optimize performance and prevent excessive rebalancing, Cribl recommends subscribing each Kafka Source to only one topic. To subscribe to multiple topics, consider creating a dedicated Kafka Source for each one.

For the same reason, the Group ID (below) should be unique for each of your Kafka, Confluent Cloud, Amazon MSK, and Azure Event Hubs Sources. For details, see Controlling Rebalancing.

Region: Select the name of the AWS Region where your Amazon MSK cluster is located.

Optional Settings

Group ID: The name of the consumer group to which this Cribl Stream instance belongs.

From beginning: Toggle this off if you want Cribl Stream, upon subscribing to a topic for the first time, to read new messages only. Otherwise, Cribl Stream will read all messages available on the server, starting from the oldest.
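The following Python sketch shows how the From beginning toggle can be read in terms of partition offsets. It assumes standard Kafka consumer-group semantics: once the group has committed an offset for a partition, consumption resumes there, and the toggle only matters when no committed offset exists yet. The function and parameter names are hypothetical.

```python
from typing import Optional


def starting_offset(committed: Optional[int], earliest: int, latest: int,
                    from_beginning: bool = True) -> int:
    """Hypothetical helper: pick the offset a consumer starts reading a partition from."""
    if committed is not None:
        # The consumer group already has progress for this partition: resume there.
        return committed
    # First subscription: start at the oldest retained message, or only read new ones.
    return earliest if from_beginning else latest


print(starting_offset(None, earliest=0, latest=100))                        # → 0
print(starting_offset(None, earliest=0, latest=100, from_beginning=False))  # → 100
print(starting_offset(42, earliest=0, latest=100, from_beginning=False))    # → 42
```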

Tags: Optionally, add tags that you can use for filtering and grouping in Cribl Stream. Use a tab or hard return between (arbitrary) tag names.

TLS Settings (Client Side)

For Amazon MSK Sources and Destinations:

  • IAM is the only type of authentication that Cribl Stream supports.
  • Because IAM auth requires TLS, TLS is automatically enabled.

Validate server certs: Toggle on to reject server certificates that are not authorized by a CA in the CA certificate path, nor by another trusted CA (e.g., the system’s CA). Defaults to No.

Server name (SNI): Server name for the SNI (Server Name Indication) TLS extension. This must be a host name, not an IP address.

Certificate name: The name of the predefined certificate.

CA certificate path: Path on client containing CA certificates (in PEM format) to use to verify the server’s cert. Path can reference $ENV_VARS.

Private key path (mutual auth): Path on client containing the private key (in PEM format) to use. Path can reference $ENV_VARS. Use only if mutual auth is required.

Certificate path (mutual auth): Path on client containing certificates (in PEM format) to use. Path can reference $ENV_VARS. Use only if mutual auth is required.

Passphrase: Passphrase to use to decrypt the private key.

Minimum TLS version: Optionally, select the minimum TLS version to accept from connections.

Maximum TLS version: Optionally, select the maximum TLS version to accept from connections.
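As a rough analogy for what these client-side settings control, the sketch below maps them onto Python’s standard ssl module. This is not how Cribl Stream builds its TLS connections; the function names are hypothetical, and the mapping (certificate-validation toggle, CA path, mutual-auth cert and key, passphrase, TLS version bounds, SNI host-name rule) is an assumption made for illustration.

```python
import ipaddress
import ssl
from typing import Optional


def build_client_tls_context(
    validate_server_certs: bool = False,
    ca_path: Optional[str] = None,
    cert_path: Optional[str] = None,
    key_path: Optional[str] = None,
    passphrase: Optional[str] = None,
    min_version: Optional[ssl.TLSVersion] = None,
    max_version: Optional[ssl.TLSVersion] = None,
) -> ssl.SSLContext:
    """Hypothetical helper: build a client TLS context from settings like those above."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if validate_server_certs:
        # Trust the system's CAs plus any CAs in the configured PEM file.
        ctx.load_default_certs()
        if ca_path:
            ctx.load_verify_locations(cafile=ca_path)
    else:
        # Accept any server certificate; hostname checking must be disabled first.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    if cert_path and key_path:
        # Mutual auth: present a client cert and its key, decrypted with the passphrase.
        ctx.load_cert_chain(certfile=cert_path, keyfile=key_path, password=passphrase)
    if min_version is not None:
        ctx.minimum_version = min_version
    if max_version is not None:
        ctx.maximum_version = max_version
    return ctx


def check_sni(server_name: str) -> str:
    """SNI must be a host name, so reject IP address literals."""
    try:
        ipaddress.ip_address(server_name)
    except ValueError:
        return server_name  # not an IP address, so usable as an SNI name
    raise ValueError(f"SNI requires a host name, not an IP address: {server_name}")
```

In Python, the SNI name is passed as server_hostname to ctx.wrap_socket(); when validation is on, the certificate is also checked against that name.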

Authentication

Use the Authentication Method buttons to select an AWS authentication method.

Auto: This default option uses the AWS instance’s metadata service to automatically obtain short-lived credentials from the IAM role attached to an EC2 instance, local credentials, sidecar, or other source. The attached IAM role grants Cribl Stream Workers access to authorized AWS resources. It can also use the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. This option works only when running on AWS.

Manual: If not running on AWS, you can select this option to enter a static set of user-associated IAM credentials (your access key and secret key) directly or by reference. This is useful for Workers not in an AWS VPC, e.g., those running a private cloud. The Manual option exposes these corresponding additional fields:

  • Access key: Enter your AWS access key. If not present, will fall back to the env.AWS_ACCESS_KEY_ID environment variable, or to the metadata endpoint for IAM role credentials.
  • Secret key: Enter your AWS secret key. If not present, will fall back to the env.AWS_SECRET_ACCESS_KEY environment variable, or to the metadata endpoint for IAM credentials.
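The per-field fallback order for the Manual fields can be sketched in Python as follows. This is a minimal illustration, assuming each field is resolved independently as described; the function names are hypothetical, and the final metadata-endpoint lookup is represented only by a label, not implemented.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_field(value: Optional[str], env_var: str,
                  env: Mapping[str, str]) -> Tuple[Optional[str], str]:
    """Hypothetical helper: return (value, where_it_came_from) in fallback order."""
    if value:
        return value, "field"        # entered directly in the Source config
    if env.get(env_var):
        return env[env_var], "env"   # e.g., AWS_ACCESS_KEY_ID
    return None, "metadata"          # defer to the metadata endpoint (not shown)


def resolve_manual_credentials(access_key: Optional[str], secret_key: Optional[str],
                               env: Mapping[str, str] = os.environ) -> dict:
    """Resolve the access key and secret key independently of each other."""
    return {
        "access_key": resolve_field(access_key, "AWS_ACCESS_KEY_ID", env),
        "secret_key": resolve_field(secret_key, "AWS_SECRET_ACCESS_KEY", env),
    }
```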

Secret: If not running on AWS, you can select this option to supply a stored secret that references an AWS access key and secret key. The Secret option exposes this additional field:

  • Secret key pair: Use the drop-down to select an API key/secret key pair that you’ve configured in Cribl Stream’s secrets manager. A Create link is available to store a new, reusable secret.

Assume Role

Enable for MSK: Toggle on to use Assume Role credentials to access MSK.

AssumeRole ARN: Enter the Amazon Resource Name (ARN) of the role to assume.

External ID: Enter the External ID to use when assuming the role. This is required only when assuming a role that requires this ID in order to delegate third-party access. For details, see AWS’ documentation.
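For orientation, the AssumeRole ARN and External ID correspond to the RoleArn and ExternalId parameters of the AWS STS AssumeRole API. The Python sketch below only builds those request parameters; the function name and the default session name are hypothetical, and it makes no claim about how Cribl Stream issues the call.

```python
from typing import Any, Dict, Optional


def assume_role_request(role_arn: str, external_id: Optional[str] = None,
                        session_name: str = "cribl-msk-source") -> Dict[str, Any]:
    """Hypothetical helper: build keyword arguments for an STS AssumeRole call."""
    if not role_arn.startswith("arn:"):
        raise ValueError("AssumeRole ARN must be a full Amazon Resource Name")
    # RoleSessionName is required by AssumeRole; the default here is an arbitrary example.
    params: Dict[str, Any] = {"RoleArn": role_arn, "RoleSessionName": session_name}
    if external_id:
        # Include ExternalId only when the role's trust policy requires it.
        params["ExternalId"] = external_id
    return params
```

With boto3 installed, these parameters could be passed as boto3.client("sts").assume_role(**assume_role_request(arn, external_id)).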

Processing Settings

Fields

In this section, you can add Fields to each event using Eval-like functionality.

Name: Field name.

Value: JavaScript expression to compute field’s value, enclosed in quotes or backticks. (Can evaluate to a constant.)

Pre-Processing

In this section’s Pipeline drop-down list, you can select a single existing Pipeline to process data from this input before the data is sent through the Routes.

Advanced Settings

Use these settings to fine-tune Cribl Stream’s integration with Kafka topics. If you are unfamiliar with these parameters, contact Cribl Support to understand the implications of changing the defaults.

Heartbeat interval (ms): Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Value must be lower than the Session timeout, and typically should not exceed 1/3 of the Session timeout value. Defaults to 3000 ms (3 seconds). For details, see the Kafka documentation.

Session timeout (ms): Timeout used to detect client failures when using Kafka’s group management facilities. If the client sends the broker no heartbeats before this timeout expires, the broker will remove this client from the group, and will initiate a rebalance. Value must be between the broker’s configured group.min.session.timeout.ms and group.max.session.timeout.ms. Defaults to 30000 ms (30 seconds). For details, see the Kafka documentation.
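The constraints on Heartbeat interval and Session timeout can be expressed as a small Python check. This is a sketch, not a validator that Cribl Stream runs; the function name is hypothetical, and the broker limits are parameters you would take from your own cluster’s group.min.session.timeout.ms and group.max.session.timeout.ms values.

```python
from typing import List


def check_group_timeouts(heartbeat_ms: int, session_ms: int,
                         broker_min_session_ms: int, broker_max_session_ms: int) -> List[str]:
    """Hypothetical helper: list problems with a heartbeat/session timeout pair."""
    problems: List[str] = []
    if heartbeat_ms >= session_ms:
        problems.append("Heartbeat interval must be lower than Session timeout")
    elif heartbeat_ms * 3 > session_ms:
        problems.append("Heartbeat interval should typically not exceed 1/3 of Session timeout")
    if not broker_min_session_ms <= session_ms <= broker_max_session_ms:
        problems.append("Session timeout is outside the broker's allowed range")
    return problems


# Documented defaults (3000 ms and 30000 ms) checked against example broker limits.
print(check_group_timeouts(3000, 30000, 6000, 1800000))  # → []
```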

Rebalance timeout (ms): Maximum allowed time for each Worker to join the Group after a rebalance has begun. If the timeout is exceeded, the coordinator broker will remove the Worker from the Group. Defaults to 60000 ms (1 minute). For details, see the Kafka documentation.

Connection timeout (ms): Maximum time to wait for a connection to complete successfully. Defaults to 10000 ms (10 seconds). Valid range is 1000 to 3600000 ms (1 second to 1 hour). For details, see the Kafka documentation.

Request timeout (ms): Maximum time to wait for Kafka to respond to a request. Defaults to 60000 ms (1 minute). For details, see the Kafka documentation.

Max retries: Maximum number of times to retry a failed request before the message fails. Defaults to 5; enter 0 to not retry at all.
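Because Max retries counts retries rather than total attempts, a setting of 5 allows up to six attempts in total, and 0 allows exactly one. The Python sketch below illustrates that counting only. It is hypothetical: it retries on any exception and does not wait between attempts, whereas a real Kafka client typically retries only retriable errors and applies a backoff.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def send_with_retries(request: Callable[[], T], max_retries: int = 5) -> T:
    """Hypothetical helper: try once, then retry up to max_retries more times."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    attempts = 1 + max_retries  # the initial try plus each retry
    for attempt in range(attempts):
        try:
            return request()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: the message fails
    raise AssertionError("unreachable")
```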

Authentication timeout (ms): Maximum time to wait for Kafka to respond to an authentication request. Defaults to 1000 ms (1 second).

Reauthentication threshold (ms): If the broker requires periodic reauthentication, this setting defines how long before the reauthentication timeout Cribl Stream initiates the reauthentication. Defaults to 10000 ms (10 seconds).

A small value for this setting, combined with high network latency, might prevent the Source from reauthenticating before the Kafka broker closes the connection.

A large value might cause the Source to send reauthentication messages too soon, wasting bandwidth.

On the Kafka side, the broker setting connections.max.reauth.ms controls how often clients must reauthenticate.
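The trade-off above can be worked through with simple arithmetic. This Python sketch assumes that the session lifetime the broker enforces equals connections.max.reauth.ms, and that you can estimate how long one reauthentication exchange takes on your network; both function names are hypothetical.

```python
def reauth_start_ms(session_lifetime_ms: int, threshold_ms: int = 10_000) -> int:
    """Hypothetical helper: time after the last (re)authentication to begin reauthenticating."""
    # Start threshold_ms before the broker-enforced session lifetime runs out.
    return max(0, session_lifetime_ms - threshold_ms)


def margin_too_small(threshold_ms: int, expected_reauth_ms: int) -> bool:
    """Rough check: is the head start no longer than a reauth exchange is expected to take?"""
    return threshold_ms <= expected_reauth_ms


# With a one-hour session lifetime and the 10-second default threshold,
# reauthentication starts 3,590,000 ms (59 min 50 s) after the last one.
print(reauth_start_ms(3_600_000))  # → 3590000
```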

Offset commit interval (ms): How often, in milliseconds, to commit offsets. If both this field and the Offset commit threshold are empty, Cribl Stream will commit offsets after each batch. If both fields are set, Cribl Stream will commit offsets when either condition is met.

Offset commit threshold: How many events are needed to trigger an offset commit. If both this field and the Offset commit interval are empty, Cribl Stream will commit offsets after each batch. If both fields are set, Cribl Stream will commit offsets when either condition is met.
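The interplay between Offset commit interval and Offset commit threshold can be summarized as a single decision function. This Python sketch follows the rules stated above; the function and parameter names are hypothetical, and it assumes that when only one of the two fields is set, only that condition triggers a commit.

```python
from typing import Optional


def should_commit(uncommitted_events: int, ms_since_last_commit: int, batch_finished: bool,
                  interval_ms: Optional[int] = None, threshold: Optional[int] = None) -> bool:
    """Hypothetical helper: decide whether to commit offsets now."""
    if interval_ms is None and threshold is None:
        # Neither field set: commit after each batch.
        return batch_finished
    interval_hit = interval_ms is not None and ms_since_last_commit >= interval_ms
    threshold_hit = threshold is not None and uncommitted_events >= threshold
    # When both fields are set, whichever condition is met first triggers the commit.
    return interval_hit or threshold_hit
```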

Max bytes per partition: Maximum amount of data that Kafka will return per partition, per fetch request. Must equal or exceed the maximum message size that Kafka is configured to allow (e.g., the broker’s message.max.bytes setting). Otherwise, Cribl Stream can get stuck trying to retrieve messages. Defaults to 1048576 (1 MB).
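A quick sanity check for this setting can be written in Python. Here, max_message_bytes is an assumption standing in for whatever maximum message size your cluster allows; the function name is hypothetical.

```python
DEFAULT_MAX_BYTES_PER_PARTITION = 1_048_576  # 1 MB, the documented default


def fetch_size_ok(max_bytes_per_partition: int, max_message_bytes: int) -> bool:
    """Hypothetical helper: can one per-partition fetch hold the largest allowed message?"""
    return max_bytes_per_partition >= max_message_bytes


# A cluster that allows messages slightly larger than 1 MB needs a larger setting.
print(fetch_size_ok(DEFAULT_MAX_BYTES_PER_PARTITION, 1_048_588))  # → False
```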

Max bytes: Maximum number of bytes that Kafka will return per fetch request. Defaults to 10485760 (10 MB).

Environment: If you’re using GitOps, optionally use this field to specify a single Git branch on which to enable this configuration. If empty, the config will be enabled everywhere.

If you observe an excessive number of group rebalances, and/or you observe consumers not regularly pulling messages, try increasing the values of Heartbeat interval, Session timeout, and Rebalance timeout.

Connected Destinations

Select Send to Routes to enable conditional routing, filtering, and cloning of this Source’s data via the Routing table.

Select QuickConnect to send this Source’s data to one or more Destinations via independent, direct connections.

Internal Fields

Cribl Stream uses a set of internal fields to assist in handling of data. These “meta” fields are not part of an event, but they are accessible, and Functions can use them to make processing decisions.

Fields for this Source:

  • __inputId
  • __topicIn (indicates the Kafka topic that the event came from; see __topicOut in our Kafka Destination documentation)
  • __partition
  • __schemaId (when using Schema Registry)
  • __key (when using Schema Registry)
  • __headers (when using Schema Registry)
  • __keySchemaIdIn (when using Schema Registry)
  • __valueSchemaIdIn (when using Schema Registry)

How Cribl Stream Pulls Data

Kafka treats all the Worker Nodes as members of a consumer group, and Kafka manages each Node’s data load. By default, Workers will poll every 5 seconds. In the case of Leader failure, Worker Nodes will continue to receive data as normal.

How Cribl Stream Handles the _time Field

Events that this Source emits always contain a _time field, and sometimes also an __origTime field. Here’s how Cribl Stream determines what to send:

  • If the incoming Kafka message contains no _time field, the message’s timestamp becomes the value of the emitted event’s _time field.
  • If the incoming Kafka message contains a _time field whose value is a timestamp in UNIX epoch time, that timestamp becomes the value of the emitted event’s _time field.
  • If the incoming Kafka message contains a _time field whose value is not a timestamp in UNIX epoch time (e.g., an ISO or UTC timestamp), that becomes the value of the emitted event’s __origTime field, and the message’s timestamp becomes the value of the emitted event’s _time field.
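The three rules above can be sketched as one Python function. This is an illustration, not Cribl Stream’s implementation. It assumes that "UNIX epoch time" means a numeric value (so a numeric string is treated as not epoch), and that the Kafka message timestamp has already been converted to the same unit used for _time. The function name is hypothetical.

```python
from typing import Any, Dict


def derive_time_fields(message: Dict[str, Any], message_timestamp: float) -> Dict[str, Any]:
    """Hypothetical helper: apply the _time / __origTime rules to one message."""
    event = dict(message)
    value = message.get("_time")
    if "_time" not in message:
        # Rule 1: no _time in the message, so use the Kafka message timestamp.
        event["_time"] = message_timestamp
    elif isinstance(value, (int, float)) and not isinstance(value, bool):
        # Rule 2: _time is already an epoch timestamp, so keep it.
        pass
    else:
        # Rule 3: preserve the non-epoch value and fall back to the message timestamp.
        event["__origTime"] = value
        event["_time"] = message_timestamp
    return event
```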

Controlling Rebalancing

When you configure multiple Sources that subscribe to different topics but all belong to the same consumer group, a state change affecting any Source in this consumer group will affect all the other Sources. Examples of state changes include: deploying new configs, adding or removing Worker Processes, and Worker Processes crashing.

Here’s an example – three Sources, three different topics, all in one consumer group:

  • Source_1 - Topic_1 - ConsumerGroup1
  • Source_2 - Topic_2 - ConsumerGroup1
  • Source_3 - Topic_3 - ConsumerGroup1

Imagine that Source_1 undergoes a state change event, such as a Worker Process crash. Source_2 and Source_3 will rebalance, stopping data flow until the rebalance completes.
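The example above can be modeled in a few lines of Python: given each Source’s Group ID, the Sources that rebalance are the other members of the changed Source’s group. This is a simplified model of the behavior described here, not Kafka’s rebalance protocol; the function name is hypothetical.

```python
from typing import Dict, Set


def rebalance_impact(source_groups: Dict[str, str], changed_source: str) -> Set[str]:
    """Hypothetical helper: other Sources that rebalance when changed_source changes state."""
    group = source_groups[changed_source]
    return {src for src, g in source_groups.items() if g == group and src != changed_source}


shared = {"Source_1": "ConsumerGroup1", "Source_2": "ConsumerGroup1", "Source_3": "ConsumerGroup1"}
print(sorted(rebalance_impact(shared, "Source_1")))  # → ['Source_2', 'Source_3']
```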

Shared Worker Group Mitigation

If Sources that share a consumer group all deploy as part of the same Worker Group, changes will have smaller side effects than when Sources are spread across different Worker Groups. (Conversely, imagine a configuration where deploying new configs for Worker Group 1 caused rebalancing of topics in Worker Group 2. This spillover would be especially undesirable.)

Bottom Line

Changes to any member of a consumer group affect all other members of that consumer group. To prevent this undesired behavior, make sure to use a unique Group ID for each Kafka, Confluent Cloud, Amazon MSK, and Azure Event Hubs Source.
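One way to act on this advice is to scan your Source definitions for Group IDs that are shared. The Python sketch below assumes you have already collected a mapping of Source ID to Group ID (how you export that from Cribl Stream is outside its scope); the function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


def shared_group_ids(source_groups: Dict[str, str]) -> Dict[str, List[str]]:
    """Hypothetical helper: map each Group ID used by more than one Source to those Sources."""
    members: Dict[str, List[str]] = defaultdict(list)
    for source, group_id in source_groups.items():
        members[group_id].append(source)
    return {group: sorted(sources) for group, sources in members.items() if len(sources) > 1}


print(shared_group_ids({"msk_orders": "cribl", "msk_logs": "cribl", "kafka_metrics": "metrics"}))
# → {'cribl': ['msk_logs', 'msk_orders']}
```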