Version: 3.2

Amazon Kinesis Streams

Cribl LogStream supports receiving data records from Amazon Kinesis Streams.

Type: Pull | TLS Support: YES (secure API) | Event Breaker Support: No

Configuring Cribl LogStream to Receive Data from Kinesis Streams

In the QuickConnect UI: Click + New Source, or click + Add beside Sources. From the resulting drawer's tiles, select [Pull >] Amazon > Kinesis. Next, click either + Add New or (if displayed) Select Existing. The drawer will now provide the following options and fields.

Or, in the Data Routes UI: From the top nav of a LogStream instance or Group, select Data > Sources. From the resulting page's tiles or the Sources left nav, select [Pull >] Amazon > Kinesis. Next, click + Add New to open a New Source modal that provides the following options and fields.

General Settings

Input ID: Enter a unique name to identify this Kinesis Stream Source definition.

Stream name: Kinesis stream name (not ARN) to read data from.

Shard iterator start: Location at which to start reading a shard for the first time. Defaults to Earliest Record.

Record data format: Format of data inside the Kinesis Stream records. Gzip compression is automatically detected. Options include:

  • Cribl (the default): Use this option if LogStream wrote data to Kinesis in this format. This is a type of NDJSON.
  • Newline JSON: Use if the records contain newline-delimited JSON (NDJSON) events – e.g., Kubernetes logs ingested through Kinesis. This is a good choice if you don't know the records' format.
  • CloudWatch Logs: Use if you've configured CloudWatch to send logs to Kinesis.
  • Event per line: Treats each line of a record as a separate event. Newline JSON can fall back to this format when it fails to parse lines as valid JSON.
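The format handling described above can be sketched roughly as follows. This is a minimal illustration, not LogStream's actual implementation: the `decodeRecord` helper and the `raw` field name are hypothetical, and the sketch assumes gzip is detected by its two leading magic bytes (0x1f 0x8b).

```javascript
const zlib = require("zlib");

// Gzip streams begin with the two magic bytes 0x1f 0x8b.
function isGzip(buf) {
  return buf.length >= 2 && buf[0] === 0x1f && buf[1] === 0x8b;
}

// Hypothetical helper: turn one Kinesis record payload (a Buffer) into events.
// Each non-empty line is parsed as JSON; lines that are not JSON objects
// fall back to an "event per line" shape that keeps the raw text.
function decodeRecord(buf) {
  const text = (isGzip(buf) ? zlib.gunzipSync(buf) : buf).toString("utf8");
  return text
    .split("\n")
    .filter((line) => line.trim() !== "")
    .map((line) => {
      try {
        const parsed = JSON.parse(line);
        if (parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)) {
          return parsed;
        }
      } catch (err) {
        // Not valid JSON: fall through to the per-line fallback below.
      }
      return { raw: line }; // "raw" is an illustrative field name
    });
}

// Usage: a gzip-compressed payload holding one JSON line and one plain line.
const sample = zlib.gzipSync(Buffer.from('{"level":"info"}\nplain text line\n'));
console.log(decodeRecord(sample));
```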

Region: Region where the Kinesis stream is located. Required.

Authentication

Use the Authentication Method buttons to select an AWS authentication method:

  • Auto: This default option uses the AWS instance's metadata service to automatically obtain short-lived credentials from the IAM role attached to an EC2 instance. The attached IAM role grants LogStream Workers access to authorized AWS resources. This option can also use the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. It works only when running on AWS.

  • Manual: If not running on AWS, you can select this option to enter a static set of user-associated IAM credentials (your access key and secret key) directly or by reference. This is useful for Workers not in an AWS VPC, e.g., those running a private cloud.

  • Secret: If not running on AWS, you can select this option to supply a stored secret that references an AWS access key and secret key.

Auto Authentication

When using an IAM role to authenticate with Kinesis Streams, the IAM policy statements must include the following Actions:

  • kinesis:GetRecords
  • kinesis:GetShardIterator
  • kinesis:ListShards

For details, see AWS' Actions, Resources, and Condition Keys for Amazon Kinesis documentation.
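As an illustration, a policy document granting the three Actions listed above might be assembled as shown here. The stream ARN (account ID, region, and stream name) is a placeholder, and `buildKinesisReadPolicy` is a hypothetical helper; scope the `Resource` to your own stream.

```javascript
// Placeholder ARN: substitute your own region, account ID, and stream name.
const streamArn = "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream";

// Hypothetical helper that builds a read-only policy for one stream.
function buildKinesisReadPolicy(resourceArn) {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: [
          "kinesis:GetRecords",
          "kinesis:GetShardIterator",
          "kinesis:ListShards",
        ],
        Resource: resourceArn,
      },
    ],
  };
}

// Print the JSON that would be pasted into the IAM policy editor.
console.log(JSON.stringify(buildKinesisReadPolicy(streamArn), null, 2));
```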

Manual Authentication

The Manual option exposes these additional fields:

Access key: Enter your AWS access key. If not present, LogStream will fall back to env.AWS_ACCESS_KEY_ID, or to the metadata endpoint for IAM role credentials.

Secret key: Enter your AWS secret key. If not present, LogStream will fall back to env.AWS_SECRET_ACCESS_KEY, or to the metadata endpoint for IAM role credentials.
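The fallback order described for these two fields can be pictured with a small sketch. The `resolveCredentialSources` function and the `config` shape are hypothetical, used only to show the order of precedence; this is not LogStream's internal code.

```javascript
// Hypothetical sketch: report where each credential would come from.
// Order of precedence: configured value, then environment variable,
// then the metadata endpoint (IAM role credentials).
function resolveCredentialSources(config, env) {
  const pick = (configured, envName) => {
    if (configured) return "configured";
    if (env[envName]) return `env.${envName}`;
    return "metadata endpoint";
  };
  return {
    accessKey: pick(config.accessKey, "AWS_ACCESS_KEY_ID"),
    secretKey: pick(config.secretKey, "AWS_SECRET_ACCESS_KEY"),
  };
}

// Usage: nothing entered in the UI, so the environment is consulted next.
console.log(resolveCredentialSources({}, process.env));
```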

Secret Authentication

The Secret option exposes this additional field:

Secret key pair: Use the drop-down to select a secret key pair that you've configured in LogStream's internal secrets manager or (if enabled) an external KMS. Follow the Create link if you need to configure a key pair.

Assume Role

Enable for Kinesis Streams: Whether to use Assume Role credentials to access Kinesis Streams. Defaults to No.

AssumeRole ARN: Enter the Amazon Resource Name (ARN) of the role to assume.

External ID: Enter the External ID to use when assuming the role.

Processing Settings

Fields (Metadata)

In this section, you can add fields/metadata to each event, using Eval-like functionality.

  • Name: Field name.
  • Value: JavaScript expression to compute field's value (can be a constant).
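To make the Name/Value pairing concrete, the sketch below evaluates a few example Value expressions and attaches the results to an event. The field names, the sample event, and the `applyFields` helper are all hypothetical, and the use of `new Function` is only a stand-in for however LogStream evaluates expressions internally.

```javascript
// Hypothetical Name/Value pairs, as they might be entered in the table.
// Note that string constants are quoted inside the expression.
const fieldDefs = [
  { name: "origin", value: "'kinesis'" },
  { name: "env", value: "'prod'.toUpperCase()" },
  { name: "retention_days", value: "7 * 2" },
];

// Evaluate each Value expression and add the result under its Name.
function applyFields(event, defs) {
  const out = { ...event };
  for (const { name, value } of defs) {
    out[name] = new Function(`return (${value});`)();
  }
  return out;
}

console.log(applyFields({ message: "hello" }, fieldDefs));
```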

Pre-Processing

In this section's Pipeline drop-down list, you can select a single existing Pipeline to process data from this input before the data is sent through the Routes.

Advanced Settings

Shard selection expression: A JavaScript expression to be called with each shardId for the stream. The shard will be processed if the expression evaluates to a truthy value. Defaults to true.
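For example, an expression such as `Number(shardId.split('-')[1]) % 2 === 0` would select only even-numbered shards. The sketch below simulates that filtering; the `selectShards` helper is hypothetical, and it assumes shard IDs in the usual Kinesis form `shardId-000000000000`.

```javascript
// Hypothetical simulation: call the expression once per shardId and keep
// the shards for which it evaluates to a truthy value.
function selectShards(shardIds, expression) {
  const predicate = new Function("shardId", `return (${expression});`);
  return shardIds.filter((id) => Boolean(predicate(id)));
}

const shards = [
  "shardId-000000000000",
  "shardId-000000000001",
  "shardId-000000000002",
];

// The default expression, true, keeps every shard.
console.log(selectShards(shards, "true"));

// Keep only even-numbered shards.
console.log(selectShards(shards, "Number(shardId.split('-')[1]) % 2 === 0"));
```

Complementary expressions of this kind on two Sources could, for instance, divide one stream's shards between them.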

Service Period: Time interval (in minutes) between consecutive service calls. Defaults to 1 minute.

Endpoint: Kinesis stream service endpoint. If empty, the endpoint will be automatically constructed from the region.
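As a rough illustration, the standard Kinesis endpoint for commercial AWS regions follows the pattern sketched here. The `resolveEndpoint` helper is hypothetical; other AWS partitions (such as the China regions) use different domain suffixes, and VPC or FIPS endpoints would need to be entered explicitly.

```javascript
// Hypothetical sketch: use the configured endpoint if present, otherwise
// derive the standard regional endpoint for commercial AWS regions.
function resolveEndpoint(region, configuredEndpoint) {
  if (configuredEndpoint) return configuredEndpoint;
  return `https://kinesis.${region}.amazonaws.com`;
}

console.log(resolveEndpoint("us-west-2", ""));
```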

Signature version: Signature version to use for signing Kinesis Stream requests. Defaults to v4.

Verify KPL checksums: Enable this setting to verify Kinesis Producer Library (KPL) event checksums.

Reuse connections: Whether to reuse connections between requests. The default setting (Yes) can improve performance.

Reject unauthorized certificates: Whether to reject certificates that cannot be verified against a valid Certificate Authority (e.g., self-signed certificates). Defaults to Yes.

Environment: If you're using GitOps, optionally use this field to specify a single Git branch on which to enable this configuration. If empty, the config will be enabled everywhere.

Connected Destinations

Select Send to Routes to enable conditional routing, filtering, and cloning of this Source's data via the Routing table.

Select QuickConnect to send this Source’s data to one or more Destinations via independent, direct connections.

Internal Fields

Cribl LogStream uses a set of internal fields to assist in handling of data. These "meta" fields are not part of an event, but they are accessible, and Functions can use them to make processing decisions.

Field for this Source:

  • __inputId

How LogStream Pulls Data

Worker Processes get a list of available shards from Kinesis, and contact the Leader Node to fetch the latest sequence numbers. Based on the sequence number's value, the Worker either resumes the shard reading from where LogStream previously left off, or starts reading from the beginning.

The Kinesis Streams Source stores shard state on disk, so that it can pick up where it left off across restarts. The state file is located in LogStream's state/ subdirectory; the path format looks like this:

.../state/kvstore/<groupId>/input_kinesis_<inputId>_<streamName>/state.json

For example:

state/kvstore/default/input_kinesis_kinesisIn_just-a-test/state.json
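If you need to locate the state file from a script, the path format above can be assembled as in this sketch. The `stateFilePath` helper is hypothetical, and the returned path is relative to the LogStream installation directory, which is not shown here.

```javascript
// Hypothetical helper: build the relative state file path from its parts.
function stateFilePath(groupId, inputId, streamName) {
  return [
    "state",
    "kvstore",
    groupId,
    `input_kinesis_${inputId}_${streamName}`,
    "state.json",
  ].join("/");
}

// Reproduces the example path shown above.
console.log(stateFilePath("default", "kinesisIn", "just-a-test"));
```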

Worker Processes become Kinesis Consumers, and fetch the records for the assigned shards. Every 5 minutes, each Worker Process forwards to the Leader Node the latest sequence numbers for the shards that Worker Process is responsible for. The Leader Node persists the shardId-to-sequenceNumber mapping to disk.
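The resume-or-start decision can be sketched in terms of the parameters of the Kinesis GetShardIterator API. This is an assumption-laden illustration, not LogStream's actual code: the `shardIteratorParams` helper, the `checkpoints` object, and the choice of `AFTER_SEQUENCE_NUMBER` for resuming are all hypothetical, though the iterator type names themselves are standard Kinesis values.

```javascript
// Hypothetical sketch: choose GetShardIterator parameters for one shard.
// checkpoints maps shardId to the last persisted sequence number.
// startAt mirrors the "Shard iterator start" setting ("earliest" or "latest").
function shardIteratorParams(streamName, shardId, checkpoints, startAt) {
  const sequenceNumber = checkpoints[shardId];
  if (sequenceNumber !== undefined) {
    // A checkpoint exists: resume just after the last record already read.
    return {
      StreamName: streamName,
      ShardId: shardId,
      ShardIteratorType: "AFTER_SEQUENCE_NUMBER",
      StartingSequenceNumber: sequenceNumber,
    };
  }
  // First read of this shard: start from the configured location.
  return {
    StreamName: streamName,
    ShardId: shardId,
    ShardIteratorType: startAt === "latest" ? "LATEST" : "TRIM_HORIZON",
  };
}

// Usage with a made-up checkpoint for one of two shards.
const checkpoints = { "shardId-000000000000": "12345" };
console.log(shardIteratorParams("just-a-test", "shardId-000000000000", checkpoints, "earliest"));
console.log(shardIteratorParams("just-a-test", "shardId-000000000001", checkpoints, "earliest"));
```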