
Configure Resources

Preview Feature

The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.

Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.

These code examples demonstrate how to use the Cribl Python SDK for the control plane or the Cribl API to configure a Syslog Source, an S3 Destination, a Pipeline, and a Route in Cribl Stream.

About the Code Examples

The code examples use Bearer token authentication. Read the authentication documentation to learn how to get a Bearer token. The permissions granted to your Bearer token must include creating and managing resources.
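
For example, on a customer-managed deployment you can request a Bearer token from the login endpoint. This is a minimal sketch; replace the placeholder credentials with your own:

curl --request POST \
--url 'https://${hostname}:${port}/api/v1/auth/login' \
--header 'Content-Type: application/json' \
--data '{
  "username": "your-username",
  "password": "your-password"
}'

The response includes a token value to pass in the Authorization: Bearer ${token} header, as shown in the examples on this page.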

Replace the variables in the examples with the corresponding information for your Cribl deployment.

For customer-managed deployments, you must configure Transport Layer Security (TLS) before you can use https in the URLs.

The resource configurations in the examples do not include all available body parameters. For a complete list of body parameters for each resource, refer to the endpoint documentation in the API Reference.

Configure Resources with the Python SDK

This example demonstrates how to use the Python SDK for the control plane to create the following resources in Cribl Stream:

  • A Worker Group to manage the configuration.
  • A Syslog Source to receive data on port 9021.
  • An S3 Destination to store processed data.
  • A Pipeline that filters events and keeps only data in the eventSource and eventID fields.
  • A Route that connects the Source, Pipeline, and Destination.

This example also deploys the resource configurations to a Worker Group to make them active.

Python SDK (Cribl.Cloud)
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.

NOTE: This example is for Cribl.Cloud deployments only.

Prerequisites:
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
AWS_REGION.
- An Enterprise License on the server.
"""

import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    ConfigGroup,
    ProductsCore,
    InputSyslogSyslog2,
    InputSyslogType2,
    OutputS3,
    OutputS3Type,
    OutputS3Compression,
    OutputS3CompressionLevel,
    Pipeline,
    RoutesRoute,
    Conf,
    PipelineFunctionConf,
    FunctionSpecificConfigs,
    InputSyslogTLSSettingsServerSide2,
    Security,
    SchemeClientOauth
)

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "my-group"

base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"

# Syslog Source configuration
SYSLOG_PORT = 9021

# S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = "your-aws-api-key"  # Replace with your AWS Access Key ID
AWS_SECRET_KEY = "your-aws-secret-key"  # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name"  # Replace with your S3 bucket name
AWS_REGION = "us-east-2"  # Replace with your S3 bucket region

# Worker Group configuration
my_worker_group = ConfigGroup(
    on_prem=True,
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    id=WORKER_GROUP_ID,
)

# Syslog Source configuration
syslog_source = InputSyslogSyslog2(
    id="in-syslog-9021",
    type=InputSyslogType2.SYSLOG,
    tcp_port=SYSLOG_PORT,
    tls=InputSyslogTLSSettingsServerSide2(disabled=True),
)

# S3 Destination configuration
s3_destination = OutputS3(
    id="out_s3",
    type=OutputS3Type.S3,
    bucket=AWS_BUCKET_NAME,
    region=AWS_REGION,
    aws_secret_key=AWS_SECRET_KEY,
    aws_api_key=AWS_API_KEY,
    compress=OutputS3Compression.GZIP,
    compression_level=OutputS3CompressionLevel.BEST_SPEED,
    empty_dir_cleanup_sec=300,
)

# Pipeline configuration: filter events and keep only data in the "eventSource" and "eventID" fields
pipeline = Pipeline(
    id="my_pipeline",
    conf=Conf(
        async_func_timeout=1000,
        functions=[
            PipelineFunctionConf(
                filter_="true",
                conf=FunctionSpecificConfigs.model_validate(
                    {  # type: ignore
                        "remove": ["*"],
                        "keep": ["eventSource", "eventID"],
                    }
                ),
                id="eval",
                final=True,
            )
        ],
    ),
)

# Route configuration: route data from the Source to the Pipeline and Destination
route = RoutesRoute(
    final=False,
    id="my_route",
    name="my_route",
    pipeline=pipeline.id,
    output=s3_destination.id,
    filter_=f"__inputId=='{syslog_source.id}'",
    description="This is my new Route",
)


async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
      client_id=CLIENT_ID,
      client_secret=CLIENT_SECRET,
      token_url="https://login.cribl.cloud/oauth/token",
      audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=my_worker_group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(
            f"❌ Worker Group already exists: {my_worker_group.id}. Try a different Worker Group ID."
        )
        return

    # Create Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        on_prem=my_worker_group.on_prem,
        worker_remote_access=my_worker_group.worker_remote_access,
        is_fleet=my_worker_group.is_fleet,
        is_search=my_worker_group.is_search,
    )
    print(f"✅ Worker Group created: {my_worker_group.id}")

    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")

    # Create S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ S3 Destination created: {s3_destination.id}")

    # Create Pipeline
    cribl.pipelines.create(id=pipeline.id, conf=pipeline.conf, server_url=group_url)
    print(f"✅ Pipeline created: {pipeline.id}")

    # Add Route to Routing table
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items or len(routes_list_response.items) == 0:
        raise Exception("No Routes found")

    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise Exception("No Routes found")

    routes.routes = [route] + (routes.routes or [])
    cribl.routes.update(
        id_param=routes.id, id=routes.id, routes=routes.routes, server_url=group_url
    )
    print(f"✅ Route added: {route.id}")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        group_id=my_worker_group.id,
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."]
    )
    
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {my_worker_group.id}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {my_worker_group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.

NOTE: This example is for customer-managed deployments only.

Prerequisites:
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
AWS_REGION.
- An Enterprise License on the server.
"""

import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    Security,
    ConfigGroup,
    ProductsCore,
    InputSyslogSyslog2,
    InputSyslogType2,
    OutputS3,
    OutputS3Type,
    OutputS3Compression,
    OutputS3CompressionLevel,
    Pipeline,
    RoutesRoute,
    Conf,
    PipelineFunctionConf,
    FunctionSpecificConfigs,
    InputSyslogTLSSettingsServerSide2,
)

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id"

# Syslog Source configuration
SYSLOG_PORT = 9021

# S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = "your-aws-api-key"  # Replace with your AWS Access Key ID
AWS_SECRET_KEY = "your-aws-secret-key"  # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name"  # Replace with your S3 bucket name
AWS_REGION = "us-east-2"  # Replace with your S3 bucket region

base_url = f"{ONPREM_SERVER_URL}/api/v1"

# Worker Group configuration
my_worker_group = ConfigGroup(
    on_prem=True,
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    id=WORKER_GROUP_ID,
)

# Syslog Source configuration
syslog_source = InputSyslogSyslog2(
    id="in-syslog-9021",
    type=InputSyslogType2.SYSLOG,
    tcp_port=SYSLOG_PORT,
    tls=InputSyslogTLSSettingsServerSide2(disabled=True),
)

# S3 Destination configuration
s3_destination = OutputS3(
    id="out_s3",
    type=OutputS3Type.S3,
    bucket=AWS_BUCKET_NAME,
    region=AWS_REGION,
    aws_secret_key=AWS_SECRET_KEY,
    aws_api_key=AWS_API_KEY,
    compress=OutputS3Compression.GZIP,
    compression_level=OutputS3CompressionLevel.BEST_SPEED,
    empty_dir_cleanup_sec=300,
)

# Pipeline configuration: filter events and keep only data in the "eventSource" and "eventID" fields
pipeline = Pipeline(
    id="my_pipeline",
    conf=Conf(
        async_func_timeout=1000,
        functions=[
            PipelineFunctionConf(
                filter_="true",
                conf=FunctionSpecificConfigs.model_validate(
                    {  # type: ignore
                        "remove": ["*"],
                        "keep": ["eventSource", "eventID"],
                    }
                ),
                id="eval",
                final=True,
            )
        ],
    ),
)

# Route configuration: route data from the Source to the Pipeline and Destination
route = RoutesRoute(
    final=False,
    id="my_route",
    name="my_route",
    pipeline=pipeline.id,
    output=s3_destination.id,
    filter_=f"__inputId=='{syslog_source.id}'",
    description="This is my new Route",
)

group_url = f"{base_url}/m/{my_worker_group.id}"


async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=my_worker_group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(
            f"❌ Worker Group already exists: {my_worker_group.id}. Try a different Worker Group ID."
        )
        return

    # Create Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        on_prem=my_worker_group.on_prem,
        worker_remote_access=my_worker_group.worker_remote_access,
        is_fleet=my_worker_group.is_fleet,
        is_search=my_worker_group.is_search,
    )
    print(f"✅ Worker Group created: {my_worker_group.id}")

    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")

    # Create S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ S3 Destination created: {s3_destination.id}")

    # Create Pipeline
    cribl.pipelines.create(id=pipeline.id, conf=pipeline.conf, server_url=group_url)
    print(f"✅ Pipeline created: {pipeline.id}")

    # Add Route to Routing table
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items or len(routes_list_response.items) == 0:
        raise Exception("No Routes found")

    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise Exception("No Routes found")

    routes.routes = [route] + (routes.routes or [])
    cribl.routes.update(
        id_param=routes.id, id=routes.id, routes=routes.routes, server_url=group_url
    )
    print(f"✅ Route added: {route.id}")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        group_id=my_worker_group.id,
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."]
    )
    
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {my_worker_group.id}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {my_worker_group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")

Configure Resources with the Cribl API

The example requests in this section demonstrate how to use the Cribl API to create the following resources in Cribl Stream:

  • A Syslog Source to receive data on port 9021.
  • An S3 Destination to store processed data.
  • A Pipeline that filters events and keeps only data in the eventSource and eventID fields.
  • A Route that connects the Source, Pipeline, and Destination.

The example also deploys the resource configurations to a Worker Group to make them active.

The examples use the Worker Group created in Configure Worker Groups to manage the resource configuration.

Create a Source

This example creates a Syslog Source to receive data on port 9021.

API (Cribl.Cloud)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/system/inputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "in_syslog_9021",
  "type": "syslog",
  "disabled": true,
  "tcpPort": 9021,
  "host": "192.168.1.100"
}'

API (Customer-Managed)
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/system/inputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "in_syslog_9021",
  "type": "syslog",
  "disabled": true,
  "tcpPort": 9021,
  "host": "192.168.1.100"
}'

Create a Destination

This example creates an S3 Destination to store processed data. Replace placeholder values like your-aws-api-key before you run the example.

API (Cribl.Cloud)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/system/outputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "out_s3",
  "type": "s3",
  "awsAuthenticationMethod": "manual",
  "awsApiKey": "your-aws-api-key",
  "awsSecretKey": "your-aws-secret-key",
  "region": "us-east-2",
  "bucket": "your-aws-bucket-name",
  "compress": "gzip",
  "compressionLevel": "best_speed",
  "stagePath": "$CRIBL_HOME/state/outputs/staging",
  "emptyDirCleanupSec": 300
}'

API (Customer-Managed)
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/system/outputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "out_s3",
  "type": "s3",
  "awsAuthenticationMethod": "manual",
  "awsApiKey": "your-aws-api-key",
  "awsSecretKey": "your-aws-secret-key",
  "region": "us-east-2",
  "bucket": "your-aws-bucket-name",
  "compress": "gzip",
  "compressionLevel": "best_speed",
  "stagePath": "$CRIBL_HOME/state/outputs/staging",
  "emptyDirCleanupSec": 300
}'

Create a Pipeline

This example creates a Pipeline that filters events and keeps only data in the eventSource and eventID fields.

API (Cribl.Cloud)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/pipelines' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my_pipeline",
  "conf": {
    "asyncFuncTimeout": 1000,
    "functions": [
      {
        "filter": "true",
        "conf": {
          "remove": [
            "*"
          ],
          "keep": [
            "eventSource",
            "eventID"
          ]
        },
        "id": "eval",
        "final": true
      }
    ]
  }
}'

API (Customer-Managed)
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/pipelines' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my_pipeline",
  "conf": {
    "asyncFuncTimeout": 1000,
    "functions": [
      {
        "filter": "true",
        "conf": {
          "remove": [
            "*"
          ],
          "keep": [
            "eventSource",
            "eventID"
          ]
        },
        "id": "eval",
        "final": true
      }
    ]
  }
}'

Create a Route

This example creates a Route that connects the Syslog Source, Pipeline, and S3 Destination and adds it to the end of the default Routing table. The Routing table already includes a default Route, so this request changes the default Route’s setting for final to false so that it won’t block the new Route.

The PATCH /routes/default endpoint requires a complete representation of the Routing table and its existing Routes in the request body. This endpoint does not support partial updates. Cribl removes any omitted fields when updating the Routing table. Use GET /routes/default to retrieve the existing Routing table to use in the body of the PATCH request.
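
To retrieve the current Routing table before you construct the PATCH body, send a GET request like the following (Cribl.Cloud shown; for customer-managed deployments, use https://${hostname}:${port} as in the other examples):

curl --request GET \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/routes/default' \
--header 'Authorization: Bearer ${token}'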

API (Cribl.Cloud)
curl --request PATCH \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/routes/default' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "default",
  "routes": [
    {
      "id": "0abcd9",
      "name": "default",
      "final": false,
      "disabled": false,
      "pipeline": "main",
      "enableOutputExpression": false,
      "filter": "true",
      "output": "default"
    },
    {
      "name": "my_route",
      "final": true,
      "disabled": false,
      "pipeline": "my_pipeline",
      "enableOutputExpression": false,
      "filter": "__inputId == 'syslog:in_syslog_9021:tcp'",
      "output": "out_s3",
      "description": "This is my new Route"
    }
  ]
}'

API (Customer-Managed)
curl --request PATCH \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/routes/default' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "default",
  "routes": [
    {
      "id": "0abcd9",
      "name": "default",
      "final": false,
      "disabled": false,
      "pipeline": "main",
      "enableOutputExpression": false,
      "filter": "true",
      "output": "default"
    },
    {
      "name": "my_route",
      "final": true,
      "disabled": false,
      "pipeline": "my_pipeline",
      "enableOutputExpression": false,
      "filter": "__inputId == 'syslog:in_syslog_9021:tcp'",
      "output": "out_s3",
      "description": "This is my new Route"
    }
  ]
}'

Commit and Deploy the Resource Configurations

This example demonstrates how to commit and deploy the resource configurations to your Worker Group, then commit to the Leader to keep it in sync with the Worker Group.

Committing and deploying the Worker Group configuration requires three requests: commit to the Worker Group, deploy to the Worker Group, and commit to the Leader to keep it in sync.

First, commit the pending resource configurations to the Worker Group:

API (Cribl.Cloud)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit?groupId=my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "message": "Commit resource configurations to my-worker-group"
}'

API (Customer-Managed)
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/version/commit?groupId=my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "message": "Commit resource configurations to my-worker-group"
}'
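
The commit response includes the new commit hash in the commit field. A trimmed, illustrative response body (real responses contain additional fields):

{
  "items": [
    {
      "commit": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
    }
  ],
  "count": 1
}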

Next, deploy the committed changes to the Worker Group. This request includes the version body parameter, which takes its value from the commit field in the response body of the commit request:

API (Cribl.Cloud)
curl --request PATCH \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group/deploy' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "version": 1234abcd5678efgh9012ijkl3456mnop7EXAMPLE
}'

API (Customer-Managed)
curl --request PATCH \
--url 'https://${hostname}:${port}/api/v1/products/stream/groups/my-worker-group/deploy' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "version": 1234abcd5678efgh9012ijkl3456mnop7EXAMPLE
}'

Finally, commit the changes to the Leader to keep the Leader in sync with the Worker Group:

API (Cribl.Cloud)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "message": "Sync my-worker-group resource configurations with Leader"
}'

API (Customer-Managed)
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/version/commit' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "message": "Sync my-worker-group resource configurations with Leader"
}'