Version: 3.2

Kafka

Cribl LogStream supports receiving data records from a Kafka cluster.

Type: Pull | TLS Support: Yes | Event Breaker Support: No

This Source currently does not support compressed data; set compression to none on your Kafka senders to prevent "LZ4 compression not implemented" errors.
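If your senders use a client library such as kafkajs, compression can typically be disabled per send call. A minimal sketch (the broker and topic names are placeholders; adapt to your producer's actual client library):

```typescript
import { Kafka, CompressionTypes } from "kafkajs";

// Placeholder broker and topic names.
const kafka = new Kafka({ clientId: "example-producer", brokers: ["localhost:9092"] });
const producer = kafka.producer();

async function sendUncompressed(): Promise<void> {
  await producer.connect();
  await producer.send({
    topic: "logs",
    compression: CompressionTypes.None, // no LZ4 (or other) compression on the wire
    messages: [{ value: JSON.stringify({ message: "hello" }) }],
  });
  await producer.disconnect();
}
```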

Configuring LogStream to Receive Data from Kafka Topics

In the QuickConnect UI: Click + New Source, or click + Add beside Sources. From the resulting drawer's tiles, select [Pull >] Kafka. Next, click either + Add New or (if displayed) Select Existing. The drawer will now provide the following options and fields.

Or, in the Data Routes UI: From the top nav of a LogStream instance or Group, select Data > Sources. From the resulting page's tiles or the Sources left nav, select [Pull >] Kafka. Next, click + Add New to open a New Source modal that provides the following options and fields.

General Settings

Input ID: Enter a unique name to identify this Source definition.

Brokers: List of Kafka brokers to use, e.g., localhost:9092.

Topics: Enter the names of topics to subscribe to. Press Enter/Return between multiple entries.

Group ID: The name of the consumer group to which this Cribl LogStream instance belongs.

From beginning: Whether to start reading from the earliest available data. Relevant only during initial subscription. Defaults to Yes.
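These fields correspond to standard Kafka consumer-group concepts. For orientation, here is a minimal consumer sketch using the kafkajs client (v2 API); the broker, topic, and group names are placeholders, and this is not LogStream's internal implementation:

```typescript
import { Kafka } from "kafkajs";

// Brokers: the cluster entry points.
const kafka = new Kafka({ clientId: "example-consumer", brokers: ["localhost:9092"] });

// Group ID: consumers sharing this ID divide the topics' partitions among themselves.
const consumer = kafka.consumer({ groupId: "cribl-logstream" });

async function run(): Promise<void> {
  await consumer.connect();
  // Topics + "From beginning": fromBeginning applies only when the group
  // first subscribes and has no committed offsets yet.
  await consumer.subscribe({ topics: ["logs", "metrics"], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      console.log(topic, message.value?.toString());
    },
  });
}
```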

TLS Settings (Client Side)

Enabled: Defaults to No. When toggled to Yes:

Autofill?: This setting is experimental.

Validate server certs: Reject certificates that are not authorized by a CA in the CA certificate path, or by another trusted CA (e.g., the system's CA). Defaults to No.

Server name (SNI): Server name for the SNI (Server Name Indication) TLS extension. This must be a host name, not an IP address.

Certificate name: The name of the predefined certificate.

CA certificate path: Path on client containing CA certificates (in PEM format) to use to verify the server's cert. Path can reference $ENV_VARS.

Private key path (mutual auth): Path on client containing the private key (in PEM format) to use. Path can reference $ENV_VARS. Use only if mutual auth is required.

Certificate path (mutual auth): Path on client containing certificates (in PEM format) to use. Path can reference $ENV_VARS. Use only if mutual auth is required.

Passphrase: Passphrase to use to decrypt private key.

Minimum TLS version: Optionally, select the minimum TLS version to use when connecting.

Maximum TLS version: Optionally, select the maximum TLS version to use when connecting.

In a LogStream Cloud deployment, do not set the TLS Settings (Client Side) tab's Enabled slider to Yes, nor configure any of the tab's resulting TLS fields. Any settings that you configure here would conflict with the LogStream Cloud Source's predefined TLS configuration.
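For self-managed deployments, these fields map onto standard Node.js TLS client options. A hedged sketch using kafkajs (all paths, hostnames, and the passphrase are placeholders; in LogStream itself you set these in the UI rather than in code):

```typescript
import * as fs from "fs";
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  brokers: ["kafka.example.com:9093"],
  ssl: {
    rejectUnauthorized: true,                                  // Validate server certs
    servername: "kafka.example.com",                           // Server name (SNI)
    ca: [fs.readFileSync("/path/to/ca.pem", "utf8")],          // CA certificate path
    key: fs.readFileSync("/path/to/client-key.pem", "utf8"),   // Private key path (mutual auth)
    cert: fs.readFileSync("/path/to/client-cert.pem", "utf8"), // Certificate path (mutual auth)
    passphrase: "example-passphrase",                          // Passphrase
    minVersion: "TLSv1.2",                                     // Minimum TLS version
    maxVersion: "TLSv1.3",                                     // Maximum TLS version
  },
});
```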

Authentication

This section governs SASL (Simple Authentication and Security Layer) authentication to use when connecting to brokers.

Enabled: Defaults to No. When toggled to Yes:

SASL mechanism: Use this drop-down to select the SASL authentication mechanism to use. The mechanism you select determines the controls displayed below.

PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512

With any of these authentication mechanisms, select one of the following buttons:

Manual: Displays Username and Password fields to enter your Kafka credentials directly.

Secret: This option exposes a Credentials secret drop-down in which you can select a stored text secret that references your Kafka credentials. A Create link is available to store a new, reusable secret.
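For reference, a SCRAM configuration in a generic Kafka client looks like the following kafkajs sketch (the credentials are placeholders; in LogStream, prefer the Secret option over hard-coded values):

```typescript
import { Kafka } from "kafkajs";

const kafka = new Kafka({
  brokers: ["kafka.example.com:9093"],
  sasl: {
    mechanism: "scram-sha-256", // or "plain" / "scram-sha-512"
    username: "kafka_user",
    password: process.env.KAFKA_PASSWORD ?? "", // placeholder; never hard-code secrets
  },
});
```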

GSSAPI/Kerberos

Selecting Kerberos as the authentication mechanism displays the following options:

Keytab location: Enter the location of the key table file for the authentication principal.

Principal: Enter the authentication principal, e.g.: kafka_user@EXAMPLE.COM.

Broker service class: Enter the Kerberos service class for Kafka brokers, e.g.: kafka.

Schema Registry

This section governs Kafka Schema Registry authentication for Avro-encoded data with a schema stored in the Confluent Schema Registry.

Enabled: Defaults to No. When toggled to Yes:

Schema registry URL: URL for access to the Confluent Schema Registry. (E.g., http://<hostname>:8081.)

TLS enabled: Defaults to No. When toggled to Yes, displays the following TLS settings for the Schema Registry:

These have the same format as the TLS Settings (Client Side) above.

TLS Settings (Schema Registry)

Validate server certs: Reject certificates that are not authorized by a CA specified in the CA Certificate Path field. Defaults to No.

Server name (SNI): Server name for the SNI (Server Name Indication) TLS extension. This must be a host name, not an IP address.

Certificate name: The name of the predefined certificate.

CA certificate path: Path on client containing CA certificates (in PEM format) to use to verify the server's cert. Path can reference $ENV_VARS.

Private key path (mutual auth): Path on client containing the private key (in PEM format) to use. Path can reference $ENV_VARS. Use only if mutual auth is required.

Certificate path (mutual auth): Path on client containing certificates (in PEM format) to use. Path can reference $ENV_VARS. Use only if mutual auth is required.

Passphrase: Passphrase to use to decrypt private key.

Minimum TLS version: Optionally, select the minimum TLS version to use when connecting.

Maximum TLS version: Optionally, select the maximum TLS version to use when connecting.
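To illustrate what this integration does conceptually: Confluent wire-format messages embed a schema ID, which a client resolves against the registry to deserialize the Avro payload. A hedged sketch using the @kafkajs/confluent-schema-registry package (the URL is a placeholder; this is not LogStream's internal code):

```typescript
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";

// Schema registry URL (placeholder).
const registry = new SchemaRegistry({ host: "http://localhost:8081" });

// Each wire-format message starts with a magic byte and a 4-byte schema ID;
// decode() fetches that schema from the registry and deserializes the payload.
async function decodeAvro(raw: Buffer): Promise<unknown> {
  return registry.decode(raw);
}
```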

Processing Settings

Fields (Metadata)

In this section, you can add fields/metadata to each event using Eval-like functionality.

Name: Field name.

Value: JavaScript expression to compute the field's value (can be a constant).
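For example (hypothetical fields, not defaults), this sketch shows the kinds of values that work, since each Value is a JavaScript expression:

```typescript
// Hypothetical Name → Value pairs. Each Value is a JavaScript expression,
// so both constants and computed values are valid:
const exampleFields = {
  environment: "production", // constant string
  ingestTimeMs: Date.now(),  // computed when the expression is evaluated
};
```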

Pre-Processing

In this section's Pipeline drop-down list, you can select a single existing Pipeline to process data from this input before the data is sent through the Routes.

Advanced Settings

Use these settings to fine-tune LogStream's integration with Kafka topics. If you are unfamiliar with these parameters, contact Cribl Support to understand the implications of changing the defaults.

Session timeout (ms): Timeout used to detect client failures when using Kafka's group management facilities. If the client sends the broker no heartbeats before this timeout expires, the broker will remove this client from the group, and will initiate a rebalance. Value must be between the broker's configured group.min.session.timeout.ms and group.max.session.timeout.ms. Defaults to 30000 ms, i.e., 30 seconds. For details, see the Kafka documentation.

Rebalance timeout (ms): Maximum allowed time for each worker to join the group after a rebalance has begun. If the timeout is exceeded, the coordinator broker will remove the worker from the group. Defaults to 60000 ms, i.e., 1 minute. For details, see the Kafka documentation.

Heartbeat interval (ms): Expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Value must be lower than sessionTimeout, and typically should not exceed 1/3 of the sessionTimeout value. Defaults to 3000 ms, i.e., 3 seconds. For details, see the Kafka documentation.

If you observe an excessive number of group rebalances, and/or you observe consumers not regularly pulling messages, try increasing the values of all three of the above parameters.
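For orientation, these three settings correspond to standard consumer options. A sketch using kafkajs with the documented defaults (the group name is a placeholder):

```typescript
import { Kafka } from "kafkajs";

const kafka = new Kafka({ brokers: ["localhost:9092"] });

const consumer = kafka.consumer({
  groupId: "cribl-logstream", // placeholder
  sessionTimeout: 30000,      // Session timeout (ms): default 30 seconds
  rebalanceTimeout: 60000,    // Rebalance timeout (ms): default 1 minute
  heartbeatInterval: 3000,    // Heartbeat interval (ms): default 3 seconds
});
```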

Environment: If you're using GitOps, optionally use this field to specify a single Git branch on which to enable this configuration. If empty, the config will be enabled everywhere.

Connected Destinations

Select Send to Routes to enable conditional routing, filtering, and cloning of this Source's data via the Routing table.

Select QuickConnect to send this Source's data to one or more Destinations via independent, direct connections.

Internal Fields

Cribl LogStream uses a set of internal fields to assist in handling of data. These "meta" fields are not part of an event, but they are accessible, and Functions can use them to make processing decisions.

Fields for this Source:

  • __inputId
  • __topicIn (indicates the Kafka topic that the event came from; see __topicOut in our Kafka Destination documentation)
  • __schemaId (when using Schema Registry)
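For example, a Route filter can branch on these fields. A hypothetical filter expression (the topic name is a placeholder), written out as TypeScript for illustration:

```typescript
// __topicIn is supplied by LogStream at evaluation time; declared here
// only so the sketch stands alone.
declare const __topicIn: string;

// Hypothetical Route filter: match only events pulled from this topic.
const matches = __topicIn === "security-logs";
```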

How LogStream Pulls Data

Kafka treats all the Worker Nodes as members of a Consumer Group, and Kafka manages each Node's data load. By default, Workers will poll every 5 seconds. In the case of Leader failure, Worker Nodes will continue to receive data as normal.