Configure Resources
Preview Feature
The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.
Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.
These code examples demonstrate how to use the Cribl Python SDK for the control plane or the Cribl API to configure a Syslog Source, an S3 Destination, a Pipeline, and a Route in Cribl Stream.
About the Code Examples
The code examples use Bearer token authentication. Read the authentication documentation to learn how to get a Bearer token. The Permissions granted to your Bearer token must include creating and managing resources.
Replace the variables in the examples with the corresponding information for your Cribl deployment.
For customer-managed deployments, to use https in the URLs, you must configure Transport Layer Security (TLS).
The resource configurations in the examples do not include all available body parameters. For a complete list of body parameters for each resource, refer to the endpoint documentation in the API Reference.
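The URL patterns and Bearer token header used throughout the examples can be sketched as a few small helpers. This is a minimal illustration, not part of the Cribl SDK: the function names are hypothetical, and the `use_tls` flag only mirrors the note above that `https` requires TLS on customer-managed deployments.

```python
# Hypothetical helpers (not part of the Cribl SDK) that assemble the URL
# patterns and headers used by the examples on this page.

def cloud_base_url(workspace_name: str, organization_id: str) -> str:
    """Base URL for a Cribl.Cloud deployment."""
    return f"https://{workspace_name}-{organization_id}.cribl.cloud/api/v1"


def customer_managed_base_url(hostname: str, port: int, use_tls: bool = True) -> str:
    """Base URL for a customer-managed deployment; https assumes TLS is configured."""
    scheme = "https" if use_tls else "http"
    return f"{scheme}://{hostname}:{port}/api/v1"


def group_url(base_url: str, worker_group_id: str) -> str:
    """URL prefix for requests scoped to a single Worker Group."""
    return f"{base_url}/m/{worker_group_id}"


def auth_headers(token: str) -> dict:
    """Headers for Bearer token authentication with a JSON body."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }


base = customer_managed_base_url("cribl.example.com", 9000)
print(group_url(base, "my-worker-group"))
```

Keeping the base URL and the Worker Group prefix separate makes it easier to target both group-scoped endpoints and Leader-level endpoints from the same script.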
Configure Resources with the Python SDK
This example demonstrates how to use the Python SDK for the control plane to create the following resources in Cribl Stream:
- A Worker Group to manage the configuration.
- A Syslog Source to receive data on port 9021.
- An S3 Destination to store processed data.
- A Pipeline that filters events and keeps only data in the eventSource and eventID fields.
- A Route that connects the Source, Pipeline, and Destination.
This example also deploys the resource configurations to a Worker Group to make them active.
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.
NOTE: This example is for Cribl.Cloud deployments only.
Prerequisites:
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
AWS_REGION.
- An Enterprise License on the server.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
ConfigGroup,
ProductsCore,
InputSyslogSyslog2,
InputSyslogType2,
OutputS3,
OutputS3Type,
OutputS3Compression,
OutputS3CompressionLevel,
Pipeline,
RoutesRoute,
Conf,
PipelineFunctionConf,
FunctionSpecificConfigs,
InputSyslogTLSSettingsServerSide2,
Security,
SchemeClientOauth
)
ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "my-group"
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
# Syslog Source configuration
SYSLOG_PORT = 9021
# S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = "your-aws-api-key" # Replace with your AWS Access Key ID
AWS_SECRET_KEY = "your-aws-secret-key" # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name" # Replace with your S3 bucket name
AWS_REGION = "us-east-2" # Replace with your S3 bucket region
# Worker Group configuration
my_worker_group = ConfigGroup(
on_prem=True,
worker_remote_access=True,
is_fleet=False,
is_search=False,
id=WORKER_GROUP_ID,
)
# Syslog Source configuration
syslog_source = InputSyslogSyslog2(
id="in-syslog-9021",
type=InputSyslogType2.SYSLOG,
tcp_port=SYSLOG_PORT,
tls=InputSyslogTLSSettingsServerSide2(disabled=True),
)
# S3 Destination configuration
s3_destination = OutputS3(
id="out_s3",
type=OutputS3Type.S3,
bucket=AWS_BUCKET_NAME,
region=AWS_REGION,
aws_secret_key=AWS_SECRET_KEY,
aws_api_key=AWS_API_KEY,
compress=OutputS3Compression.GZIP,
compression_level=OutputS3CompressionLevel.BEST_SPEED,
empty_dir_cleanup_sec=300,
)
# Pipeline configuration: filter events and keep only data in the "eventSource" and "eventID" fields
pipeline = Pipeline(
id="my_pipeline",
conf=Conf(
async_func_timeout=1000,
functions=[
PipelineFunctionConf(
filter_="true",
conf=FunctionSpecificConfigs.model_validate(
{ # type: ignore
"remove": ["*"],
"keep": ["eventSource", "eventID"],
}
),
id="eval",
final=True,
)
],
),
)
# Route configuration: route data from the Source to the Pipeline and Destination
route = RoutesRoute(
final=False,
id="my_route",
name="my_route",
pipeline=pipeline.id,
output=s3_destination.id,
filter_=f"__inputId=='{syslog_source.id}'",
description="This is my new Route",
)
async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=my_worker_group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(
            f"❌ Worker Group already exists: {my_worker_group.id}. Try a different Worker Group ID."
        )
        return
    # Create Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        on_prem=my_worker_group.on_prem,
        worker_remote_access=my_worker_group.worker_remote_access,
        is_fleet=my_worker_group.is_fleet,
        is_search=my_worker_group.is_search,
    )
    print(f"✅ Worker Group created: {my_worker_group.id}")
    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")
    # Create S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ S3 Destination created: {s3_destination.id}")
    # Create Pipeline
    cribl.pipelines.create(id=pipeline.id, conf=pipeline.conf, server_url=group_url)
    print(f"✅ Pipeline created: {pipeline.id}")
    # Add Route to Routing table
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items or len(routes_list_response.items) == 0:
        raise Exception("No Routes found")
    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise Exception("No Routes found")
    routes.routes = [route] + (routes.routes or [])
    cribl.routes.update(
        id_param=routes.id, id=routes.id, routes=routes.routes, server_url=group_url
    )
    print(f"✅ Route added: {route.id}")
    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        group_id=my_worker_group.id,
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."]
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {my_worker_group.id}, commit ID: {version}")
    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {my_worker_group.id}")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.
NOTE: This example is for customer-managed deployments only.
Prerequisites:
- Your AWS S3 values for AWS_API_KEY, AWS_SECRET_KEY, AWS_BUCKET_NAME, and
AWS_REGION.
- An Enterprise License on the server.
"""
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
Security,
ConfigGroup,
ProductsCore,
InputSyslogSyslog2,
InputSyslogType2,
OutputS3,
OutputS3Type,
OutputS3Compression,
OutputS3CompressionLevel,
Pipeline,
RoutesRoute,
Conf,
PipelineFunctionConf,
FunctionSpecificConfigs,
InputSyslogTLSSettingsServerSide2,
)
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id"
# Syslog Source configuration
SYSLOG_PORT = 9021
# S3 Destination configuration: Replace the placeholder values
AWS_API_KEY = "your-aws-api-key" # Replace with your AWS Access Key ID
AWS_SECRET_KEY = "your-aws-secret-key" # Replace with your AWS Secret Access Key
AWS_BUCKET_NAME = "your-aws-bucket-name" # Replace with your S3 bucket name
AWS_REGION = "us-east-2" # Replace with your S3 bucket region
base_url = f"{ONPREM_SERVER_URL}/api/v1"
# Worker Group configuration
my_worker_group = ConfigGroup(
on_prem=True,
worker_remote_access=True,
is_fleet=False,
is_search=False,
id=WORKER_GROUP_ID,
)
# Syslog Source configuration
syslog_source = InputSyslogSyslog2(
id="in-syslog-9021",
type=InputSyslogType2.SYSLOG,
tcp_port=SYSLOG_PORT,
tls=InputSyslogTLSSettingsServerSide2(disabled=True),
)
# S3 Destination configuration
s3_destination = OutputS3(
id="out_s3",
type=OutputS3Type.S3,
bucket=AWS_BUCKET_NAME,
region=AWS_REGION,
aws_secret_key=AWS_SECRET_KEY,
aws_api_key=AWS_API_KEY,
compress=OutputS3Compression.GZIP,
compression_level=OutputS3CompressionLevel.BEST_SPEED,
empty_dir_cleanup_sec=300,
)
# Pipeline configuration: filter events and keep only data in the "eventSource" and "eventID" fields
pipeline = Pipeline(
id="my_pipeline",
conf=Conf(
async_func_timeout=1000,
functions=[
PipelineFunctionConf(
filter_="true",
conf=FunctionSpecificConfigs.model_validate(
{ # type: ignore
"remove": ["*"],
"keep": ["eventSource", "eventID"],
}
),
id="eval",
final=True,
)
],
),
)
# Route configuration: route data from the Source to the Pipeline and Destination
route = RoutesRoute(
final=False,
id="my_route",
name="my_route",
pipeline=pipeline.id,
output=s3_destination.id,
filter_=f"__inputId=='{syslog_source.id}'",
description="This is my new Route",
)
group_url = f"{base_url}/m/{my_worker_group.id}"
async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=my_worker_group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(
            f"❌ Worker Group already exists: {my_worker_group.id}. Try a different Worker Group ID."
        )
        return
    # Create Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        on_prem=my_worker_group.on_prem,
        worker_remote_access=my_worker_group.worker_remote_access,
        is_fleet=my_worker_group.is_fleet,
        is_search=my_worker_group.is_search,
    )
    print(f"✅ Worker Group created: {my_worker_group.id}")
    # Create Syslog Source
    cribl.sources.create(request=syslog_source, server_url=group_url)
    print(f"✅ Syslog source created: {syslog_source.id}")
    # Create S3 Destination
    cribl.destinations.create(request=s3_destination, server_url=group_url)
    print(f"✅ S3 Destination created: {s3_destination.id}")
    # Create Pipeline
    cribl.pipelines.create(id=pipeline.id, conf=pipeline.conf, server_url=group_url)
    print(f"✅ Pipeline created: {pipeline.id}")
    # Add Route to Routing table
    routes_list_response = cribl.routes.list(server_url=group_url)
    if not routes_list_response.items or len(routes_list_response.items) == 0:
        raise Exception("No Routes found")
    routes = routes_list_response.items[0]
    if not routes or not routes.id:
        raise Exception("No Routes found")
    routes.routes = [route] + (routes.routes or [])
    cribl.routes.update(
        id_param=routes.id, id=routes.id, routes=routes.routes, server_url=group_url
    )
    print(f"✅ Route added: {route.id}")
    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        group_id=my_worker_group.id,
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."]
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {my_worker_group.id}, commit ID: {version}")
    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {my_worker_group.id}")
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
Configure Resources with the Cribl API
The example requests in this section demonstrate how to use the Cribl API to create the following resources in Cribl Stream:
- A Syslog Source to receive data on port 9021.
- An S3 Destination to store processed data.
- A Pipeline that filters events and keeps only data in the eventSource and eventID fields.
- A Route that connects the Source, Pipeline, and Destination.
The example also deploys the resource configurations to a Worker Group to make them active.
The examples use the Worker Group created in Configure Worker Groups to manage the resource configuration.
Create a Source
This example creates a Syslog Source to receive data on port 9021.
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/system/inputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "in_syslog_9021",
"type": "syslog",
"disabled": true,
"tcpPort": 9021,
"host": "192.168.1.100"
}'
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/system/inputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "in_syslog_9021",
"type": "syslog",
"disabled": true,
"tcpPort": 9021,
"host": "192.168.1.100"
}'
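For readers who prefer Python to curl, the same request can be sketched with the standard library alone. This is an illustrative sketch rather than the SDK's approach: the function name, the example hostname, and the token value are placeholders, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_create_source_request(group_url: str, token: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates the Syslog Source."""
    body = {
        "id": "in_syslog_9021",
        "type": "syslog",
        "disabled": True,
        "tcpPort": 9021,
        "host": "192.168.1.100",
    }
    return urllib.request.Request(
        url=f"{group_url}/system/inputs",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder URL and token; replace with values for your deployment.
request = build_create_source_request(
    "https://cribl.example.com:9000/api/v1/m/my-worker-group", "example-token"
)
print(request.get_method(), request.full_url)
# To send the request: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it possible to inspect the URL and body before anything reaches the Leader.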
Create a Destination
This example creates an S3 Destination to store processed data. Replace placeholder values like your-aws-api-key before you run the example.
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/system/outputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "out_s3",
"type": "s3",
"awsAuthenticationMethod": "manual",
"awsApiKey": "your-aws-api-key",
"awsSecretKey": "your-aws-secret-key",
"region": "us-east-2",
"bucket": "your-aws-bucket-name",
"compress": "gzip",
"compressionLevel": "best_speed",
"stagePath": "$CRIBL_HOME/state/outputs/staging",
"emptyDirCleanupSec": 300
}'
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/system/outputs' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "out_s3",
"type": "s3",
"awsAuthenticationMethod": "manual",
"awsApiKey": "your-aws-api-key",
"awsSecretKey": "your-aws-secret-key",
"region": "us-east-2",
"bucket": "your-aws-bucket-name",
"compress": "gzip",
"compressionLevel": "best_speed",
"stagePath": "$CRIBL_HOME/state/outputs/staging",
"emptyDirCleanupSec": 300
}'
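Because the AWS keys are sensitive, one option is to read them from the environment instead of hard-coding them in a script. The sketch below builds the same request body as the curl examples. The function name and the environment variable names (`AWS_API_KEY`, `AWS_SECRET_KEY`, `AWS_BUCKET_NAME`, `AWS_REGION`) are assumptions chosen to match the placeholders in the Python SDK example, not names that Cribl requires.

```python
import os
from typing import Mapping


def build_s3_destination(env: Mapping[str, str] = os.environ) -> dict:
    """Build the S3 Destination request body from environment-style settings."""
    required = ("AWS_API_KEY", "AWS_SECRET_KEY", "AWS_BUCKET_NAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return {
        "id": "out_s3",
        "type": "s3",
        "awsAuthenticationMethod": "manual",
        "awsApiKey": env["AWS_API_KEY"],
        "awsSecretKey": env["AWS_SECRET_KEY"],
        # The region default mirrors the value used in the examples.
        "region": env.get("AWS_REGION", "us-east-2"),
        "bucket": env["AWS_BUCKET_NAME"],
        "compress": "gzip",
        "compressionLevel": "best_speed",
        "stagePath": "$CRIBL_HOME/state/outputs/staging",
        "emptyDirCleanupSec": 300,
    }


# Example with an explicit mapping instead of the real environment.
example_env = {
    "AWS_API_KEY": "your-aws-api-key",
    "AWS_SECRET_KEY": "your-aws-secret-key",
    "AWS_BUCKET_NAME": "your-aws-bucket-name",
}
destination = build_s3_destination(example_env)
print(destination["bucket"], destination["region"])
```

Failing early on a missing setting avoids sending a request that creates a Destination with empty credentials.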
Create a Pipeline
This example creates a Pipeline that filters events and keeps only data in the eventSource and eventID fields.
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/pipelines' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "my_pipeline",
"conf": {
"asyncFuncTimeout": 1000,
"functions": [
{
"filter": "true",
"conf": {
"remove": [
"*"
],
"keep": [
"eventSource",
"eventID"
]
},
"id": "eval",
"final": true
}
]
}
}'
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/pipelines' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "my_pipeline",
"conf": {
"asyncFuncTimeout": 1000,
"functions": [
{
"filter": "true",
"conf": {
"remove": [
"*"
],
"keep": [
"eventSource",
"eventID"
]
},
"id": "eval",
"final": true
}
]
}
}'
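To build intuition for what the `remove` and `keep` lists in this Pipeline are meant to do, the following is a rough local model, not Cribl's implementation. It assumes that a field matching a `keep` pattern survives even when it also matches a `remove` pattern, and that `*` is a wildcard. The function name and the sample event are hypothetical.

```python
from fnmatch import fnmatchcase


def apply_keep_remove(event: dict, keep: list, remove: list) -> dict:
    """Approximate the effect of keep/remove lists on a single event.

    Assumption: a field is dropped only if it matches a remove pattern
    and does not match any keep pattern.
    """
    def matches(name: str, patterns: list) -> bool:
        return any(fnmatchcase(name, pattern) for pattern in patterns)

    return {
        name: value
        for name, value in event.items()
        if matches(name, keep) or not matches(name, remove)
    }


# Hypothetical sample event for illustration only.
sample_event = {
    "eventSource": "s3.amazonaws.com",
    "eventID": "1234",
    "userAgent": "example-agent",
    "sourceIPAddress": "192.0.2.10",
}
filtered = apply_keep_remove(sample_event, keep=["eventSource", "eventID"], remove=["*"])
print(filtered)
```

Under these assumptions, only the eventSource and eventID fields remain in the output event.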
Create a Route
This example creates a Route that connects the Syslog Source, Pipeline, and S3 Destination and adds it to the end of the default Routing table. The Routing table already includes a default Route, so this request changes the default Route’s setting for final to false so that it won’t block the new Route.
The PATCH /routes/default endpoint requires a complete representation of the Routing table and its existing Routes in the request body. This endpoint does not support partial updates. Cribl removes any omitted fields when updating the Routing table. Use GET /routes/default to retrieve the existing Routing table to use in the body of the PATCH request.
curl --request PATCH \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/m/my-worker-group/routes/default' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "default",
"routes": [
{
"id": "0abcd9",
"name": "default",
"final": false,
"disabled": false,
"pipeline": "main",
"enableOutputExpression": false,
"filter": "true",
"output": "default"
},
{
"name": "my_route",
"final": true,
"disabled": false,
"pipeline": "my_pipeline",
"enableOutputExpression": false,
"filter": "__inputId == 'syslog:in_syslog_9021:tcp'",
"output": "out_s3",
"description": "This is my new Route"
}
]
}'
curl --request PATCH \
--url 'https://${hostname}:${port}/api/v1/m/my-worker-group/routes/default' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"id": "default",
"routes": [
{
"id": "0abcd9",
"name": "default",
"final": false,
"disabled": false,
"pipeline": "main",
"enableOutputExpression": false,
"filter": "true",
"output": "default"
},
{
"name": "my_route",
"final": true,
"disabled": false,
"pipeline": "my_pipeline",
"enableOutputExpression": false,
"filter": "__inputId == 'syslog:in_syslog_9021:tcp'",
"output": "out_s3",
"description": "This is my new Route"
}
]
}'
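Because the PATCH request must carry the complete Routing table, a script would typically retrieve the table, modify it locally, and send the whole result back. The sketch below shows only the local modification step as a pure function. The function name is hypothetical, and identifying the default Route by its `name` value is an assumption based on the example body above.

```python
import copy


def add_route_to_table(routing_table: dict, new_route: dict) -> dict:
    """Return a full Routing table with new_route appended.

    Assumes routing_table is the body returned by GET /routes/default and
    that the default Route is the one whose name is "default".
    """
    updated = copy.deepcopy(routing_table)  # leave the caller's copy untouched
    routes = updated.setdefault("routes", [])
    for existing in routes:
        if existing.get("name") == "default":
            # A final Route stops evaluation, so it would block the new Route.
            existing["final"] = False
    routes.append(new_route)
    return updated


existing_table = {
    "id": "default",
    "routes": [
        {
            "id": "0abcd9",
            "name": "default",
            "final": True,
            "disabled": False,
            "pipeline": "main",
            "enableOutputExpression": False,
            "filter": "true",
            "output": "default",
        }
    ],
}
my_route = {
    "name": "my_route",
    "final": True,
    "disabled": False,
    "pipeline": "my_pipeline",
    "enableOutputExpression": False,
    "filter": "__inputId == 'syslog:in_syslog_9021:tcp'",
    "output": "out_s3",
    "description": "This is my new Route",
}
patch_body = add_route_to_table(existing_table, my_route)
print([r["name"] for r in patch_body["routes"]])
```

Every field of the existing Routes is carried over unchanged except `final` on the default Route, which keeps the PATCH body a complete representation of the table.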
Commit and Deploy the Resource Configurations
This example demonstrates how to commit and deploy the resource configurations to your Worker Group. The process requires three requests: commit to the Worker Group, deploy to the Worker Group, and commit to the Leader to keep it in sync with the Worker Group.
First, commit the pending resource configurations to the Worker Group:
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit?groupId=my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"message": "Commit resource configurations to my-worker-group"
}'
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/version/commit?groupId=my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"message": "Commit resource configurations to my-worker-group"
}'
Next, deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request:
curl --request PATCH \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group/deploy' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"version": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
}'
curl --request PATCH \
--url 'https://${hostname}:${port}/api/v1/products/stream/groups/my-worker-group/deploy' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"version": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
}'
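Passing the commit value from the first response into the deploy request can be sketched as follows. The response shape used here, an `items` list whose first entry has a `commit` field, is an assumption inferred from the Python SDK example, which reads `commit_response.items[0].commit`. The function name is hypothetical.

```python
import json


def deploy_body_from_commit_response(commit_response: dict) -> dict:
    """Build the deploy request body from a parsed commit response.

    Assumed response shape: {"items": [{"commit": "<commit ID>", ...}]}
    """
    items = commit_response.get("items") or []
    if not items or not items[0].get("commit"):
        raise ValueError("Commit response does not include a commit ID")
    return {"version": items[0]["commit"]}


# Example commit response using the placeholder commit ID from this page.
example_response = {"items": [{"commit": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"}]}
deploy_body = deploy_body_from_commit_response(example_response)
print(json.dumps(deploy_body))
```

Serializing the body with `json.dumps` ensures the commit ID is sent as a quoted JSON string.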
Finally, commit the changes to the Leader to keep the Leader in sync with the Worker Group:
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"message": "Sync my-worker-group resource configurations with Leader"
}'
curl --request POST \
--url 'https://${hostname}:${port}/api/v1/version/commit' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
"message": "Sync my-worker-group resource configurations with Leader"
}'