On This Page

Home / Cribl as Code/ Code Examples/Configure Worker Groups

Configure Worker Groups

Preview Feature

The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.

Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.

These code examples demonstrate how to use the Cribl Python SDK for the control plane or the Cribl API to configure, scale, and replicate Worker Groups in Cribl Stream.

About the Code Examples

The code examples use Bearer token authentication. Read the authentication documentation to learn how to get a Bearer token. The Permissions granted to your Bearer token must include creating and managing Worker Groups.

Replace the variables in the examples with the corresponding information for your Cribl deployment.

For customer-managed deployments, to use https in the URLs, you must configure Transport Layer Security (TLS).

The configurations in the examples do not include all available body parameters. For a complete list of body parameters for each endpoint, refer to the documentation in the API Reference.

Configure Worker Groups with the Python SDK

The examples in this section demonstrate how to create, scale, replicate, and deploy Worker Groups using the Cribl Python SDK for the control plane.

The examples for creating and scaling a Worker Group on Cribl.Cloud include the estimatedIngestRate property, which allows you to configure Worker Groups for optimal performance. For each supported estimatedIngestRate value, the following table maps the corresponding throughput and number of Worker Processes:

estimatedIngestRateThroughputWorker Processes
102412 MB/s6
204824 MB/s9
307236 MB/s14
409648 MB/s21
512060 MB/s30
716884 MB/s45
10240120 MB/s62
13312156 MB/s93
15360180 MB/s186

1. Create a Worker Group

This example creates a new Worker Group in Cribl Stream.

In the Cribl.Cloud example, the estimatedIngestRate is set to 2048, which is equivalent to a maximum of 24 MB/s with nine Worker Processes. In the customer-managed deployment example, the new Worker Group is created with eight Workers.

Python SDK (Cribl.Cloud)Python SDK (Customer-Managed Deployment)
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.

NOTE: This example is for Cribl.Cloud deployments only.
"""

import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ConfigGroup, ConfigGroupCloud, CloudProvider, Security, SchemeClientOauth, ProductsCore

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "your-cloud-worker-group-id"
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"

group = ConfigGroup(
    id=WORKER_GROUP_ID,
    name="my-worker-group",
    description="Cribl.Cloud Worker Group",
    cloud=ConfigGroupCloud(provider=CloudProvider.AWS, region="us-west-2"),
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    on_prem=False,
    estimated_ingest_rate=2048,  # Equivalent to 24 MB/s maximum estimated ingest rate with 9 Worker Processes
    provisioned=False,
)


async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(f"❌ Worker Group already exists: {group.id}. Try a different group ID.")
        return

    # Create the worker group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=group.id,
        name=group.name,
        description=group.description,
        cloud=group.cloud,
        worker_remote_access=group.worker_remote_access,
        is_fleet=group.is_fleet,
        is_search=group.is_search,
        on_prem=group.on_prem,
        estimated_ingest_rate=group.estimated_ingest_rate,
        provisioned=group.provisioned,
    )
    print(f"✅ Worker Group created: {group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.

NOTE: This example is for customer-managed deployments only.
"""

import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ConfigGroup, ProductsCore, Security

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id"
base_url = f"{ONPREM_SERVER_URL}/api/v1"

group = ConfigGroup(
    id=WORKER_GROUP_ID,
    name="my-worker-group",
    description="My Worker Group description",
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    on_prem=True,
    worker_count=8,
)

group_url = f"{base_url}/m/{group.id}"


async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group doesn't already exist
    worker_group_response = cribl.groups.get(id=group.id, product=ProductsCore.STREAM)
    if worker_group_response.items and len(worker_group_response.items) > 0:
        print(f"❌ Worker Group already exists: {group.id}. Try a different group ID.")
        return

    # Create the Worker Group
    cribl.groups.create(
        product=ProductsCore.STREAM,
        id=group.id,
        name=group.name,
        description=group.description,
        worker_remote_access=group.worker_remote_access,
        is_fleet=group.is_fleet,
        is_search=group.is_search,
        on_prem=group.on_prem,
        worker_count=group.worker_count,
    )
    print(f"✅ Worker Group created: {group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")

2. Scale and Provision the Worker Group

The Cribl.Cloud example scales the Worker Group to an estimatedIngestRate of 4096, which is equivalent to a maximum of 48 MB/s with 21 Worker Processes. The example also sets provisioned to true to activate Cribl.Cloud resources.

The Cribl SDKs do not support scaling Worker Groups in customer-managed deployments.

The groups.update method requires a complete representation of the Worker Group that you want to update in the request body. This method does not support partial updates. Cribl removes any omitted fields when updating the Worker Group to scale and provision it.

Python SDK (Cribl.Cloud)
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.

NOTE: This example is for Cribl.Cloud deployments only.
"""

import asyncio
from typing import Optional

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (ConfigGroup, ProductsCore,
                                        SchemeClientOauth, Security)

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "your-cloud-worker-group-id" # Use the same Worker Group ID as in the previous example
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"


def resolve_group(cribl, group_id: str, product: ProductsCore) -> Optional[ConfigGroup]:
    resp = cribl.groups.get(id=group_id, product=product)

    # Case 1: List-style wrapper with items
    items = getattr(resp, "items", None)
    if items:
        return items[0]

    # Case 2: Direct ConfigGroup object
    if isinstance(resp, ConfigGroup):
        return resp

    # Nothing found or unexpected shape
    return None


async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )
    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Verify that Worker Group already exists
    group = resolve_group(cribl, WORKER_GROUP_ID, ProductsCore.STREAM)
    if group is None:
        print(f"Worker Group {WORKER_GROUP_ID} not found.")
    else:
        # Scale and provision the Worker Group
        # Equivalent to 48 MB/s maximum estimated ingest rate with 21 Worker Processes
        group.estimated_ingest_rate = 4096
        group.provisioned = True
        cribl.groups.update(
            product=ProductsCore.STREAM,
            id=group.id,
            id_param=group.id,
            name=group.name,
            description=group.description,
            cloud=group.cloud,
            worker_remote_access=group.worker_remote_access,
            is_fleet=group.is_fleet,
            is_search=group.is_search,
            on_prem=group.on_prem,
            estimated_ingest_rate=group.estimated_ingest_rate,
            provisioned=group.provisioned,
        )
        print(f"✅ Worker Group scaled and provisioned: {group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")

3. Replicate a Worker Group

This example uses the following operations to programmatically create a replica Worker Group in Cribl Stream based on an existing Worker Group configuration:

  1. Retrieve a list of all Worker Groups in Cribl Stream.
  2. Select the first Worker Group in the list as the source Worker Group to replicate.
  3. Retrieve the complete configuration of the source Worker Group.
  4. Create a new Worker Group that uses the same configuration as the source Worker Group. The replica Worker Group has a unique ID and a name and description that identify it as a replica.

To run this example, you must have at least one existing Worker Group in Cribl Stream.

Python SDK (Cribl.Cloud)Python SDK (Customer-Managed Deployment)
import asyncio
from typing import Optional

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (ConfigGroup, ProductsCore,
                                        SchemeClientOauth, Security)
ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"


async def main() -> None:
    """Main function that demonstrates Worker Group replication"""
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )

    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)
    try:
        # Get the first listed Worker Group
        worker_groups = cribl.groups.list(product=ProductsCore.STREAM)

        if worker_groups.items and len(worker_groups.items) > 0:
            first_worker_group = worker_groups.items[0]
            print(f"Replicating worker group: {first_worker_group.id}")

            # Replicate the first Worker Group
            replicate_worker_group(cribl, first_worker_group.id)
        else:
            print(
                "No Worker Groups found. Create at least one Worker Group before trying again."
            )
            exit(1)

    except Exception as error:
        message = str(error)
        print(f"Error: {message}")
        exit(1)


def replicate_worker_group(
    client: CriblControlPlane, source_id: str
) -> Optional[ConfigGroup]:
    """
    Replicates a Worker Group with a unique ID:

    - Retrieve the source Worker Group configuration
    - Generate a unique ID and name to use for the replica Worker Group
    - Filter out read-only fields from the source Worker Group configuration
    - Create the replica Worker Group with proper metadata

    Args:
        client: Cribl client instance
        source_id: ID of the Worker Group to replicate

    Returns:
        The created replica ConfigGroup or None if creation fails

    Raises:
        Exception: If the source Worker Group is not found or creation fails
    """
    try:
        # Retrieve the source Worker Group configuration
        source_response = client.groups.get(
            id=source_id, product=ProductsCore.STREAM)

        if not source_response.items or len(source_response.items) == 0:
            raise Exception(f"Worker Group '{source_id}' not found")

        source = source_response.items[0]

        # Generate a unique ID and name for the replica Worker Group
        replica_id = f"{source_id}-replica"
        replica_name = (
            f"{source.name}-replica" if source.name else f"{source_id}-replica"
        )

        # Create the replica Worker Group by copying the configuration of the source Worker Group
        # Filter out read-only fields like config_version, lookup_deployments, and worker_count
        result = client.groups.create(
            product=ProductsCore.STREAM,
            id=replica_id,
            name=replica_name,
            description=f"Replica of '{source_id}'",
            on_prem=source.on_prem,
            worker_remote_access=source.worker_remote_access,
            cloud=source.cloud,
            provisioned=source.provisioned,
            is_fleet=source.is_fleet,
            is_search=source.is_search,
            estimated_ingest_rate=source.estimated_ingest_rate,
            inherits=source.inherits,
            max_worker_age=source.max_worker_age,
            streamtags=source.streamtags,
            tags=source.tags,
            type_=source.type,
            upgrade_version=source.upgrade_version,
        )

        if result.items and len(result.items) > 0:
            created = result.items[0]
            print(f"✅ Worker Group replicated: {created.id}")
            return created
        else:
            raise Exception("Failed to create replica Worker Group")

    except Exception as error:
        message = str(error)
        print(f"Failed to replicate Worker Group: {message}")
        raise error


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        message = str(error)
        print(f"Something went wrong: {message}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.

NOTE: This example is for customer-managed deployments only.
"""

import asyncio
from typing import Optional

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ConfigGroup, ProductsCore, Security

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id" # Use the same Worker Group ID as in previous examples
base_url = f"{ONPREM_SERVER_URL}/api/v1"

group = ConfigGroup(
    id=WORKER_GROUP_ID,
    name="my-worker-group",
    description="My Worker Group description",
    worker_remote_access=True,
    is_fleet=False,
    is_search=False,
    on_prem=True,
    worker_count=8,
)

group_url = f"{base_url}/m/{group.id}"


async def main() -> None:
    """Main function that demonstrates Worker Group replication"""
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.token
    security = Security(bearer_auth=token)
    cribl_client = CriblControlPlane(server_url=base_url, security=security)
    try:
        # Get the first listed Worker Group
        worker_groups = cribl_client.groups.list(product=ProductsCore.STREAM)

        if worker_groups.items and len(worker_groups.items) > 0:
            first_worker_group = worker_groups.items[0]
            print(f"Replicating Worker Group: {first_worker_group.id}")

            # Replicate the first Worker Group
            replicate_worker_group(cribl_client, first_worker_group.id)
        else:
            print(
                "No Worker Groups found. Create at least one Worker Group before trying again."
            )
            exit(1)

    except Exception as error:
        message = str(error)
        print(f"Error: {message}")
        exit(1)


def replicate_worker_group(
    client: CriblControlPlane, source_id: str
) -> Optional[ConfigGroup]:
    """
    Replicates a Worker Group with a unique ID:

    - Retrieve the source Worker Group configuration
    - Generate a unique ID and name to use for the replica Worker Group
    - Filter out read-only fields from the source Worker Group configuration
    - Create the replica Worker Group with proper metadata

    Args:
        client: Cribl client instance
        source_id: ID of the Worker Group to replicate

    Returns:
        The created replica ConfigGroup or None if creation fails

    Raises:
        Exception: If the source Worker Group is not found or creation fails
    """
    try:
        # Retrieve the source Worker Group configuration
        source_response = client.groups.get(
            id=source_id, product=ProductsCore.STREAM)

        if not source_response.items or len(source_response.items) == 0:
            raise Exception(f"Worker Group '{source_id}' not found")

        source = source_response.items[0]

        # Generate a unique ID and name for the replica Worker Group
        replica_id = f"{source_id}-replica"
        replica_name = (
            f"{source.name}-replica" if source.name else f"{source_id}-replica"
        )

        # Create the replica Worker Group by copying the configuration of the source Worker Group
        # Filter out read-only fields like config_version, lookup_deployments, and worker_count
        result = client.groups.create(
            product=ProductsCore.STREAM,
            id=replica_id,
            name=replica_name,
            description=f"Replica of '{source_id}'",
            on_prem=source.on_prem,
            worker_remote_access=source.worker_remote_access,
            cloud=source.cloud,
            provisioned=source.provisioned,
            is_fleet=source.is_fleet,
            is_search=source.is_search,
            estimated_ingest_rate=source.estimated_ingest_rate,
            inherits=source.inherits,
            max_worker_age=source.max_worker_age,
            streamtags=source.streamtags,
            tags=source.tags,
            type_=source.type,
            upgrade_version=source.upgrade_version,
        )

        if result.items and len(result.items) > 0:
            created = result.items[0]
            print(f"✅ Worker Group replicated: {created.id}")
            return created
        else:
            raise Exception("Failed to create replica Worker Group")

    except Exception as error:
        message = str(error)
        print(f"Failed to replicate Worker Group: {message}")
        raise error


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        message = str(error)
        print(f"Something went wrong: {message}")

4. Confirm the Worker Group Configuration

Use this example to retrieve a list of all Worker Groups in Cribl Stream so that you can review and confirm their configurations.

Python SDK (Cribl.Cloud)Python SDK (Customer-Managed Deployment)
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.

NOTE: This example is for Cribl.Cloud deployments only.
"""

import asyncio

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import (ProductsCore, SchemeClientOauth,
                                        Security)

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"


async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )

    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Retrieve and list all Worker Groups
    worker_groups_response = cribl.groups.list(product=ProductsCore.STREAM)

    if worker_groups_response.items:
        print(f"✅ List of Worker Group Configurations:")
        for group in worker_groups_response.items:
            print(group)  # Print the entire configuration for each Worker Group
    else:
        print(f"❌ No Worker Groups found.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.

NOTE: This example is for customer-managed deployments only.
"""

import asyncio

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password

base_url = f"{ONPREM_SERVER_URL}/api/v1"


async def main():
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Retrieve and list all Worker Groups
    worker_groups_response = cribl.groups.list(product=ProductsCore.STREAM)

    if worker_groups_response.items:
        print(f"✅ List of Worker Group Configurations:")
        for group in worker_groups_response.items:
            print(group)  # Print the entire configuration for each Worker Group
    else:
        print(f"❌ No Worker Groups found.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")

5. Commit and Deploy the Worker Group

This example demonstrates how to commit and deploy the Worker Group configuration, then commit to the Leader to keep it in sync with the Worker Group. You can commit and deploy immediately after a single create or update request or after multiple requests.

Committing and deploying the Worker Group configuration requires three requests, which the Python SDK example chains together:

  1. Commit pending changes to the Worker Group. This request commits only the configuration changes for Worker Groups by specifying the file local/cribl/groups.yml.
  2. Deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request.
  3. Commit the changes to the Leader to keep the Leader in sync with the Worker Group.
Python SDK (Cribl.Cloud)Python SDK (Customer-Managed Deployment)
"""
Replace the placeholder values for ORG_ID, CLIENT_ID, CLIENT_SECRET,
and WORKSPACE_NAME with your Organization ID, Client ID and Secret, and
Workspace name. To get your CLIENT_ID and CLIENT_SECRET values, follow
the steps at https://docs.cribl.io/cribl-as-code/authentication/#cloud-auth.
Your Client ID and Secret are sensitive information and should be kept private.

NOTE: This example is for Cribl.Cloud deployments only.
"""

import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ConfigGroup, ConfigGroupCloud, CloudProvider, ProductsCore, Security, SchemeClientOauth

ORG_ID = "your-org-id"
CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
WORKSPACE_NAME = "your-workspace-name"
WORKER_GROUP_ID = "your-cloud-worker-group-id" # Use the same Worker Group ID as in previous examples

base_url = f"https://{WORKSPACE_NAME}-{ORG_ID}.cribl.cloud/api/v1"

async def main():
    # Create authenticated SDK client
    client_oauth = SchemeClientOauth(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        token_url="https://login.cribl.cloud/oauth/token",
        audience="https://api.cribl.cloud",
    )

    security = Security(client_oauth=client_oauth)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        group_id=my_worker_group.id,
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."]
    )
    
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {my_worker_group.id}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {my_worker_group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")
"""
Replace the placeholder values for ONPREM_SERVER_URL, ONPREM_USERNAME, and
ONPREM_PASSWORD with your server URL and credentials. Your credentials are
sensitive information and should be kept private.

NOTE: This example is for customer-managed deployments only.
"""

import asyncio
from cribl_control_plane import CriblControlPlane
from cribl_control_plane.models import ProductsCore, Security

ONPREM_SERVER_URL = "http://localhost:9000"  # Replace with your server URL
ONPREM_USERNAME = "admin"  # Replace with your username
ONPREM_PASSWORD = "admin"  # Replace with your password
WORKER_GROUP_ID = "your-worker-group-id" # Use the same Worker Group ID as in previous examples
base_url = f"{ONPREM_SERVER_URL}/api/v1"

group_url = f"{base_url}/m/{group.id}"


async def main():
    # Initialize Cribl client
    cribl = CriblControlPlane(server_url=base_url)
    response = await cribl.auth.tokens.get_async(
        username=ONPREM_USERNAME, password=ONPREM_PASSWORD
    )
    token = response.token
    security = Security(bearer_auth=token)
    cribl = CriblControlPlane(server_url=base_url, security=security)

    # Commit configuration changes
    commit_response = cribl.versions.commits.create(
        group_id=my_worker_group.id,
        message="Commit for Cribl Stream example",
        effective=True,
        files=["."]
    )
    
    if not commit_response.items or len(commit_response.items) == 0:
        raise Exception("Failed to commit configuration changes")
    
    version = commit_response.items[0].commit
    print(f"✅ Committed configuration changes to the group: {my_worker_group.id}, commit ID: {version}")

    # Deploy configuration changes
    cribl.groups.deploy(
        product=ProductsCore.STREAM,
        id=my_worker_group.id,
        version=version
    )
    print(f"✅ Worker Group changes deployed: {my_worker_group.id}")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as error:
        print(f"❌ Something went wrong: {error}")

Configure Worker Groups with the Cribl API

The examples in this section demonstrate how to create, scale, replicate, and deploy Worker Groups using the Cribl API.

The examples for creating and scaling a Worker Group on Cribl.Cloud include the estimatedIngestRate property, which allows you to configure Worker Groups for optimal performance. For each supported estimatedIngestRate value, the following table maps the corresponding throughput and number of Worker Processes:

estimatedIngestRateThroughputWorker Processes
102412 MB/s6
204824 MB/s9
307236 MB/s14
409648 MB/s21
512060 MB/s30
716884 MB/s45
10240120 MB/s62
13312156 MB/s93
15360180 MB/s186

1. Create a Worker Group

This example creates a new Worker Group in Cribl Stream.

In the Cribl.Cloud example, the estimatedIngestRate is set to 2048, which is equivalent to a maximum of 24 MB/s with nine Worker Processes. In the customer-managed deployment example, the new Worker Group is created with eight Workers.

API (Cribl.Cloud)API (Customer-Managed)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my-worker-group",
  "name": "my-worker-group",
  "description": "Cribl.Cloud Worker Group",
  "cloud": {
    "provider": "aws",
    "region": "us-west-2"
  },
  "workerRemoteAccess": true,
  "isFleet": false,
  "isSearch": false,
  "onPrem": false,
  "estimatedIngestRate": 2048,
  "provisioned": false
}'
curl --request POST \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my-worker-group",
  "name": "my-worker-group",
  "description": "My Worker Group description",
  "workerRemoteAccess": true,
  "isFleet": false,
  "isSearch": false,
  "onPrem": true,
  "workerCount": 8
}'

2. Scale and Provision the Worker Group

The Cribl.Cloud example scales the Worker Group to an estimatedIngestRate of 4096, which is equivalent to a maximum of 48 MB/s with 21 Worker Processes. The example also sets provisioned to true to activate Cribl.Cloud resources.

The customer-managed deployment example scales the Worker Group to 11 Workers.

The PATCH /products/{product}/groups/{id} endpoint requires a complete representation of the Worker Group that you want to update in the request body. This endpoint does not support partial updates. Cribl removes any omitted fields when updating the Worker Group to scale it.

API (Cribl.Cloud)API (Customer-Managed)
curl --request PATCH \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my-worker-group",
  "name": "my-worker-group",
  "description": "Cribl.Cloud Worker Group",
  "cloud": {
    "provider": "aws",
    "region": "us-west-2"
  },
  "workerRemoteAccess": true,
  "isFleet": false,
  "isSearch": false,
  "onPrem": false,
  "estimatedIngestRate": 2048,
  "provisioned": true
}'
curl --request PATCH \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups/my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my-worker-group",
  "name": "my-worker-group",
  "workerRemoteAccess": true,
  "description": "My Worker Group description",
  "isFleet": false,
  "isSearch": false,
  "onPrem": true,
  "workerCount": 11
}'

3. Replicate the Worker Group

Using the API to create a replica Worker Group in Cribl Stream based on an existing Worker Group configuration requires the following requests:

  1. Retrieve a list of all Worker Groups to identify the source Worker Group that you want to replicate.
  2. Retrieve the complete configuration of the source Worker Group.
  3. Create a new Worker Group that uses the same configuration as the source Worker Group.

To use these examples, you must have at least one existing Worker Group in Cribl Stream.

First, retrieve a list of all Worker Groups in Cribl Stream to identify the source Worker Group that you want to replicate. The response to this request includes the id for each Worker Group. You need the id of the source Worker Group to retrieve its configuration in the next request.

If you already know the id of the Worker Group that you want to replicate, you can skip this and proceed with the next request.

API (Cribl.Cloud)API (Customer-Managed)
curl --request GET \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json'
curl --request GET \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json'

Next, retrieve the complete configuration of the source Worker Group using its id. In this example, the id is my-worker-group.

API (Cribl.Cloud)API (Customer-Managed)
curl --request GET \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json'
curl --request GET \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups/my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json'

Finally, copy the configuration for the source Worker Group from the response to the previous request to use as the basis for the replica Worker Group. Update the id and name to use unique values and add a description that identifies the Worker Group as a replica. Remove any read-only attributes like lookupDeployments, workerCount, and configVersion.

API (Cribl.Cloud)API (Customer-Managed)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my-replica-worker-group",
  "name": "my-replica-worker-group",
  "description": "Replica of my-worker-group",
  "cloud": {
    "provider": "aws",
    "region": "us-west-2"
  },
  "workerRemoteAccess": true,
  "isFleet": false,
  "isSearch": false,
  "onPrem": false,
  "estimatedIngestRate": 2048,
  "provisioned": false
}'
curl --request POST \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "id": "my-replica-worker-group",
  "name": "my-replica-worker-group",
  "description": "Replica of my-worker-group",
  "workerRemoteAccess": true,
  "isFleet": false,
  "isSearch": false,
  "onPrem": true,
  "workerCount": 8
}'

4. Confirm the Worker Group Configuration

Use this example to retrieve a list of all Worker Groups in Cribl Stream so that you can review and confirm their configurations.

API (Cribl.Cloud)API (Customer-Managed)
curl --request GET \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json'
curl --request GET \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json'

5. Commit and Deploy the Worker Group

This example demonstrates how to commit and deploy the Worker Group configuration, then commit to the Leader to keep it in sync with the Worker Group. You can commit and deploy immediately after a single create or update request or after multiple requests.

First, commit pending changes to the Worker Group. This request commits only the configuration changes for Worker Groups by specifying the file local/cribl/groups.yml.

API (Cribl.Cloud)API (Customer-Managed)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit?groupId=my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "files": [
    "local/cribl/groups.yml"
  ],
  "message": "Commit Worker Group configuration"
}'
curl --request POST \
--url 'https://${hostname}-${port}/api/v1/version/commit?groupId=my-worker-group' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "files": [
    "local/cribl/groups.yml"
  ],
  "message": "Commit Worker Group configuration"
}'

Second, deploy the committed changes to the Worker Group. This request includes the version body parameter, which uses the value of commit from the response body for the commit request.

API (Cribl.Cloud)API (Customer-Managed)
curl --request PATCH \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/products/stream/groups/my-worker-group/deploy' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "version": 1234abcd5678efgh9012ijkl3456mnop7EXAMPLE
}'
curl --request PATCH \
--url 'https://${hostname}-${port}/api/v1/products/stream/groups/my-worker-group/deploy' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "version": 1234abcd5678efgh9012ijkl3456mnop7EXAMPLE
}'

Finally, commit the changes to the Leader to keep the Leader in sync with the Worker Group.

API (Cribl.Cloud)API (Customer-Managed)
curl --request POST \
--url 'https://${workspaceName}-${organizationId}.cribl.cloud/api/v1/version/commit' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "message": "Sync my-worker-group with Leader"
}
curl --request POST \
--url 'https://${hostname}-${port}/api/v1/version/commit' \
--header 'Authorization: Bearer ${token}' \
--header 'Content-Type: application/json' \
--data '{
  "message": "Sync my-worker-group with Leader"
}