
Configure Resources with the Cribl SDK

Preview Feature

The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.

Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.

This code example demonstrates how to use the Python SDK for the control plane to create the following resources in Cribl Stream:

  • A Worker Group to manage the configuration.
  • A Syslog Source to receive data on port 9021.
  • An S3 Destination to store processed data.
  • A Pipeline that filters events and keeps only data in the eventSource and eventID fields.
  • A Route that connects the Source, Pipeline, and Destination.

This example also deploys the resource configurations to a Worker Group to make them active.
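Once the configuration is deployed, you can verify that the Syslog Source is listening by sending it a test event. The sketch below is illustrative only and is not part of the Cribl SDK: it formats a minimal RFC 3164-style syslog line and sends it over TCP, and the Worker node hostname is a placeholder you must replace.

```python
import socket

def format_syslog(message: str, facility: int = 16, severity: int = 6) -> bytes:
    """Build a minimal RFC 3164-style syslog line (PRI = facility * 8 + severity)."""
    pri = facility * 8 + severity
    return f"<{pri}>{message}\n".encode("utf-8")

def send_test_message(host: str, port: int = 9021) -> None:
    """Send one test event over TCP to the Syslog Source created in this example."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(format_syslog("Cribl SDK example test event"))

# send_test_message("your-worker-node-hostname")  # placeholder hostname
```

If the Route and Pipeline are working, the event should appear in your S3 bucket after the Destination flushes its staged files.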

About the Code Examples

The Cribl.Cloud example authenticates with OAuth client credentials, and the on-prem example exchanges a username and password for a Bearer token. Read the SDK authentication documentation to learn how to configure authentication. The permissions granted to your credentials must allow creating and managing resources.

Replace the variables in the examples with the corresponding information for your Cribl deployment.

For on-prem deployments, you must configure Transport Layer Security (TLS) before you can use https in the URLs.
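As a concrete illustration, only the scheme in the on-prem base URL changes once TLS is configured. The helper below is hypothetical, not part of the SDK:

```python
def api_base_url(host: str, port: int = 9000, tls: bool = False) -> str:
    """Build the API base URL, using https only when TLS is configured."""
    scheme = "https" if tls else "http"
    return f"{scheme}://{host}:{port}/api/v1"

# Without TLS: http://localhost:9000/api/v1
# With TLS configured: https://localhost:9000/api/v1
```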

The resource configurations in the examples do not include all available body parameters. For a complete list of body parameters for each resource, refer to the endpoint documentation in the API Reference.
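Because the Client Secret and AWS keys are sensitive, consider reading them from environment variables instead of hardcoding them in the script. The variable names below are illustrative, not a Cribl convention:

```python
import os

def require_env(name: str) -> str:
    """Read a required environment variable, failing fast with a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

# For example:
# CLIENT_ID = require_env("CRIBL_CLIENT_ID")
# CLIENT_SECRET = require_env("CRIBL_CLIENT_SECRET")
```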

Python SDK (Cribl.Cloud)
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow the steps
at https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud.

Your Client ID and Secret are sensitive information and should be kept private.

NOTE: This example is for Cribl.Cloud deployments only.

Prerequisites:
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
AWS_REGION.
- An Enterprise License on the server.
"""

import asyncio

from cribl_control_plane import CriblControlPlane

from cribl_control_plane.models import (
    ProductsCore,
    CreateInputInputSyslogSyslog2,
    CreateInputInputSyslogType2,
    CreateOutputOutputS3,
    CreateOutputTypeS3,
    CompressionOptions2,
    CompressionLevelOptions,
    Pipeline,
    RouteConf,
    RouteConfInput,
    PipelineConf,
    ConfInput,
    PipelineFunctionEval,
    PipelineFunctionEvalID,
    PipelineFunctionConf,
    FunctionConfSchemaEval,
    TLSSettingsServerSideType,
    Security,
    SchemeClientOauth,
    ConfigGroupCloud,
    CloudProvider,
    EstimatedIngestRateOptionsConfigGroup
)
from typing import List, cast

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "my-group"

base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"

# Syslog Source port
SYSLOG_PORT = 9021

# S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = "your-aws-api-key"  # Replace with your AWS Access Key ID
AWS_SECRET_KEY = "your-aws-secret-key"  # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name"  # Replace with your S3 bucket name
AWS_REGION = "us-east-2"  # Replace with your S3 bucket region

# Syslog Source configuration
syslog_source = CreateInputInputSyslogSyslog2(
    id="in-syslog-9021",
    type=CreateInputInputSyslogType2.SYSLOG,
    host="0.0.0.0",
    tcp_port=SYSLOG_PORT,
    tls=TLSSettingsServerSideType(disabled=True),
)

# S3 Destination configuration
s3_destination = CreateOutputOutputS3(
    id="out_s3",
    type=CreateOutputTypeS3.S3,
    bucket=AWS_BUCKET_NAME,
    stage_path="/tmp/cribl_stage",
    region=AWS_REGION,
    aws_secret_key=AWS_SECRET_KEY,
    aws_api_key=AWS_API_KEY,
    compress=CompressionOptions2.GZIP,
    compression_level=CompressionLevelOptions.BEST_SPEED,
    empty_dir_cleanup_sec=300,
)

# Pipeline configuration: filter events and keep only data in the "eventSource" and "eventID" fields
pipeline = Pipeline(
    id="my_pipeline",
    conf=PipelineConf(
        async_func_timeout=1000,
        functions=cast(
            List[PipelineFunctionConf],
            [
                PipelineFunctionEval(
                    filter_="true",
                    conf=FunctionConfSchemaEval(
                        remove=["*"],
                        keep=["eventSource", "eventID"],
                    ),
                    id=PipelineFunctionEvalID.EVAL,
                    final=True,
                )
            ],
        ),
    ),
)

# Route configuration: route data from the Source to the Pipeline and Destination
route = RouteConf(
    final=False,
    id="my_route",
    name="my_route",
    pipeline=pipeline.id,
    output=s3_destination.id,
    filter_=f"__inputId=='{syslog_source.id}'",
    description="This is my new Route",
)

async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
      client_id=CLIENT_ID,
      client_secret=CLIENT_SECRET,
      token_url="https://login.cribl.cloud/oauth/token",
      audience="https://api.cribl.cloud",
    )

    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(
            f"❌ Worker Group already exists: {WORKER_GROUP_ID}. Try a different Worker Group ID."
        )
        return

    # Create Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        on_prem=False,
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        name=WORKER_GROUP_ID,
        estimated_ingest_rate=EstimatedIngestRateOptionsConfigGroup.RATE12_MB_PER_SEC,
        cloud=ConfigGroupCloud(
            provider=CloudProvider.AWS,
            region="us-east-1"
        ),
    )
    print(f"✅ Worker Group created: {WORKER_GROUP_ID}")

    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")

    # Create S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ S3 Destination created: {s3_destination.id}")

    # Create Pipeline
    cribl.pipelines.create(
        id=pipeline.id,
        conf=ConfInput.model_validate(pipeline.conf.model_dump()),
        server_url=group_url,
    )
    print(f"✅ Pipeline created: {pipeline.id}")

    # Add Route to Routing table
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items or len(routes_list_response.items) == 0:
        raise Exception("No Routes found")

    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise Exception("No Routes found")

    routes.routes = [route] + (routes.routes or [])
    routes_input = [
        RouteConfInput.model_validate(r.model_dump()) for r in routes.routes
    ]
    cribl.routes.update(
        id_param=routes.id, id=routes.id, routes=routes_input, server_url=group_url
    )
    print(f"✅ Route added: {route.id}")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."],
        server_url=group_url
    )
    
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")

"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.

NOTE: This example is for on-prem deployments only.

Prerequisites:
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
AWS_REGION.
- An Enterprise License on the server.
"""

import asyncio

from cribl_control_plane import CriblControlPlane

from cribl_control_plane.models import (
    Security,
    ProductsCore,
    CreateInputInputSyslogSyslog2,
    CreateInputInputSyslogType2,
    CreateOutputOutputS3,
    CreateOutputTypeS3,
    CompressionOptions2,
    CompressionLevelOptions,
    Pipeline,
    RouteConf,
    RouteConfInput,
    PipelineConf,
    ConfInput,
    PipelineFunctionEval,
    PipelineFunctionEvalID,
    PipelineFunctionConf,
    FunctionConfSchemaEval,
    TLSSettingsServerSideType,
)
from typing import List, cast

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id"

# Syslog Source port
SYSLOG_PORT = 9021

# S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = "your-aws-api-key"  # Replace with your AWS Access Key ID
AWS_SECRET_KEY = "your-aws-secret-key"  # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name"  # Replace with your S3 bucket name
AWS_REGION = "us-east-2"  # Replace with your S3 bucket region

base_url = f"{ONPREM_SERVER_URL}/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"

# Syslog Source configuration
syslog_source = CreateInputInputSyslogSyslog2(
    id="in-syslog-9021",
    type=CreateInputInputSyslogType2.SYSLOG,
    host="0.0.0.0",
    tcp_port=SYSLOG_PORT,
    tls=TLSSettingsServerSideType(disabled=True),
)

# S3 Destination configuration
s3_destination = CreateOutputOutputS3(
    id="out_s3",
    type=CreateOutputTypeS3.S3,
    bucket=AWS_BUCKET_NAME,
    stage_path="/tmp/cribl_stage",
    region=AWS_REGION,
    aws_secret_key=AWS_SECRET_KEY,
    aws_api_key=AWS_API_KEY,
    compress=CompressionOptions2.GZIP,
    compression_level=CompressionLevelOptions.BEST_SPEED,
    empty_dir_cleanup_sec=300,
)

# Pipeline configuration: filter events and keep only data in the "eventSource" and "eventID" fields
pipeline = Pipeline(
    id="my_pipeline",
    conf=PipelineConf(
        async_func_timeout=1000,
        functions=cast(
            List[PipelineFunctionConf],
            [
                PipelineFunctionEval(
                    filter_="true",
                    conf=FunctionConfSchemaEval(
                        remove=["*"],
                        keep=["eventSource", "eventID"],
                    ),
                    id=PipelineFunctionEvalID.EVAL,
                    final=True,
                )
            ],
        ),
    ),
)

# Route configuration: route data from the Source to the Pipeline and Destination
route = RouteConf(
    final=False,
    id="my_route",
    name="my_route",
    pipeline=pipeline.id,
    output=s3_destination.id,
    filter_=f"__inputId=='{syslog_source.id}'",
    description="This is my new Route",
)

async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)

    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )

    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(
            f"❌ Worker Group already exists: {WORKER_GROUP_ID}. Try a different Worker Group ID."
        )
        return

    # Create Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        on_prem=True,
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
    )
    print(f"✅ Worker Group created: {WORKER_GROUP_ID}")

    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")

    # Create S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ S3 Destination created: {s3_destination.id}")

    # Create Pipeline
    cribl.pipelines.create(
        id=pipeline.id,
        conf=ConfInput.model_validate(pipeline.conf.model_dump()),
        server_url=group_url,
    )
    print(f"✅ Pipeline created: {pipeline.id}")

    # Add Route to Routing table
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items or len(routes_list_response.items) == 0:
        raise Exception("No Routes found")

    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise Exception("No Routes found")

    routes.routes = [route] + (routes.routes or [])
    routes_input = [
        RouteConfInput.model_validate(r.model_dump()) for r in routes.routes
    ]
    cribl.routes.update(
        id_param=routes.id, id=routes.id, routes=routes_input, server_url=group_url
    )
    print(f"✅ Route added: {route.id}")

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."],
        server_url=group_url
    )
    
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")