Cribl LogStream – Docs

Cribl LogStream Documentation

Download entire manual as PDF - v2.4.0

Aggregations

Description

The Aggregations Function computes aggregate statistics on event data.

Safeguarding Data

Upon shutdown, LogStream will attempt to flush the buffers that hold aggregated data, to avoid data loss. If you set a Time window greater than an hour, Cribl recommends adjusting the Aggregation memory limit and/or Aggregation event limit to prevent the system from running out of memory. This is especially necessary for high-cardinality data. (Both settings default to unlimited, but we recommend setting defined limits, based on testing.)

Usage

Filter: Filter expression (JS) that selects data to be fed through the Function. Defaults to true, meaning that all events will be evaluated.

Description: Simple description of this Function. Defaults to empty.

Final: If set to Yes, stops data from being fed to the downstream Functions. Defaults to No.

Time window: The time span of the tumbling window for aggregating events. Must be a valid time string (e.g., 10s). Must match pattern \d+[sm]$.
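As a rough illustration of the time-string format, the sketch below validates a value against the documented pattern and converts it to seconds. The function name `parse_time_string` is hypothetical, and anchoring the pattern at both ends is an assumption (the documented pattern is anchored only at the end).

```python
import re

# Documented pattern is \d+[sm]$; fullmatch() below also anchors the start (assumption).
TIME_STRING = re.compile(r"(\d+)([sm])")


def parse_time_string(value: str) -> int:
    """Return the duration in seconds for strings such as '10s' or '5m'."""
    match = TIME_STRING.fullmatch(value)
    if match is None:
        raise ValueError(f"invalid time string: {value!r}")
    amount, unit = match.groups()
    return int(amount) * (60 if unit == "m" else 1)


print(parse_time_string("10s"))  # 10
print(parse_time_string("5m"))   # 300
```

Under this pattern, a window longer than an hour would be written in minutes or seconds, for example 90m.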

Aggregates: Aggregate function(s) to perform on events. E.g., sum(bytes).where(action=='REJECT').as(TotalBytes). Expression format: aggFunction(<FieldExpression>).where(<FilterExpression>).as(<outputField>). See more examples below.

  • Note: When used without as(), the aggregate's output will be placed in a field labeled <aggFunction>_<fieldName>. If there are conflicts, the last aggregate wins. For example, given two aggregates – sum(bytes).where(action=='REJECT') and sum(bytes) – the latter one (sum_bytes) is the winner.
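The naming rule in the note above can be sketched as follows. This is a simplified illustration, not LogStream's parser: the helper names `output_field` and `resolve` are hypothetical, and the regular expressions only handle simple single-field aggregates like the ones shown here.

```python
import re

AGG_CALL = re.compile(r"^(\w+)\(([^)]*)\)")   # e.g. sum(bytes)
AS_CLAUSE = re.compile(r"\.as\((\w+)\)$")     # e.g. .as(TotalBytes)


def output_field(aggregate: str) -> str:
    """Name of the field that an aggregate expression writes to."""
    as_match = AS_CLAUSE.search(aggregate)
    if as_match:
        return as_match.group(1)
    call = AGG_CALL.match(aggregate)
    if call is None:
        raise ValueError(f"unrecognized aggregate: {aggregate!r}")
    func, args = call.groups()
    field = args.split(",")[0].strip()
    return f"{func}_{field}"   # default label: <aggFunction>_<fieldName>


def resolve(aggregates):
    """Map each output field to the aggregate that wins; later entries overwrite earlier ones."""
    winners = {}
    for aggregate in aggregates:
        winners[output_field(aggregate)] = aggregate
    return winners


print(resolve(["sum(bytes).where(action=='REJECT')", "sum(bytes)"]))
# {'sum_bytes': 'sum(bytes)'}
```

Adding as() to one of the two aggregates gives each its own output field and avoids the conflict.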

Group by Fields: Fields to group aggregates by.

Evaluate fields: Set of key/value pairs to evaluate and add/set. Fields are added in the context of an aggregated event, before they’re sent out. Does not apply to passthrough events.

Time Window Settings

Cumulative aggregations: Determines whether aggregation values are retained and carried forward (cumulative aggregations) or reset to 0 each time an aggregation table event is flushed. Defaults to No.

Lag tolerance: How long the tumbling window continues to accept late events after its boundary has passed. Must be a valid time string (e.g., 10s). Must match pattern \d+[sm]$.

Idle bucket time limit: The amount of time to wait before flushing a bucket that has not received events. Must be a valid time string (e.g., 10s). Must match pattern \d+[sm]$.
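To picture the Cumulative aggregations setting, the sketch below compares the value flushed for a sum aggregate over three consecutive windows. It is an interpretation under stated assumptions: the function `flushed_sums` and the per-window sums are hypothetical, and only a sum is modeled.

```python
def flushed_sums(window_sums, cumulative):
    """Value emitted at each flush for a sum aggregate.

    cumulative=False: the aggregate is reset to 0 after every flush.
    cumulative=True: the aggregate is retained and keeps growing.
    """
    running = 0
    emitted = []
    for window_sum in window_sums:
        running = running + window_sum if cumulative else window_sum
        emitted.append(running)
    return emitted


per_window = [100, 50, 25]  # hypothetical sums of bytes in three windows
print(flushed_sums(per_window, cumulative=False))  # [100, 50, 25]
print(flushed_sums(per_window, cumulative=True))   # [100, 150, 175]
```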

Output Settings

Passthrough mode: Determines whether to pass through the original events along with the aggregation events. Defaults to No.

Metrics mode: Determines whether to output aggregates as metrics. Defaults to No, causing aggregates to be output as events.

Sufficient stats mode: Determines whether to output only statistics sufficient for the supplied aggregations. Defaults to No, meaning that richer statistics are output.

Output prefix: A prefix that is prepended to all of the fields output by this Aggregations Function.

Advanced Settings

Aggregation event limit: The maximum number of events to include in any given aggregation event. Defaults to unlimited. Must be at least 1.

Aggregation memory limit: The memory usage limit to impose upon aggregations. Defaults to unlimited (i.e., the amount of memory available in the system).

List of Aggregate Functions

avg(expr: FieldExpression): Returns the average of the values of the parameter.
count(expr: FieldExpression): Returns the number of occurrences of the values of the parameter.
dc(expr: FieldExpression, errorRate: number = 0.01): Returns the estimated number of distinct values of the <expr> parameter, within a relative error rate.
distinct_count(expr: FieldExpression, errorRate: number = 0.01): Returns the estimated number of distinct values of the <expr> parameter, within a relative error rate.
earliest(expr: FieldExpression): Returns the earliest (based on _time) observed value of the parameter.
first(expr: FieldExpression): Returns the first observed value of the parameter.
last(expr: FieldExpression): Returns the last observed value of the parameter.
latest(expr: FieldExpression): Returns the latest (based on _time) observed value of the parameter.
max(expr: FieldExpression): Returns the maximum value of the parameter.
min(expr: FieldExpression): Returns the minimum value of the parameter.
per_second(expr: FieldExpression): Returns the per-second rate (based on _time) of the observed values of the parameter.
perc(level: number, expr: FieldExpression): Returns the <level> percentile value of the numeric values of the <expr> parameter.
rate(expr: FieldExpression, timeString: string = '1s'): Returns the rate (based on _time) of the observed values of the parameter.
stddev(expr: FieldExpression): Returns the sample standard deviation of the values of the parameter.
stddevp(expr: FieldExpression): Returns the population standard deviation of the values of the parameter.
sum(expr: FieldExpression): Returns the sum of the values of the parameter.
sumsq(expr: FieldExpression): Returns the sum of squares of the values of the parameter.
variance(expr: FieldExpression): Returns the sample variance of the values of the parameter.
variancep(expr: FieldExpression): Returns the population variance of the values of the parameter.
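The difference between the sample and population variants in the list above (stddev vs. stddevp, variance vs. variancep) can be seen with Python's standard statistics module. This only illustrates the standard definitions, not LogStream's implementation, and the input values are hypothetical.

```python
import statistics

values = [2, 4, 4, 4, 5, 5, 7, 9]  # hypothetical field values

summary = {
    "sum": sum(values),                               # 40
    "sumsq": sum(v * v for v in values),              # 232
    "avg": statistics.mean(values),                   # 5
    "variance": statistics.variance(values),          # sample: divides by n - 1
    "variancep": statistics.pvariance(values),        # population: divides by n
    "stddev": statistics.stdev(values),               # square root of variance
    "stddevp": statistics.pstdev(values),             # square root of variancep
}

for name, value in summary.items():
    print(f"{name}: {value}")
```

For this input the population variance is 4 and the sample variance is 32/7 (about 4.57), so the sample variants are always slightly larger for the same data.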

How Do Time Window Settings Work?

Lag Tolerance

As events are aggregated into windows, there is a good chance that most will arrive later than their event time. For instance, given a 10s window (10:42:00 - 10:42:10), an event with timestamp 10:42:03 might come in 2 seconds later at 10:42:05.

In some cases, there will also be late, or lagging, events that arrive after the latest time window boundary. For example, an event with timestamp 10:42:04 might arrive at 10:42:12. Lag Tolerance is the setting that governs how long to wait, after the latest window boundary, while still accepting late events.

Until the lag time expires, the "bucket" of events is said to be in Stage 1: it is still accepting new events, and is not yet finalized. For example, an event with event time 10:42:09 might arrive at 10:42:11, 1 second past the window boundary, and still be accepted, because it arrives before the lag time expires.

After the lag time expires, the bucket moves to Stage 2.

If the bucket is created from a historic stream, then the bucket is initiated in Stage 2, and lag time is not considered. A "historic" stream is one where the latest time of a bucket is before now(). E.g., if the window size is 10s, and now()=10:42:42, an event with event_time=10:42:10 will be placed in a Stage 2 bucket with range 10:42:10 - 10:42:20.
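The window and stage rules described in this section can be sketched as below. This is a minimal model, not LogStream's implementation: the function names are hypothetical, the 5s lag tolerance in the usage lines is an assumed value, and treating the exact moment of lag expiry as already expired is also an assumption.

```python
def to_seconds(hms: str) -> int:
    """Convert 'HH:MM:SS' to seconds since midnight."""
    hours, minutes, seconds = map(int, hms.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def bucket_range(event_time: int, window: int) -> tuple:
    """Start and end of the tumbling window that contains event_time."""
    start = event_time - event_time % window
    return start, start + window


def initial_stage(event_time: int, now: int, window: int) -> int:
    """Stage of a newly created bucket: 2 if its latest time is before now (historic), else 1."""
    _, end = bucket_range(event_time, window)
    return 2 if end < now else 1


def accepts_late_event(event_time: int, now: int, window: int, lag: int) -> bool:
    """True while an existing bucket is in Stage 1, i.e. the lag time has not expired."""
    _, end = bucket_range(event_time, window)
    return now < end + lag


WINDOW, LAG = 10, 5  # 10s window from the text; 5s lag tolerance is an assumed value

# Historic example from the text: bucket 10:42:10 - 10:42:20 created at 10:42:42.
print(initial_stage(to_seconds("10:42:10"), to_seconds("10:42:42"), WINDOW))  # 2
# Event time 10:42:09 arriving at 10:42:11 is within the lag tolerance.
print(accepts_late_event(to_seconds("10:42:09"), to_seconds("10:42:11"), WINDOW, LAG))  # True
```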

Idle Bucket Time Limit

While Lag Tolerance works with event time, Idle Bucket Time Limit works on arrival time (i.e., real time). It is defined as the amount of time to wait before flushing a bucket that has not received events.

After the Idle Bucket Time Limit is reached, the bucket is "flushed" and sent out of the system.
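A minimal sketch of the idle check, assuming idleness is measured from the arrival time of the most recent event in the bucket. The `Bucket` class and its methods are hypothetical, and clock values are passed in explicitly rather than read from the system clock.

```python
class Bucket:
    """Tracks when a bucket last received an event, in arrival (wall-clock) time."""

    def __init__(self, created_at: float):
        self.last_arrival = created_at
        self.event_count = 0

    def add_event(self, arrival_time: float) -> None:
        self.event_count += 1
        self.last_arrival = arrival_time

    def is_idle(self, now: float, idle_limit: float) -> bool:
        """True once no event has arrived for at least idle_limit seconds."""
        return now - self.last_arrival >= idle_limit


bucket = Bucket(created_at=100.0)
bucket.add_event(arrival_time=103.0)
print(bucket.is_idle(now=110.0, idle_limit=10))  # False: only 7s since the last event
print(bucket.is_idle(now=113.0, idle_limit=10))  # True: 10s without events, flush the bucket
```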

Examples

Assume we're working with VPC Flowlog events that have the following structure:

version account_id interface_id srcaddr dstaddr srcport dstport protocol packets bytes start end action log_status

For example:

2 99999XXXXX eni-02f03c2880e4aaa3 10.0.1.70 10.0.1.11 9999 63030 6 6556 262256 1554562460 1554562475 ACCEPT OK
2 496698360409 eni-08e66c4525538d10b 37.23.15.38 10.0.2.232 4373 8108 6 1 52 1554562456 1554562466 REJECT OK

Scenario A:

Every 10s, compute sum of bytes and output it in a field called TotalBytes.

Time Window: 10s
Aggregations: sum(bytes).as(TotalBytes)
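To make Scenario A concrete, the sketch below computes the same sum in plain Python over the two sample events. It is an illustration only: using the start field as the event time is an assumption (LogStream aggregates on _time), and the function names are hypothetical.

```python
from collections import defaultdict

FIELDS = ("version account_id interface_id srcaddr dstaddr srcport dstport "
          "protocol packets bytes start end action log_status").split()

SAMPLES = [
    "2 99999XXXXX eni-02f03c2880e4aaa3 10.0.1.70 10.0.1.11 9999 63030 6 6556 262256 1554562460 1554562475 ACCEPT OK",
    "2 496698360409 eni-08e66c4525538d10b 37.23.15.38 10.0.2.232 4373 8108 6 1 52 1554562456 1554562466 REJECT OK",
]


def parse(line: str) -> dict:
    """Split a space-delimited flow log line into named fields."""
    return dict(zip(FIELDS, line.split()))


def total_bytes_per_window(lines, window=10):
    """Sum bytes per tumbling window, keyed by window start (epoch seconds)."""
    totals = defaultdict(int)
    for line in lines:
        event = parse(line)
        event_time = int(event["start"])  # assumption: start stands in for _time
        totals[event_time - event_time % window] += int(event["bytes"])
    return dict(totals)


print(total_bytes_per_window(SAMPLES))
# {1554562460: 262256, 1554562450: 52}
```

The two sample events fall into different 10s windows, so each window's TotalBytes equals a single event's bytes value.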

Scenario B:

Every 10s, compute sum of bytes, output it in a field called TotalBytes, group by srcaddr.

Time Window: 10s
Aggregations: sum(bytes).as(TotalBytes)
Group by Fields: srcaddr

Scenario C:

Every 10s, compute sum of bytes but only where action is REJECT, output it in a field called TotalBytes, group by srcaddr.

Time Window: 10s
Aggregations: sum(bytes).where(action=='REJECT').as(TotalBytes)
Group by Fields: srcaddr
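Scenarios B and C add grouping and a filter. The sketch below models both with one hypothetical function, `sum_bytes_by`, over events that are already parsed into dictionaries. The event list reuses values from the two sample lines, and the _time values are assumed to equal their start fields.

```python
from collections import defaultdict

# Values taken from the two sample events; _time = start is an assumption.
EVENTS = [
    {"_time": 1554562460, "srcaddr": "10.0.1.70", "bytes": 262256, "action": "ACCEPT"},
    {"_time": 1554562456, "srcaddr": "37.23.15.38", "bytes": 52, "action": "REJECT"},
]


def sum_bytes_by(events, group_field, window=10, where=None):
    """Sum bytes per (window start, group value); `where` optionally filters events."""
    totals = defaultdict(int)
    for event in events:
        if where is not None and not where(event):
            continue
        window_start = event["_time"] - event["_time"] % window
        totals[(window_start, event[group_field])] += event["bytes"]
    return dict(totals)


# Scenario B: group by srcaddr, no filter.
print(sum_bytes_by(EVENTS, "srcaddr"))
# {(1554562460, '10.0.1.70'): 262256, (1554562450, '37.23.15.38'): 52}

# Scenario C: same grouping, but only REJECT events are counted.
print(sum_bytes_by(EVENTS, "srcaddr", where=lambda e: e["action"] == "REJECT"))
# {(1554562450, '37.23.15.38'): 52}
```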

Scenario D:

Every 10s, compute sum of bytes but only where action is REJECT, output it in a field called TotalBytes. Also, compute distinct count of srcaddr.

Time Window: 10s
Aggregations:
sum(bytes).where(action=='REJECT').as(TotalBytes)
distinct_count(srcaddr).where(action=='REJECT')
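Scenario D combines two aggregates in one output event. In the sketch below, the second aggregate has no as(), so its output field is assumed to follow the default <aggFunction>_<fieldName> label, distinct_count_srcaddr. An exact set is used in place of the estimated distinct count, and all events are hypothetical.

```python
# Hypothetical events that all fall into the same 10s window.
EVENTS = [
    {"_time": 1554562456, "srcaddr": "37.23.15.38", "bytes": 52, "action": "REJECT"},
    {"_time": 1554562457, "srcaddr": "37.23.15.38", "bytes": 40, "action": "REJECT"},
    {"_time": 1554562458, "srcaddr": "10.0.1.70", "bytes": 100, "action": "ACCEPT"},
    {"_time": 1554562459, "srcaddr": "37.23.15.39", "bytes": 60, "action": "REJECT"},
]


def aggregate_rejects(events, window=10):
    """Per window: total bytes and exact distinct srcaddr count, REJECT events only."""
    rows = {}
    seen_addresses = {}
    for event in events:
        if event["action"] != "REJECT":
            continue
        window_start = event["_time"] - event["_time"] % window
        row = rows.setdefault(window_start, {"TotalBytes": 0, "distinct_count_srcaddr": 0})
        addresses = seen_addresses.setdefault(window_start, set())
        addresses.add(event["srcaddr"])
        row["TotalBytes"] += event["bytes"]
        row["distinct_count_srcaddr"] = len(addresses)  # exact, not estimated
    return rows


print(aggregate_rejects(EVENTS))
# {1554562450: {'TotalBytes': 152, 'distinct_count_srcaddr': 2}}
```

An exact set grows with the number of distinct values, which is one reason an estimated count with a bounded error rate is attractive for high-cardinality fields.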

For further examples, see Engineering Deep Dive: Streaming Aggregations Part 2 – Memory Optimization

Updated 6 days ago