These docs are for Cribl API 4.15 and are no longer actively maintained.
See the latest version (4.18).
Configure Worker Groups
Preview Feature
The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.
Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.
These code examples demonstrate how to use the Cribl Python SDK for the control plane or the Cribl API to configure, scale, and replicate Worker Groups in Cribl Stream.
About the Code Examples
The code examples use Bearer token authentication. Read the authentication documentation for the API or SDKs to learn how to configure authentication. The Permissions granted to your Bearer token must include creating and managing Worker Groups.
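For orientation, a Bearer token travels in the HTTP `Authorization` header of each request. The sketch below only builds that header; `bearer_headers` is a hypothetical helper that is not part of the Cribl SDK, and it does not obtain a token (the authentication documentation covers that step).

```python
def bearer_headers(token: str) -> dict:
    """Build the Authorization header for a Bearer token (hypothetical helper)."""
    if not token:
        raise ValueError("token must not be empty")
    # The "Bearer" scheme name is followed by a single space and the token.
    return {"Authorization": f"Bearer {token}"}


if __name__ == "__main__":
    # "example-token" is a placeholder, not a real credential.
    print(bearer_headers("example-token"))
```

The SDK examples on this page do not build this header by hand; they pass the credentials or token to `Security` instead.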
Replace the variables in the examples with the corresponding information for your Cribl deployment.
For on-prem deployments, to use https in the URLs, you must configure Transport Layer Security (TLS).
The configurations in the examples do not include all available body parameters. For a complete list of body parameters for each endpoint, refer to the documentation in the API Reference.
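As a rough illustration of the URL shapes that the examples on this page rely on, the following sketch builds the base URL and the Worker Group-specific URL for Cribl.Cloud and on-prem deployments. The helper names (`cloud_base_url`, `onprem_base_url`, `group_url`) are hypothetical and not part of the Cribl SDK; only the URL patterns come from the examples themselves.

```python
def cloud_base_url(workspace_name: str, org_id: str) -> str:
    """Base URL for a Cribl.Cloud Workspace, in the pattern the examples use."""
    return f"https://{workspace_name}-{org_id}.cribl.cloud/api/v1"


def onprem_base_url(server_url: str) -> str:
    """Base URL for an on-prem server; strips a trailing slash to avoid '//api'."""
    return f"{server_url.rstrip('/')}/api/v1"


def group_url(base_url: str, worker_group_id: str) -> str:
    """Worker Group-specific URL, which the examples use for commit requests."""
    return f"{base_url}/m/{worker_group_id}"


if __name__ == "__main__":
    # Placeholder values; replace them with your own deployment details.
    base = cloud_base_url("your-workspace-name", "your-org-id")
    print(base)
    print(group_url(base, "your-group"))
    print(group_url(onprem_base_url("http://localhost:9000/"), "your-group"))
```

Keeping the URL construction in one place makes it easier to switch an on-prem server URL from http to https after you configure TLS.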
Configure Worker Groups with the Python SDK
The examples in this section demonstrate how to create, scale, replicate, and deploy Worker Groups using the Cribl Python SDK for the control plane.
The examples for creating and scaling a Worker Group in Cribl.Cloud include the estimatedIngestRate property, which allows you to configure Worker Groups for optimal performance. For each supported estimatedIngestRate value, the following table maps the corresponding throughput and number of Worker Processes:
| estimatedIngestRate | Throughput | Worker Processes |
|---|---|---|
| 1024 | 12 MB/s | 6 |
| 2048 | 24 MB/s | 9 |
| 3072 | 36 MB/s | 14 |
| 4096 | 48 MB/s | 21 |
| 5120 | 60 MB/s | 30 |
| 7168 | 84 MB/s | 45 |
| 10240 | 120 MB/s | 62 |
| 13312 | 156 MB/s | 93 |
| 15360 | 180 MB/s | 186 |
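When planning capacity, it can help to look up the smallest tier that covers a target throughput. The sketch below copies the values from the table above into a list and scans it in order. `TIERS` and `smallest_tier_for` are hypothetical names, not part of the Cribl SDK or API.

```python
# (estimatedIngestRate, throughput in MB/s, Worker Processes), copied from the table.
TIERS = [
    (1024, 12, 6),
    (2048, 24, 9),
    (3072, 36, 14),
    (4096, 48, 21),
    (5120, 60, 30),
    (7168, 84, 45),
    (10240, 120, 62),
    (13312, 156, 93),
    (15360, 180, 186),
]


def smallest_tier_for(target_mb_per_sec: float) -> tuple:
    """Return the first (lowest) tier whose throughput covers the target."""
    for tier in TIERS:
        if tier[1] >= target_mb_per_sec:
            return tier
    raise ValueError(f"No tier supports {target_mb_per_sec} MB/s")


if __name__ == "__main__":
    rate, throughput, processes = smallest_tier_for(30)
    print(rate, throughput, processes)
```

The list is ordered by throughput, so the first match is also the smallest adequate tier.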
1. Create a Worker Group
This example creates a new Worker Group in Cribl Stream.
In the Cribl.Cloud example, the estimatedIngestRate is set to 2048, which is equivalent to a maximum of 24 MB/s with nine Worker Processes. In the on-prem deployment example, the new Worker Group is created with eight Workers.
"""
Create a Worker Group
- Creates a new Worker Group in Cribl Stream.
- Commits and deploys the Worker Group configuration to make it active.
NOTE: This example is for Cribl.Cloud deployments only.
Required to use this example:
- A Cribl.Cloud Enterprise account.
- A Cribl.Cloud Organization ID and Workspace name, which are used to build the
base URL for the SDK calls.
- The Client ID and Secret for a Cribl.Cloud API Credential, which are used to
authenticate the SDK calls. To get the Client ID and Secret, follow
https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud. The Client ID
and Secret are sensitive information and should be kept private.
"""
# Import block
# Imports asyncio so that the file can run an asynchronous control plane
# sequence for authentication and creating and deploying a Worker Group.
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports model classes that provide the Python types used for Cribl Stream
# resource configurations and API payloads in this file from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    ConfigGroup,
    ConfigGroupCloud,
    CloudProvider,
    Security,
    SchemeClientOauth,
    ProductsCore,
    EstimatedIngestRateOptionsConfigGroup,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and Worker Group
# definition block. Replace the placeholder values before executing
# this file.
ORG_ID = "your-org-id" # Replace with the Organization ID
WORKSPACE_NAME = "your-workspace-name" # Replace with the Workspace name
WORKER_GROUP_ID = "your-group" # Replace with the ID to use for the new Worker Group
CLIENT_ID = "your-client-id" # Replace with the Client ID for the API Credential
CLIENT_SECRET = "your-client-secret" # Replace with the Client Secret for the API Credential
# URL block
# Builds the base URL and Worker Group-specific URL to use for the
# API requests that this file makes using the ORG_ID, WORKSPACE_NAME,
# and WORKER_GROUP_ID provided in the user-supplied parameters block.
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
# Estimated ingest rate block
# Sets the maximum estimated ingest throughput tier for the new Worker Group
# using EstimatedIngestRateOptionsConfigGroup. RATE24_MB_PER_SEC is the 24 MB/s
# tier, equivalent to 24 MB/s maximum estimated ingest rate with 9 Worker
# Processes. This value is passed as estimated_ingest_rate to groups.create in
# the Create Worker Group block.
ESTIMATED_INGEST_RATE = EstimatedIngestRateOptionsConfigGroup.RATE24_MB_PER_SEC
# Worker Group definition block
# The configuration for a new Worker Group that uses the WORKER_GROUP_ID
# provided in the user-supplied parameters block. The settings in this
# configuration are passed to groups.create in the Create Worker Group block,
# with ESTIMATED_INGEST_RATE from the estimated ingest rate block.
group = ConfigGroup(
    id=WORKER_GROUP_ID,
    name="my-worker-group",
    description="Cribl.Cloud Worker Group",
    cloud=ConfigGroupCloud(provider=CloudProvider.AWS, region="us-west-2"),
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    on_prem=False,
    provisioned=False,
)
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your Cribl.Cloud API Credentials,
# creates the Worker Group, and commits and deploys the configuration.
async def main():
    # Authentication block
    # Creates an OAuth client (SchemeClientOauth) that exchanges CLIENT_ID and
    # CLIENT_SECRET from the user-supplied parameters block for a Bearer
    # token and wraps the client in Security.
    # Constructs the SDK client CriblControlPlane to make authenticated API
    # requests using the base_url from the URL block and Security, which holds
    # the Bearer token.
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Create Worker Group block
    # Checks for a Worker Group with the WORKER_GROUP_ID provided in the
    # user-supplied parameters block and exits if it already exists.
    # Otherwise, creates the Worker Group using settings from the Worker Group
    # definition block and ESTIMATED_INGEST_RATE from the Estimated ingest
    # rate block and prints a confirmation message.
    worker_group_response = cribl.groups.get(id=group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(f"❌ Worker Group already exists: {group.id}. Try a different group ID.")
        return
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=group.id,
        name=group.name,
        description=group.description,
        cloud=group.cloud,
        worker_remote_access=group.worker_remote_access,
        is_fleet=group.is_fleet,
        is_search=group.is_search,
        on_prem=group.on_prem,
        estimated_ingest_rate=ESTIMATED_INGEST_RATE,
        provisioned=group.provisioned,
    )
    print(f"✅ Worker Group created: {group.id}")

    # Commit block
    # Records a new version of the Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the
    # Deploy block. Raises an exception if the API returns no commit.
    # Otherwise, prints a confirmation message.
    commit_response = cribl.versions.commits.create(
        message="Commit for create Worker Group example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(
        f"✅ Committed configuration changes to the group: {group.id}, "
        f"commit ID: {version}"
    )

    # Deploy block
    # Pushes the committed configuration version (using the commit ID from
    # the Commit block) to the Cribl Stream Worker Group so that Workers load
    # and run that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=group.id,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {group.id}")

# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Create a Worker Group
- Creates a new Worker Group in Cribl Stream.
- Commits and deploys the Worker Group configuration to make it active.
NOTE: This example is for on-prem deployments only.
Required to use this example:
- An Enterprise license.
- A server URL, which is used to build the base URL for the SDK calls.
- A username and password, which are used to authenticate the SDK calls.
The username and password credentials are sensitive information and should
be kept private.
"""
# Import block
# Imports asyncio so that the file can await the on-prem token request and
# other asynchronous control plane calls (Worker Group create, commit, deploy).
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports Security (the Bearer token wrapper used after username/password
# login) and ProductsCore (product identifiers) from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and Create Worker
# Group block. Replace the placeholder values before executing this file.
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id" # Replace with the ID to use for the new Worker Group
# URL block
# Builds the base URL and Worker Group-specific URL to use for the
# API requests that this file makes using the ONPREM_SERVER_URL and
# WORKER_GROUP_ID provided in the user-supplied parameters block.
base_url = f"{ONPREM_SERVER_URL}/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your username and password,
# creates the Worker Group, and commits and deploys the configuration.
async def main():
    # Authentication block
    # Constructs CriblControlPlane with the base_url from the URL block and
    # calls the on-prem authentication endpoint with the username and password
    # from the user-supplied parameters block.
    # Retrieves the Bearer token from the on-prem authentication endpoint
    # response and wraps the token in Security.
    # Reconstructs CriblControlPlane as an authenticated SDK client using the
    # same base_url and Security (which holds the Bearer token).
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Create Worker Group block
    # Checks for a Worker Group with the WORKER_GROUP_ID provided in the
    # user-supplied parameters block and exits if it already exists.
    # Otherwise, creates the Worker Group and prints a confirmation message.
    worker_group_response = cribl.groups.get(id=WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(f"❌ Worker Group already exists: {WORKER_GROUP_ID}. Try a different group ID.")
        return
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        name="my-worker-group",
        description="My Worker Group description",
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        on_prem=True,
    )
    print(f"✅ Worker Group created: {WORKER_GROUP_ID}")

    # Commit block
    # Records a new version of the Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the
    # Deploy block. Raises an exception if the API returns no commit.
    # Otherwise, prints a confirmation message.
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example for creating a Worker Group",
        server_url=group_url,
        effective=True,
        files=["."],
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy block
    # Pushes the committed configuration version (using the commit ID from
    # the Commit block) to the Cribl Stream Worker Group so that Workers load
    # and run that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")

# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
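The Commit block in the examples above follows a small guard pattern: read the first item of the commit response and stop if there is none. A minimal sketch of that pattern in isolation might look like the following. `first_commit_id` is a hypothetical helper, and `SimpleNamespace` objects stand in for the SDK response, which is assumed to expose `items` with a `commit` attribute as in the examples.

```python
from types import SimpleNamespace


def first_commit_id(commit_response) -> str:
    """Return the commit ID of the first item, or raise if the response is empty."""
    items = getattr(commit_response, "items", None)
    if not items:
        raise RuntimeError("Failed to commit configuration changes")
    return items[0].commit


if __name__ == "__main__":
    # Stand-in response object; a real response comes from the SDK commit call.
    fake_response = SimpleNamespace(items=[SimpleNamespace(commit="abc123")])
    print(first_commit_id(fake_response))
```

Failing early here prevents the deploy step from running with a missing version.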
2. Scale and Provision the Worker Group
The Cribl.Cloud example scales the Worker Group to an estimatedIngestRate of 4096, which is equivalent to a maximum of 48 MB/s with 21 Worker Processes. The example also sets provisioned to True to activate Cribl.Cloud resources.
The Cribl SDKs do not support scaling Worker Groups in on-prem deployments.
The groups.update method requires a complete representation of the Worker Group that you want to update in the request body. This method does not support partial updates. Cribl removes any omitted fields when updating the Worker Group to scale and provision it.
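Because partial updates are not supported, one way to avoid dropping fields is to start from the current Worker Group and overlay only the values that change. The sketch below illustrates that idea with a plain dataclass. `GroupSnapshot` and `full_update_payload` are hypothetical stand-ins, not SDK types, and the integer `estimated_ingest_rate` is a simplification of the enum that the SDK example uses.

```python
from dataclasses import asdict, dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class GroupSnapshot:
    """Hypothetical stand-in for the Worker Group fields sent in an update."""
    id: str
    name: Optional[str] = None
    description: Optional[str] = None
    worker_remote_access: bool = False
    is_fleet: bool = False
    is_search: bool = False
    on_prem: bool = False
    estimated_ingest_rate: Optional[int] = None
    provisioned: bool = False


def full_update_payload(current: GroupSnapshot, **changes) -> dict:
    """Copy every existing field, then overlay only the fields being changed."""
    unknown = set(changes) - set(asdict(current))
    if unknown:
        raise KeyError(f"Unknown Worker Group fields: {sorted(unknown)}")
    return asdict(replace(current, **changes))


if __name__ == "__main__":
    current = GroupSnapshot(
        id="your-group", name="my-worker-group", estimated_ingest_rate=2048
    )
    payload = full_update_payload(current, estimated_ingest_rate=4096, provisioned=True)
    print(payload)
```

Every field that the current Worker Group already has is carried into the payload, so nothing is removed unintentionally when scaling.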
"""
Scale (Provision) a Worker Group
- Verifies that a Worker Group exists in Cribl Stream.
- Updates estimated ingest rate and provisioning for the Worker Group.
- Commits and deploys the Worker Group configuration to make it active.
NOTE: This example is for Cribl.Cloud deployments only.
Required to use this example:
- A Cribl.Cloud Enterprise account (if you are scaling a non-default Worker Group).
- A Cribl.Cloud Organization ID and Workspace name, which are used to build the
base URL for the SDK calls.
- The Client ID and Secret for a Cribl.Cloud API Credential, which are used to
authenticate the SDK calls. To get the Client ID and Secret, follow
https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud. The Client ID
and Secret are sensitive information and should be kept private.
- An existing Worker Group ID (WORKER_GROUP_ID) to scale.
"""
# Import block
# Imports asyncio so that the file can run an asynchronous control plane
# sequence for authentication, scaling a Worker Group, and committing and
# deploying the configuration.
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports model classes that provide the Python types used for Cribl Stream
# resource configurations and API payloads in this file from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    ProductsCore,
    SchemeClientOauth,
    Security,
    EstimatedIngestRateOptionsConfigGroup,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and Scale Worker Group
# block. Replace the placeholder values before executing this file.
ORG_ID = "your-org-id" # Replace with the Organization ID
WORKSPACE_NAME = "your-workspace-name" # Replace with the Workspace name
WORKER_GROUP_ID = "your-cloud-worker-group-id" # Replace with the Worker Group ID to scale
CLIENT_ID = "your-client-id" # Replace with the Client ID for the API Credential
CLIENT_SECRET = "your-client-secret" # Replace with the Client Secret for the API Credential
# URL block
# Builds the base URL and Worker Group-specific URL to use for the API requests
# that this file makes using the ORG_ID, WORKSPACE_NAME, and WORKER_GROUP_ID
# from the user-supplied parameters block. The Worker Group URL is used for the
# Commit block.
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
# Estimated ingest rate block
# Target tier after scaling, using EstimatedIngestRateOptionsConfigGroup.
# RATE48_MB_PER_SEC is the 48 MB/s tier, equivalent to 48 MB/s maximum estimated
# ingest rate with 21 Worker Processes. This value is written to
# estimated_ingest_rate in the Scale Worker Group block.
SCALED_ESTIMATED_INGEST_RATE = EstimatedIngestRateOptionsConfigGroup.RATE48_MB_PER_SEC
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your Cribl.Cloud API Credentials,
# verifies the Worker Group, updates scaling fields, and commits and deploys the
# configuration.
async def main():
    # Authentication block
    # Creates an OAuth client (SchemeClientOauth) that exchanges CLIENT_ID and
    # CLIENT_SECRET from the user-supplied parameters block for a Bearer token
    # and wraps the client in Security.
    # Constructs the SDK client CriblControlPlane to make authenticated API
    # requests using the base_url from the URL block and Security, which holds
    # the Bearer token.
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify Worker Group block
    # Fetches the Worker Group using WORKER_GROUP_ID from the user-supplied
    # parameters block and exits if it does not exist so that scaling does
    # not run against a missing Worker Group.
    worker_group_response = cribl.groups.get(id=WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if not worker_group_response.items or len(worker_group_response.items) == 0:
        print(f"❌ Worker Group not found: {WORKER_GROUP_ID}. Create the Worker Group first.")
        return
    group = worker_group_response.items[0]
    if not group or not group.id:
        print(f"❌ Worker Group not found: {WORKER_GROUP_ID}. Create the Worker Group first.")
        return

    # Scale Worker Group block
    # Sets estimated_ingest_rate to SCALED_ESTIMATED_INGEST_RATE from the Estimated
    # ingest rate block and provisioned to True, then persists the Worker Group with
    # groups.update and prints a confirmation message.
    group.estimated_ingest_rate = SCALED_ESTIMATED_INGEST_RATE
    group.provisioned = True
    cribl.groups.update(
        product=ProductsCore.STREAM,
        id=group.id,
        id_param=group.id,
        name=group.name,
        description=group.description,
        cloud=group.cloud,
        worker_remote_access=group.worker_remote_access,
        is_fleet=group.is_fleet,
        is_search=group.is_search,
        on_prem=group.on_prem,
        estimated_ingest_rate=group.estimated_ingest_rate,
        provisioned=group.provisioned,
    )
    print(f"✅ Worker Group scaled and provisioned: {group.id}")

    # Commit block
    # Records a new version of the Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the Deploy
    # block. Raises an exception if the API returns no commit. Otherwise, prints
    # a confirmation message.
    commit_response = cribl.versions.commits.create(
        message="Commit for scale Worker Group example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {group.id}, commit ID: {version}")

    # Deploy block
    # Pushes the committed configuration version (using the commit ID from the
    # Commit block) to the Cribl Stream Worker Group so that Workers load and run
    # that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=group.id,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {group.id}")

# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
3. Replicate a Worker Group
This example creates a replica Worker Group in Cribl Stream by cloning an existing Worker Group configuration. The request body uses the source_group_id parameter to specify the Worker Group to clone.
The replica Worker Group inherits the configuration from the source Worker Group, including settings and resources like Sources, Destinations, and Pipelines.
To run this example, you must have at least one existing Worker Group named my-worker-group in Cribl Stream to use as the source Worker Group.
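To make the role of source_group_id concrete, the following sketch assembles the keyword arguments that a replica request might carry for an on-prem deployment. `replica_create_kwargs` is a hypothetical helper, not part of the Cribl SDK. The field names mirror the ones that the examples on this page pass to groups.create, and the check that the two IDs differ is an assumption based on Worker Group IDs being unique.

```python
def replica_create_kwargs(source_group_id: str, replica_group_id: str) -> dict:
    """Assemble keyword arguments for creating a replica (hypothetical helper)."""
    if not source_group_id or not replica_group_id:
        raise ValueError("Both Worker Group IDs are required")
    if source_group_id == replica_group_id:
        raise ValueError("The replica ID must differ from the source ID")
    return {
        "id": replica_group_id,
        "name": replica_group_id,
        "description": f"Worker Group cloned from {source_group_id}",
        "on_prem": True,
        # Names the Worker Group whose configuration is cloned.
        "source_group_id": source_group_id,
    }


if __name__ == "__main__":
    print(replica_create_kwargs("my-worker-group", "my-replica-worker-group"))
```

A dictionary like this could be unpacked into a create call with `**kwargs`, alongside the product argument and any other fields your deployment needs.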
"""
Replicate a Worker Group
- Verifies that a source Worker Group exists in Cribl Stream.
- Creates a replica Worker Group cloned from the source with the
same configuration.
- Commits and deploys the replica Worker Group configuration to make it active.
NOTE: This example is for Cribl.Cloud deployments only.
Required to use this example:
- A Cribl.Cloud Enterprise account.
- A Cribl.Cloud Organization ID and Workspace name, which are used to build the
base URL for the SDK calls.
- The Client ID and Secret for a Cribl.Cloud API Credential, which are used to
authenticate the SDK calls. To get the Client ID and Secret, follow
https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud. The Client ID
and Secret are sensitive information and should be kept private.
- An existing Worker Group to replicate.
"""
# Import block
# Imports asyncio so that the file can run an asynchronous control plane
# sequence for authentication, replicating a Worker Group, and committing and
# deploying the replica configuration.
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports model classes that provide the Python types used for Cribl Stream
# resource configurations and API payloads in this file from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
    CloudProvider,
    ConfigGroupCloud,
    EstimatedIngestRateOptionsConfigGroup,
    ProductsCore,
    SchemeClientOauth,
    Security,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and Replicate Worker
# Group block. Replace the placeholder values before executing this file.
ORG_ID = "your-org-id" # Replace with the Organization ID
WORKSPACE_NAME = "your-workspace-name" # Replace with the Workspace name
SOURCE_WORKER_GROUP_ID = "my-worker-group" # Replace with the ID of the Worker Group to clone
REPLICA_WORKER_GROUP_ID = "my-replica-worker-group" # Replace with the ID for the new replica Worker Group
CLIENT_ID = "your-client-id" # Replace with the Client ID for the API Credential
CLIENT_SECRET = "your-client-secret" # Replace with the Client Secret for the API Credential
# URL block
# Builds the base URL and replica Worker Group-specific URL to use for the API
# requests that this file makes using the ORG_ID, WORKSPACE_NAME, and
# REPLICA_WORKER_GROUP_ID from the user-supplied parameters block.
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{REPLICA_WORKER_GROUP_ID}"
# Estimated ingest rate block
# Sets the maximum estimated ingest throughput tier for the replica Worker Group
# using EstimatedIngestRateOptionsConfigGroup. RATE24_MB_PER_SEC is the 24 MB/s
# tier, equivalent to 24 MB/s maximum estimated ingest rate with 9 Worker
# Processes. This value is passed as estimated_ingest_rate to groups.create in
# the Replicate Worker Group block.
ESTIMATED_INGEST_RATE = EstimatedIngestRateOptionsConfigGroup.RATE24_MB_PER_SEC
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your Cribl.Cloud API Credentials,
# verifies the source Worker Group, creates the replica, and commits and deploys
# the replica configuration.
async def main():
    # Authentication block
    # Creates an OAuth client (SchemeClientOauth) that exchanges CLIENT_ID and
    # CLIENT_SECRET from the user-supplied parameters block for a Bearer token
    # and wraps the client in Security.
    # Constructs the SDK client CriblControlPlane to make authenticated API
    # requests using the base_url from the URL block and Security, which holds
    # the Bearer token.
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify source Worker Group block
    # Fetches the source Worker Group using SOURCE_WORKER_GROUP_ID from the
    # user-supplied parameters block. Exits with a message if the group does not
    # exist so replication does not run without a valid clone source.
    source_group_response = cribl.groups.get(
        id=SOURCE_WORKER_GROUP_ID, product=ProductsCore.STREAM
    )
    if not source_group_response.items or len(source_group_response.items) == 0:
        print(f"❌ Source Worker Group not found: {SOURCE_WORKER_GROUP_ID}. Create the source Worker Group first.")
        return

    # Replicate Worker Group block
    # Checks for a Worker Group with REPLICA_WORKER_GROUP_ID from the user-supplied
    # parameters block and exits if it already exists. Otherwise, creates the
    # replica using groups.create with source_group_id set to the source Worker
    # Group ID, cloud settings, and ESTIMATED_INGEST_RATE from the Estimated
    # ingest rate block, then prints a confirmation message.
    replica_group_response = cribl.groups.get(
        id=REPLICA_WORKER_GROUP_ID, product=ProductsCore.STREAM
    )
    if replica_group_response.items and len(replica_group_response.items) > 0:
        print(f"❌ Replica Worker Group already exists: {REPLICA_WORKER_GROUP_ID}. Try a different Worker Group ID.")
        return
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        name="my-replica-worker-group",
        description=f"Worker Group cloned from {SOURCE_WORKER_GROUP_ID} with identical configuration",
        cloud=ConfigGroupCloud(provider=CloudProvider.AWS, region="us-east-1"),
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        on_prem=False,
        estimated_ingest_rate=ESTIMATED_INGEST_RATE,
        source_group_id=SOURCE_WORKER_GROUP_ID,
    )
    print(f"✅ Worker Group replicated: {REPLICA_WORKER_GROUP_ID} (cloned from {SOURCE_WORKER_GROUP_ID})")

    # Commit block
    # Records a new version of the replica Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the Deploy
    # block. Raises an exception if the API returns no commit. Otherwise, prints
    # a confirmation message. Uses group_url from the URL block.
    commit_response = cribl.versions.commits.create(
        message="Commit for replicate Worker Group example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {REPLICA_WORKER_GROUP_ID}, commit ID: {version}")

    # Deploy block
    # Pushes the committed configuration version (using the commit ID from the
    # Commit block) to the Cribl Stream replica Worker Group so that Workers load
    # and run that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {REPLICA_WORKER_GROUP_ID}")

# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replicate a Worker Group
- Verifies that a source Worker Group exists in Cribl Stream.
- Creates a replica Worker Group cloned from the source with the
same configuration.
- Commits and deploys the replica Worker Group configuration to make it active.
NOTE: This example is for on-prem deployments only.
Required to use this example:
- An Enterprise license.
- A server URL, which is used to build the base URL for the SDK calls.
- A username and password, which are used to authenticate the SDK calls.
The username and password credentials are sensitive information and should
be kept private.
- An existing Worker Group to replicate.
"""
# Import block
# Imports asyncio so that the file can await the on-prem token request and
# other asynchronous control plane calls (Worker Group replicate, commit,
# deploy).
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports Security (the Bearer token wrapper used after username/password
# login) and ProductsCore (product identifiers) from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and Replicate Worker
# Group block. Replace the placeholder values before executing this file.
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
SOURCE_WORKER_GROUP_ID = "my-worker-group" # Replace with the ID of the Worker Group to clone
REPLICA_WORKER_GROUP_ID = "my-replica-worker-group" # Replace with the ID for the new replica Worker Group
# URL block
# Builds the base URL and replica Worker Group-specific URL to use for the
# API requests that this file makes using the ONPREM_SERVER_URL and
# REPLICA_WORKER_GROUP_ID from the user-supplied parameters block.
base_url = f"{ONPREM_SERVER_URL}/api/v1"
group_url = f"{base_url}/m/{REPLICA_WORKER_GROUP_ID}"
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your username and password, verifies
# the source Worker Group, creates the replica, and commits and deploys the
# replica configuration.
async def main():
    # Authentication block
    # Constructs CriblControlPlane with the base_url from the URL block and
    # calls the on-prem authentication endpoint with the username and password
    # from the user-supplied parameters block.
    # Retrieves the Bearer token from the on-prem authentication endpoint
    # response and wraps the token in Security.
    # Reconstructs CriblControlPlane as an authenticated SDK client using the
    # same base_url and Security (which holds the Bearer token).
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # Verify source Worker Group block
    # Fetches the source Worker Group using SOURCE_WORKER_GROUP_ID from the
    # user-supplied parameters block and exits if it does not exist so that
    # replication does not run without a valid source Worker Group.
    source_group_response = cribl.groups.get(id=SOURCE_WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if not source_group_response.items or len(source_group_response.items) == 0:
        print(f"❌ Source Worker Group not found: {SOURCE_WORKER_GROUP_ID}. Create the source Worker Group first.")
        return
    # Replicate Worker Group block
    # Checks for a Worker Group with REPLICA_WORKER_GROUP_ID from the user-supplied
    # parameters block and exits if it already exists. Otherwise, creates the
    # replica using groups.create with source_group_id set to the source Worker
    # Group ID, then prints a confirmation message.
    replica_group_response = cribl.groups.get(id=REPLICA_WORKER_GROUP_ID, product=ProductsCore.STREAM)
    if replica_group_response.items and len(replica_group_response.items) > 0:
        print(f"❌ Replica Worker Group already exists: {REPLICA_WORKER_GROUP_ID}. Try a different group ID.")
        return
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        name="my-replica-worker-group",
        description=f"Worker Group cloned from {SOURCE_WORKER_GROUP_ID} with identical configuration",
        worker_remote_access=True,
        is_fleet=False,
        is_search=False,
        on_prem=True,
        source_group_id=SOURCE_WORKER_GROUP_ID,
    )
    print(f"✅ Worker Group replicated: {REPLICA_WORKER_GROUP_ID} (cloned from {SOURCE_WORKER_GROUP_ID})")
    # Commit block
    # Records a new version of the replica Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the Deploy
    # block. Raises an exception if the API returns no commit. Otherwise, prints
    # a confirmation message.
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example for replicating a Worker Group",
        server_url=group_url,
        effective=True,
        files=["."],
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {REPLICA_WORKER_GROUP_ID}, commit ID: {version}")
    # Deploy block
    # Pushes the committed configuration version (using the commit ID from the
    # Commit block) to the Cribl Stream replica Worker Group so that Workers load
    # and run that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=REPLICA_WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {REPLICA_WORKER_GROUP_ID}")
# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
4. Confirm the Worker Group Configuration
Use this example to retrieve a list of all Worker Groups in Cribl Stream so that you can review and confirm their configurations.
"""
List Worker Groups
- Lists all Worker Group configurations for Cribl Stream in a Workspace.
NOTE: This example is for Cribl.Cloud deployments only.
Required to use this example:
- A Cribl.Cloud Organization ID and Workspace name, which are used to build the
base URL for the SDK calls.
- The Client ID and Secret for a Cribl.Cloud API Credential, which are used to
authenticate the SDK calls. To get the Client ID and Secret, follow
https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud. The Client ID
and Secret are sensitive information and should be kept private.
"""
# Import block
# Imports asyncio so that the file can run an asynchronous control plane
# sequence for authentication and listing Worker Groups.
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports model classes that provide the Python types used for Cribl Stream
# resource configurations and API payloads in this file from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
ProductsCore,
SchemeClientOauth,
Security,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and List Worker Groups
# block. Replace the placeholder values before executing this file.
ORG_ID = "your-org-id" # Replace with the Organization ID
WORKSPACE_NAME = "your-workspace-name" # Replace with the Workspace name
CLIENT_ID = "your-client-id" # Replace with the Client ID for the API Credential
CLIENT_SECRET = "your-client-secret" # Replace with the Client Secret for the API Credential
# URL block
# Builds the base URL to use for the API requests that this file makes using the
# ORG_ID and WORKSPACE_NAME provided in the user-supplied parameters block.
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your Cribl.Cloud API Credentials and
# lists Worker Groups for Cribl Stream.
async def main():
    # Authentication block
    # Creates an OAuth client (SchemeClientOauth) that exchanges CLIENT_ID and
    # CLIENT_SECRET from the user-supplied parameters block for a Bearer token
    # and wraps the client in Security.
    # Constructs the SDK client CriblControlPlane to make authenticated API
    # requests using the base_url from the URL block and Security, which holds
    # the Bearer token.
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # List Worker Groups block
    # Calls groups.list for Cribl Stream (ProductsCore.STREAM). If the response
    # includes items, prints each Worker Group configuration; otherwise prints
    # that no Worker Groups were found.
    worker_groups_response = cribl.groups.list(product=ProductsCore.STREAM)
    if worker_groups_response.items:
        print(f"✅ List of Worker Group Configurations:")
        for group in worker_groups_response.items:
            print(group) # Print the entire configuration for each Worker Group
    else:
        print(f"❌ No Worker Groups found.")
# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
List Worker Groups
- Lists all Worker Group configurations for Cribl Stream.
NOTE: This example is for on-prem deployments only.
Required to use this example:
- A server URL, which is used to build the base URL for the SDK calls.
- A username and password, which are used to authenticate the SDK calls.
The username and password credentials are sensitive information and should
be kept private.
"""
# Import block
# Imports asyncio so that the file can await the on-prem token request and
# other asynchronous control plane calls (listing Worker Groups).
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports Security (Bearer token wrapper after username/password login),
# ProductsCore for product identifiers, and other API payloads from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
ProductsCore,
Security,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, and List Worker Groups
# block. Replace the placeholder values before executing this file.
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
# URL block
# Builds the base URL to use for the API requests that this file makes using the
# ONPREM_SERVER_URL provided in the user-supplied parameters block.
base_url = f"{ONPREM_SERVER_URL}/api/v1"
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your username and password and lists
# Worker Groups for Cribl Stream.
async def main():
    # Authentication block
    # Constructs CriblControlPlane with the base_url from the URL block and
    # calls the on-prem authentication endpoint with the username and password
    # from the user-supplied parameters block.
    # Retrieves the Bearer token from the on-prem authentication endpoint
    # response and wraps the token in Security.
    # Reconstructs CriblControlPlane as an authenticated SDK client using the
    # same base_url and Security (which holds the Bearer token).
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # List Worker Groups block
    # Calls groups.list for Cribl Stream (ProductsCore.STREAM). If the response
    # includes items, prints each Worker Group configuration; otherwise prints
    # that no Worker Groups were found.
    worker_groups_response = cribl.groups.list(product=ProductsCore.STREAM)
    if worker_groups_response.items:
        print(f"✅ List of Worker Group Configurations:")
        for group in worker_groups_response.items:
            print(group) # Print the entire configuration for each Worker Group
    else:
        print(f"❌ No Worker Groups found.")
# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
5. Commit and Deploy the Worker Group
This example demonstrates how to commit and deploy the Worker Group configuration, then commit to the Leader to keep it in sync with the Worker Group. You can commit and deploy immediately after a single create or update request or after multiple requests.
Committing and deploying the Worker Group configuration requires three requests, which the Python SDK example chains together:
- Commit pending changes to the Worker Group. This request commits only the configuration changes for Worker Groups by specifying the file local/cribl/groups.yml.
- Deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request.
- Commit the changes to the Leader to keep the Leader in sync with the Worker Group.
"""
Commit and deploy configuration
- Commits pending configuration changes for a Cribl Stream Worker Group in a
Workspace, then deploys the commit so that Workers load it.
NOTE: This example is for Cribl.Cloud deployments only.
Required to use this example:
- A Cribl.Cloud Organization ID and Workspace name, which are used to build the
base URL for the SDK calls.
- The Client ID and Secret for a Cribl.Cloud API Credential, which are used to
authenticate the SDK calls. To get the Client ID and Secret, follow
https://docs.cribl.io/cribl-as-code/sdks-auth/#sdks-auth-cloud. The Client ID
and Secret are sensitive information and should be kept private.
- A Worker Group with pending changes to commit and deploy.
"""
# Import block
# Imports asyncio so that the file can run an asynchronous control plane
# sequence for authentication, commit, and deploy.
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports model classes that provide the Python types used for Cribl Stream
# resource configurations and API payloads in this file from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
ProductsCore,
SchemeClientOauth,
Security,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, Commit block, and Deploy
# block. Replace the placeholder values before executing this file.
ORG_ID = "your-org-id" # Replace with the Organization ID
WORKSPACE_NAME = "your-workspace-name" # Replace with the Workspace name
WORKER_GROUP_ID = "your-cloud-worker-group-id" # Replace with the Worker Group ID to commit and deploy
CLIENT_ID = "your-client-id" # Replace with the Client ID for the API Credential
CLIENT_SECRET = "your-client-secret" # Replace with the Client Secret for the API Credential
# URL block
# Builds the base URL and Worker Group-specific URL to use for the API requests
# that this file makes using the ORG_ID, WORKSPACE_NAME, and WORKER_GROUP_ID from
# the user-supplied parameters block. The Worker Group URL is used for the Commit
# block.
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your Cribl.Cloud API Credentials,
# commits pending configuration for the Worker Group, and deploys that version.
async def main():
    # Authentication block
    # Creates an OAuth client (SchemeClientOauth) that exchanges CLIENT_ID and
    # CLIENT_SECRET from the user-supplied parameters block for a Bearer token
    # and wraps the client in Security.
    # Constructs the SDK client CriblControlPlane to make authenticated API
    # requests using the base_url from the URL block and Security, which holds
    # the Bearer token.
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # Commit block
    # Records a new version of the Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the Deploy
    # block. Raises an exception if the API returns no commit. Otherwise, prints
    # a confirmation message.
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")
    # Deploy block
    # Pushes the committed configuration version (using the commit ID from the
    # Commit block) to the Cribl Stream Worker Group so that Workers load and run
    # that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")
# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Commit and deploy configuration
- Commits pending configuration changes for a Worker Group, then deploys the
commit so that Workers load it.
NOTE: This example is for on-prem deployments only.
Required to use this example:
- A server URL, which is used to build the base URL for the SDK calls.
- A username and password, which are used to authenticate the SDK calls.
The username and password credentials are sensitive information and should
be kept private.
- A Worker Group with pending changes to commit and deploy.
"""
# Import block
# Imports asyncio so that the file can await the on-prem token request and
# other asynchronous control plane calls (commit and deploy).
#
# Imports CriblControlPlane as the API client from the cribl_control_plane
# SDK package.
#
# Imports Security (Bearer token wrapper after username/password login),
# ProductsCore for product identifiers, and other API payloads from the
# cribl_control_plane.models subpackage.
import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (
ProductsCore,
Security,
)
# User-supplied parameters block
# Values to use in the URL block, Authentication block, Commit block, and Deploy
# block. Replace the placeholder values before executing this file.
ONPREM_SERVER_URL = "http://localhost:9000" # Replace with your server URL
ONPREM_USERNAME = "admin" # Replace with your username
ONPREM_PASSWORD = "admin" # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id" # Replace with the Worker Group ID to commit and deploy
# URL block
# Builds the base URL and Worker Group-specific URL to use for the API requests
# that this file makes using the ONPREM_SERVER_URL and WORKER_GROUP_ID provided in
# the user-supplied parameters block. The Worker Group URL is used for the Commit
# block.
base_url = f"{ONPREM_SERVER_URL}/api/v1"
group_url = f"{base_url}/m/{WORKER_GROUP_ID}"
# Workflow block
# The async function that contains the full automation and runs when you
# execute this file. Authenticates using your username and password, commits
# pending configuration for the Worker Group, and deploys that version.
async def main():
    # Authentication block
    # Constructs CriblControlPlane with the base_url from the URL block and
    # calls the on-prem authentication endpoint with the username and password
    # from the user-supplied parameters block.
    # Retrieves the Bearer token from the on-prem authentication endpoint
    # response and wraps the token in Security.
    # Reconstructs CriblControlPlane as an authenticated SDK client using the
    # same base_url and Security (which holds the Bearer token).
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.result.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    # Commit block
    # Records a new version of the Worker Group configuration, marks that
    # version as effective, and captures the commit ID to use in the Deploy
    # block. Raises an exception if the API returns no commit. Otherwise, prints
    # a confirmation message.
    commit_response = cribl.versions.commits.create(
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."],
        server_url=group_url,
    )
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {WORKER_GROUP_ID}, commit ID: {version}")
    # Deploy block
    # Pushes the committed configuration version (using the commit ID from the
    # Commit block) to the Cribl Stream Worker Group so that Workers load and run
    # that version, then prints a confirmation message.
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=WORKER_GROUP_ID,
        version=version,
    )
    print(f"✅ Worker Group changes deployed: {WORKER_GROUP_ID}")
# Script entry block
# Starts the async function main() with the standard library helper
# asyncio.run and prints an error message if the run fails. Runs only when you
# execute this file as the main script (not when another file imports it).
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
Configure Worker Groups with the Cribl API
The examples in this section demonstrate how to create, scale, replicate, and deploy Worker Groups using the Cribl API.
The examples for creating and scaling a Worker Group on Cribl.Cloud include the estimatedIngestRate property, which allows you to configure Worker Groups for optimal performance. For each supported estimatedIngestRate value, the following table maps the corresponding throughput and number of Worker Processes:
| estimatedIngestRate | Throughput | Worker Processes |
|---|---|---|
| 1024 | 12 MB/s | 6 |
| 2048 | 24 MB/s | 9 |
| 3072 | 36 MB/s | 14 |
| 4096 | 48 MB/s | 21 |
| 5120 | 60 MB/s | 30 |
| 7168 | 84 MB/s | 45 |
| 10240 | 120 MB/s | 62 |
| 13312 | 156 MB/s | 93 |
| 15360 | 180 MB/s | 186 |
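As a rough illustration of how the table can be used when sizing a Worker Group, the following sketch encodes the rows above in a dictionary and picks the smallest supported estimatedIngestRate that covers a target throughput. The names IngestTier, INGEST_TIERS, and smallest_rate_for are hypothetical helpers for this example and are not part of the Cribl API or SDK.

```python
from typing import NamedTuple


class IngestTier(NamedTuple):
    """One row of the estimatedIngestRate table (hypothetical helper type)."""
    throughput_mb_s: int
    worker_processes: int


# Values copied from the table above, keyed by estimatedIngestRate.
INGEST_TIERS = {
    1024: IngestTier(12, 6),
    2048: IngestTier(24, 9),
    3072: IngestTier(36, 14),
    4096: IngestTier(48, 21),
    5120: IngestTier(60, 30),
    7168: IngestTier(84, 45),
    10240: IngestTier(120, 62),
    13312: IngestTier(156, 93),
    15360: IngestTier(180, 186),
}


def smallest_rate_for(required_mb_s: float) -> int:
    """Return the smallest supported estimatedIngestRate whose throughput
    is at least required_mb_s. Raises ValueError if no row is large enough."""
    for rate in sorted(INGEST_TIERS):
        if INGEST_TIERS[rate].throughput_mb_s >= required_mb_s:
            return rate
    raise ValueError(f"No supported estimatedIngestRate covers {required_mb_s} MB/s")


if __name__ == "__main__":
    rate = smallest_rate_for(30)
    print(rate, INGEST_TIERS[rate])
```

The returned value is what you would place in the estimatedIngestRate body parameter of the create or scale requests that follow.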
1. Create a Worker Group
This example creates a new Worker Group in Cribl Stream.
In the Cribl.Cloud example, the estimatedIngestRate is set to 2048, which is equivalent to a maximum of 24 MB/s with nine Worker Processes. In the on-prem deployment example, the new Worker Group is created with eight Workers.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"description": "Cribl.Cloud Worker Group",
"cloud": {
"provider": "aws",
"region": "us-west-2"
},
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": false,
"estimatedIngestRate": 2048,
"provisioned": false
}'

curl --request POST \
--url "https://${hostname}:${port}/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"description": "My Worker Group description",
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": true,
"workerCount": 8
}'
2. Scale and Provision the Worker Group
The Cribl.Cloud example scales the Worker Group to an estimatedIngestRate of 4096, which is equivalent to a maximum of 48 MB/s with 21 Worker Processes. The example also sets provisioned to true to activate Cribl.Cloud resources.
The on-prem deployment example scales the Worker Group to 11 Workers.
The PATCH /products/{product}/groups/{id} endpoint requires a complete representation of the Worker Group that you want to update in the request body. This endpoint does not support partial updates. Cribl removes any omitted fields when updating the Worker Group to scale it.
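Because the endpoint does not accept partial updates, one way to avoid dropping fields is to start from the Worker Group's current configuration and overlay only the values you want to change. The following is a minimal sketch of that idea using plain dictionaries. The helper name build_patch_body is hypothetical, and the sample configuration simply mirrors the Cribl.Cloud create example in this section; in practice, you would start from the configuration that the API returns for your Worker Group.

```python
import copy
import json


def build_patch_body(current: dict, changes: dict) -> dict:
    """Return a complete request body: every field from the current
    configuration, with the changed fields overlaid. The input is not mutated."""
    body = copy.deepcopy(current)
    body.update(changes)
    return body


# Current configuration (mirrors the Cribl.Cloud create example).
current_group = {
    "id": "my-worker-group",
    "name": "my-worker-group",
    "description": "Cribl.Cloud Worker Group",
    "cloud": {"provider": "aws", "region": "us-west-2"},
    "workerRemoteAccess": True,
    "isFleet": False,
    "isSearch": False,
    "onPrem": False,
    "estimatedIngestRate": 2048,
    "provisioned": False,
}

# Scale to 4096 and provision, keeping every other field intact.
patch_body = build_patch_body(
    current_group,
    {"estimatedIngestRate": 4096, "provisioned": True},
)

if __name__ == "__main__":
    print(json.dumps(patch_body, indent=2))
```

The resulting dictionary can be serialized with json.dumps and sent as the body of the PATCH request.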
curl --request PATCH \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"description": "Cribl.Cloud Worker Group",
"cloud": {
"provider": "aws",
"region": "us-west-2"
},
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": false,
"estimatedIngestRate": 4096,
"provisioned": true
}'

curl --request PATCH \
--url "https://${hostname}:${port}/api/v1/products/stream/groups/my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-worker-group",
"name": "my-worker-group",
"workerRemoteAccess": true,
"description": "My Worker Group description",
"isFleet": false,
"isSearch": false,
"onPrem": true,
"workerCount": 11
}'
3. Replicate the Worker Group
This example creates a replica Worker Group in Cribl Stream by cloning an existing Worker Group configuration. The new Worker Group uses the sourceGroupId body parameter to specify the Worker Group to clone. The replica Worker Group inherits the configuration from the source Worker Group, including settings and resources like Sources, Destinations, and Pipelines.
To run this example, you must have at least one existing Worker Group named my-worker-group in Cribl Stream to use as the source Worker Group.
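Before sending the replication request, it can help to confirm that the source Worker Group exists and that the replica ID is not already taken. The sketch below runs that check against a parsed response from the list endpoint. It assumes the response is a JSON object with an items array whose entries each include an id field; the function name check_replication_preconditions is hypothetical.

```python
def check_replication_preconditions(groups_response: dict, source_id: str, replica_id: str) -> None:
    """Raise an error if the source Worker Group is missing or the replica ID
    is already in use. Assumes the shape {"items": [{"id": "..."}, ...]}."""
    existing_ids = {group["id"] for group in groups_response.get("items", [])}
    if source_id not in existing_ids:
        raise LookupError(f"Source Worker Group not found: {source_id}")
    if replica_id in existing_ids:
        raise ValueError(f"Replica Worker Group already exists: {replica_id}")


if __name__ == "__main__":
    # Sample data standing in for a parsed list response.
    sample_response = {"items": [{"id": "default"}, {"id": "my-worker-group"}]}
    check_replication_preconditions(
        sample_response, "my-worker-group", "my-replica-worker-group"
    )
    print("Preconditions met: safe to send the replication request.")
```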
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-replica-worker-group",
"name": "my-replica-worker-group",
"description": "Worker Group cloned from my-worker-group with identical configuration",
"cloud": {
"provider": "aws",
"region": "us-west-2"
},
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": false,
"sourceGroupId": "my-worker-group"
}'

curl --request POST \
--url "https://${hostname}:${port}/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"id": "my-replica-worker-group",
"name": "my-replica-worker-group",
"description": "Worker Group cloned from my-worker-group with identical configuration",
"workerRemoteAccess": true,
"isFleet": false,
"isSearch": false,
"onPrem": true,
"sourceGroupId": "my-worker-group"
}'
4. Confirm the Worker Group Configuration
Use this example to retrieve a list of all Worker Groups in Cribl Stream so that you can review and confirm their configurations.
curl --request GET \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json"

curl --request GET \
--url "https://${hostname}:${port}/api/v1/products/stream/groups" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json"
5. Commit and Deploy the Worker Group
This example demonstrates how to commit and deploy the Worker Group configuration, then commit to the Leader to keep it in sync with the Worker Group. You can commit and deploy immediately after a single create or update request or after multiple requests.
First, commit pending changes to the Worker Group. This request commits only the configuration changes for Worker Groups by specifying the file local/cribl/groups.yml.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit?groupId=my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"files": [
"local/cribl/groups.yml"
],
"message": "Commit Worker Group configuration"
}'

curl --request POST \
--url "https://${hostname}:${port}/api/v1/version/commit?groupId=my-worker-group" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"files": [
"local/cribl/groups.yml"
],
"message": "Commit Worker Group configuration"
}'
Second, deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request.
curl --request PATCH \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group/deploy" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"version": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
}'

curl --request PATCH \
--url "https://${hostname}:${port}/api/v1/products/stream/groups/my-worker-group/deploy" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"version": "1234abcd5678efgh9012ijkl3456mnop7EXAMPLE"
}'
Finally, commit the changes to the Leader to keep the Leader in sync with the Worker Group.
curl --request POST \
--url "https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"message": "Sync my-worker-group with Leader"
}'

curl --request POST \
--url "https://${hostname}:${port}/api/v1/version/commit" \
--header "Authorization: Bearer ${token}" \
--header "Content-Type: application/json" \
--data '{
"message": "Sync my-worker-group with Leader"
}'
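The three requests in this step can also be chained from a script. The following is a minimal sketch that uses only the Python standard library to build the same requests as the preceding commands and to pass the commit value into the deploy request. It assumes that the commit response has the shape {"items": [{"commit": "..."}]}, which matches how the Python SDK examples read the commit ID. The helper names are hypothetical, and the send function is shown for completeness but is not called here.

```python
import json
import urllib.request


def _json_request(method: str, url: str, token: str, body: dict) -> urllib.request.Request:
    """Build (but do not send) a JSON request with Bearer token authentication."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method=method,
    )


def commit_group_request(base_url: str, token: str, group_id: str) -> urllib.request.Request:
    """First request: commit only the Worker Group configuration file."""
    return _json_request(
        "POST",
        f"{base_url}/version/commit?groupId={group_id}",
        token,
        {"files": ["local/cribl/groups.yml"], "message": "Commit Worker Group configuration"},
    )


def extract_commit_id(response_body: dict) -> str:
    """Read the commit ID. Assumed shape: {"items": [{"commit": "..."}]}."""
    items = response_body.get("items") or []
    if not items:
        raise ValueError("Commit response contains no items")
    return items[0]["commit"]


def deploy_request(base_url: str, token: str, group_id: str, version: str) -> urllib.request.Request:
    """Second request: deploy the committed version to the Worker Group."""
    return _json_request(
        "PATCH",
        f"{base_url}/products/stream/groups/{group_id}/deploy",
        token,
        {"version": version},
    )


def commit_leader_request(base_url: str, token: str, group_id: str) -> urllib.request.Request:
    """Third request: commit to the Leader to keep it in sync."""
    return _json_request(
        "POST",
        f"{base_url}/version/commit",
        token,
        {"message": f"Sync {group_id} with Leader"},
    )


def send(request: urllib.request.Request) -> dict:
    """Send a request and parse the JSON response body (performs network I/O)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

A typical sequence would be to send the request from commit_group_request, pass the parsed response to extract_commit_id, send the request from deploy_request with that value, and then send the request from commit_leader_request. Here, base_url is the deployment URL ending in /api/v1, as in the preceding commands.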