Amazon S3

Cribl Stream supports receiving data from Amazon S3 buckets, using event notifications through Amazon Simple Queue Service (SQS).

Type: Pull | TLS Support: YES (secure API) | Event Breaker Support: YES

Cribl Stream running on Linux (only) can use this Source to read Parquet files, identified by a .parquet, .parq, or .pqt filename extension.

See our Amazon S3 Better Practices and Using S3 Storage and Replay guides.

S3 Setup Strategy

The source S3 bucket must be configured to send s3:ObjectCreated:* events to an SQS queue, either directly (easiest) or via SNS (Amazon Simple Notification Service). See the event notification configuration guidelines below.

SQS messages will be deleted after they’re read, unless an error occurs, in which case Cribl Stream will retry. This means that although Cribl Stream will ignore files not matching the Filename Filter, their SQS events/notifications will still be read, and then deleted from the queue (along with those from files that match).

These ignored files will no longer be available to other S3 Sources targeting the same SQS queue. If you still need to process these files, we suggest one of these alternatives:

  • Using a different, dedicated SQS queue. (Preferred and recommended.)

  • Applying a broad filter on a single Source, and then using pre-processing Pipelines an/or Route filters for further processing.

Compression

Cribl Stream can ingest compressed S3 files if they meet all the following conditions:

  • Compressed with the x-gzip MIME type.
  • End with the .gz extension.
  • Can be uncompressed using the zlib.gunzip algorithm.

Storage Class Compatibility

Cribl Stream does not support data preview, collection, or replay from S3 Glacier or S3 Deep Glacier storage classes, whose stated retrieval lags (variously minutes to 48 hours) cannot guarantee data availability when the Collector needs it.

Cribl Stream does support data preview, collection, and replay from S3 Glacier Instant Retrieval when you’re using the S3 Intelligent-Tiering storage class.

Configuring Cribl Stream to Receive Data from Amazon S3

From the top nav, click Manage, then select a Worker Group to configure. Next, you have two options:

To configure via the graphical QuickConnect UI, click Routing > QuickConnect (Stream) or Collect (Edge). Next, click Add Source at left. From the resulting drawer’s tiles, select [Pull > ] Amazon > S3. Next, click either Add Destination or (if displayed) Select Existing. The resulting drawer will provide the options below.

Or, to configure via the Routing UI, click Data > Sources (Stream) or More > Sources (Edge). From the resulting page’s tiles or left nav, select [Pull > ] Amazon > S3. Next, click New Source to open a New Source modal that provides the options below.

General Settings

Input ID: Enter a unique name to identify this S3 Source definition. If you clone this Source, Cribl Stream will add -CLONE to the original Input ID.

Queue: The name, URL, or Amazon Resource Name (ARN) of the SQS queue to read events from. When specifying a non-AWS URL, you must use the format: {url}/<queueName>. (For example: https://host:port/<queueName>.) This value must be a JavaScript expression (which can evaluate to a constant), enclosed in single quotes, double quotes, or backticks.

Optional Settings

Filename filter: Regex matching file names to download and process. Defaults to .*, to match all characters. This regex will be evaluated against the S3 key’s full path.

Region: AWS Region where the S3 bucket and SQS queue are located. Required, unless the Queue entry is a URL or ARN that includes a Region.

Tags: Optionally, add tags that you can use to filter and group Sources in Cribl Stream’s Manage Sources page. These tags aren’t added to processed events. Use a tab or hard return between (arbitrary) tag names.

Authentication

Use the Authentication method drop-down to select an AWS authentication method.

Auto: This default option uses the AWS instance’s metadata service to automatically obtain short-lived credentials from the IAM role attached to an EC2 instance, local credentials, sidecar, or other source. The attached IAM role grants Cribl Stream Workers access to authorized AWS resources. Can also use the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Works only when running on AWS.

Manual: If not running on AWS, you can select this option to enter a static set of user-associated IAM credentials (your access key and secret key) directly or by reference. This is useful for Workers not in an AWS VPC, such as those running a private cloud. The Manual option exposes these corresponding additional fields:

  • Access key: Enter your AWS access key. If not present, will fall back to the env.AWS_ACCESS_KEY_ID environment variable, or to the metadata endpoint for IAM role credentials.

  • Secret key: Enter your AWS secret key. If not present, will fall back to the env.AWS_SECRET_ACCESS_KEY environment variable, or to the metadata endpoint for IAM credentials.

Secret: If not running on AWS, you can select this option to supply a stored secret that references an AWS access key and secret key. The Secret option exposes this additional field:

  • Secret key pair: Use the drop-down to select a secret key pair that you’ve configured in Cribl Stream’s internal secrets manager or (if enabled) an external KMS. Follow the Create link if you need to configure a key pair.

Assume Role

When using Assume Role to access resources in a different region than Cribl Stream, you can target the AWS Security Token Service (STS) endpoint specific to that region by using the CRIBL_AWS_STS_REGION environment variable on your Worker Node. Setting an invalid region results in a fallback to the global STS endpoint.

Enable for S3: Whether to use Assume Role credentials to access S3. Defaults to Yes.

Enable for SQS: Whether to use Assume Role credentials when accessing SQS. Defaults to No.

AWS account ID: SQS queue owner’s AWS account ID. Leave empty if the SQS queue is in the same AWS account.

AssumeRole ARN: Enter the ARN of the role to assume.

External ID: Enter the External ID to use when assuming role.

Duration (seconds): Duration of the Assumed Role’s session, in seconds. Minimum is 900 (15 minutes). Maximum is 43200 (12 hours). Defaults to 3600 (1 hour).

Processing Settings

Custom Command

In this section, you can pass the data from this input to an external command for processing, before the data continues downstream.

Enabled: Defaults to No. Toggle to Yes to enable the custom command.

Command: Enter the command that will consume the data (via stdin) and will process its output (via stdout).

Arguments: Click Add Argument to add each argument to the command. You can drag arguments vertically to resequence them.

Event Breakers

This section defines event breaking rulesets that will be applied, in order.

Event Breaker Rulesets: A list of event breaking rulesets that will be applied to the input data stream before the data is sent through the Routes. Defaults to System Default Rule.

Event Breaker buffer timeout: How long (in milliseconds) the Event Breaker will wait for new data to be sent to a specific channel, before flushing out the data stream, as-is, to the Routes. Minimum 10 ms, default 10000 (10 sec), maxiumum 43200000 (12 hours).

Fields

In this section, you can add Fields to each event, using Eval-like functionality.

Name: Field name.

Value: JavaScript expression to compute field’s value, enclosed in quotes or backticks. (Can evaluate to a constant.)

Pre-Processing

In this section’s Pipeline drop-down list, you can select a single existing Pipeline to process data from this input before the data is sent through the Routes.

Advanced Settings

Checkpoint Settings

Enable checkpointing: When toggled to No (the default) Cribl Stream will always start processing a file from the beginning. Toggle to Yes if, after an error or restart occurs, you want Cribl Stream to resume processing a given file from the point where it was interrupted, as explained below. When turned on, the following additional setting becomes available:

  • Retries: The number of times you want Cribl Stream to retry processing a file after an error or restart occurs. If Skip file on error is enabled, this setting is ignored.

Other Settings

Endpoint: S3 service endpoint. If empty, defaults to AWS’s region-specific endpoint. Otherwise, used to point to an S3-compatible endpoint. To access the AWS S3 endpoints, use the path-style URL format. You don’t need to specify the bucket name in the URL, because Cribl Stream will automatically add it to the URL path. For details, see AWS’ Path‑Style Requests topic.

Signature version: Signature version to use for signing SQS requests. Defaults to v4.

Max messages: The maximum number of messages that SQS should return in a poll request. Amazon SQS never returns more messages than this value. (However, fewer messages might be returned.) Acceptable values: 1 to 10. Defaults to 1.

Visibility timeout seconds: The duration (in seconds) that the received messages are hidden from subsequent retrieve requests, after being retrieved by a ReceiveMessage request. Defaults to 600.

Cribl Stream will automatically extend this timeout until the initial request’s files have been processed – notably, in the case of large files that require additional processing time.

Num receivers: The number of receiver processes to run. The higher the number, the better the throughput, at the expense of CPU overhead. Defaults to 1.

Poll timeout (secs): How long to wait for events before polling again. Minimum 1 second; default 10; maximum 20. Short durations increase the number ‑ and thus the cost – of requests sent to AWS. (The UI will show a warning for intervals shorter than 5 seconds.) Long durations increase the time the Source takes to react to configuration changes and system restarts.

Socket timeout: Socket inactivity timeout (in seconds). Increase this value if retrievals time out during backpressure. Defaults to 300 seconds.

Max Parquet chunk size (MB): Maximum size for each Parquet chunk. Defaults to 5 MB. Valid range is 1 to 100 MB. Cribl Stream stores chunks in the location specified by the CRIBL_TMP_DIR environment variable. It removes the chunks immediately after reading them. See Environment Variables.

Parquet chunk download timeout (seconds): The maximum time to wait for a Parquet file’s chunk to be downloaded. If a required chunk cannot not be downloaded within this time limit, processing will end. Defaults to 600 seconds. Valid range is 1 second to 3600 seconds (1 hour).

Skip file on error: Toggle to Yes to skip files that trigger a processing error (for example, corrupted files). Defaults to No, which enables retries after a processing error.

Reuse connections: Whether to reuse connections between requests. The default setting (Yes) can improve performance.

Reject unauthorized certificates: Whether to accept certificates that cannot be verified against a valid Certificate Authority (for example, self-signed certificates). Defaults to Yes.

Environment: If you’re using GitOps, optionally use this field to specify a single Git branch on which to enable this configuration. If empty, the config will be enabled everywhere.

Checkpointing

When Workers restart, checkpointing can prevent the Source from losing data or collecting duplicate events. Among the situations where this works well:

  • When Cribl Stream pushes a new configuration: the Leader pushes the configuration to the Workers, and restarts them.
  • When a Worker has gotten into a bad state, and you want to restart it.

Checkpointing allows Cribl Stream to resume processing a file at the point where an error or restart occurred, minimizing the number of duplicate events sent downstream in the event of an interruption. To accomplish this, Cribl Stream keeps track of how many events have been processed for a given file. Resuming processing after an interruption, the Source skips the first N events it has seen and starts processing from event N+1.

The Source manages checkpoint information in memory, but writes it out to disk periodically (once per second, and on Worker shutdown) so that the information can survive a Worker restart. Cribl Stream keeps checkpoint information at the Worker Process level and does not share it between Worker Processes or Worker Nodes.

Checkpointing is a feature to keep in mind when you do disaster recovery (DR) planning.

Caveats to using checkpointing include: Whenever Worker Node state is destroyed (that is, when a Worker ceases to exist), checkpoint information will be lost. In Cribl.Cloud deployments, this can happen during upgrades. Also, because checkpoints are not written to disk on every event, the checkpoints written to disk will lag behind those that the S3 Source manages in memory. If the Worker Process stops abruptly without a clean shutdown, checkpoints not yet written to disk will be lost. When processing resumes, the Source will start from just after the last checkpoint written to disk, producing duplicate events corresponding to the lost checkpoints.

Located in the UI at Advanced Settings > Checkpoint Settings > Enable checkpointing, this feature is turned off by default.

Connected Destinations

Select Send to Routes to enable conditional routing, filtering, and cloning of this Source’s data via the Routing table.

Select QuickConnect to send this Source’s data to one or more Destinations via independent, direct connections.

Internal Fields

Cribl Stream uses a set of internal fields to assist in handling of data. These “meta” fields are not part of an event, but they are accessible, and Functions can use them to make processing decisions.

Fields for this Source:

  • __final
  • __inputId
  • __source
  • _time

How to Configure S3 to Send Event Notifications to SQS

For step-by-step instructions, see AWS’ Walkthrough: Configure a Bucket for Notifications (SNS Topic and SQS Queue).

  1. Create a Standard SQS Queue. Note its ARN.

  2. Replace its access policy with one similar to the examples below. To do so, select the queue; and then, in the Permissions tab, click: Edit Policy Document (Advanced). (These examples differ only at line 9, showing public access to the SQS queue versus S3-only access to the queue.)

  3. In the Amazon S3 console, add a notification configuration to publish events of the s3:ObjectCreated:* type to the SQS queue.

Permissive SQS access policyRestrictive SQS access policy
{
 "Version": "2012-10-17",
 "Id": "example-ID",
 "Statement": [
  {
   "Sid": "<SID name>",
   "Effect": "Allow",
   "Principal": {
    "AWS":"*"  
   },
   "Action": [
    "SQS:SendMessage"
   ],
   "Resource": "example-SQS-queue-ARN",
   "Condition": {
      "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:example-bucket-name" }
   }
  }
 ]
}
{
 "Version": "2012-10-17",
 "Id": "example-ID",
 "Statement": [
  {
   "Sid": "<SID name>",
   "Effect": "Allow",
   "Principal": {
    "Service":"s3.amazonaws.com"  
   },
   "Action": [
    "SQS:SendMessage"
   ],
   "Resource": "example-SQS-queue-ARN",
   "Condition": {
      "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:example-bucket-name" }
   }
  }
 ]
}

S3 and SQS Permissions

The following permissions are required on the S3 bucket:

  • s3:GetObject
  • s3:ListBucket

The following permissions are required on the SQS queue:

  • sqs:ReceiveMessage
  • sqs:DeleteMessage
  • sqs:ChangeMessageVisibility
  • sqs:GetQueueAttributes
  • sqs:GetQueueUrl

Temporary Access via SSO Provider

You can use Okta or SAML to provide access to S3 buckets using temporary security credentials.

Proxying Requests

If you need to proxy HTTP/S requests, see System Proxy Configuration.

How Cribl Stream Pulls Data

Workers poll message from SQS. The call will return messages if they are available, or will time out after 1 second if no messages are available.

Each Worker gets its share of the load from S3. By default, S3 returns a maximum of 1 message in a single poll request. You can change this default in Max messages.

Best Practices

Beyond these basics, also see our Amazon S3 Better Practices and Using S3 Storage and Replay guides:

  • When Cribl Stream instances are deployed on AWS, use IAM Roles whenever possible.

    • Not only is this safer, but it also makes the configuration simpler to maintain.
  • Although optional, we highly recommend that you use a Filename Filter.

    • This will ensure that Cribl Stream ingests only files of interest.
    • Ingesting only what’s strictly needed improves latency, processing power, and data quality.
  • If higher throughput is needed, increase Advanced Settings > Number of Receivers and/or Max messages. However, do note:

    • These are set at 1 by default. As a result, each Worker Process, in each Cribl Stream Worker Node, will run one receiver consuming one message (that is, S3 file) at a time.
    • Total S3 objects processed at a time per Worker Node = Worker Processes x Number of Receivers x Max Messages
    • Increased throughput implies additional CPU utilization.
  • When ingesting large files, tune up the Visibility Timeout, or consider using smaller objects.

    • The default value of 600s works well in most cases, and while you certainly can increase it, we suggest that you also consider using smaller S3 objects.

Troubleshooting Notes

  • VPC endpoints for SQS and for S3 might need to be set up in your account. Check with your administrator for details.

  • If you’re having connectivity issues, but having no problems with the CLI, see if the AWS CLI proxy is in use. Check with your administrator for details.

Troubleshooting Resources

Cribl University offers an Advanced Troubleshooting > Source Integrations: S3 short course. To follow the direct course link, first log into your Cribl University account. (To create an account, click the Sign up link. You’ll need to click through a short Terms & Conditions presentation, with chill music, before proceeding to courses – but Cribl’s training is always free of charge.) Once logged in, check out other useful Advanced Troubleshooting short courses and Troubleshooting Criblets.

See also AWS Sources/​Destinations & S3-Compatible Stores for information on common errors.