cribl_control_plane_sdk_python

The Cribl Python SDK for the control plane provides operational control over Cribl resources and helps streamline the process of integrating with Cribl.

In addition to the usage examples in this repository, the Cribl documentation includes code examples for common use cases.

Complementary API reference documentation is available at https://docs.cribl.io/cribl-as-code/api-reference. Product documentation is available at https://docs.cribl.io.

Important

Preview Feature: The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.

Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.

SDK Installation

Note

Python version upgrade policy

Once a Python version reaches its official end-of-life date, a 3-month grace period is provided for users to upgrade. Following this grace period, the minimum Python version supported in the SDK will be updated.
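
As a rough illustration of the policy arithmetic, the sketch below computes the date on which a 3-month grace period would end for a given end-of-life date. The helper name `grace_period_end` and the sample date are assumptions made for illustration; they are not part of the SDK.

```python
import calendar
from datetime import date


def grace_period_end(eol: date, grace_months: int = 3) -> date:
    """Return the date that falls `grace_months` calendar months after `eol`."""
    # Work with a zero-based month index so that year rollover is simple.
    month_index = eol.month - 1 + grace_months
    year = eol.year + month_index // 12
    month = month_index % 12 + 1
    # Clamp the day for shorter target months (30 November -> 28 February).
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(eol.day, last_day))


# Hypothetical end-of-life date, used only to exercise the helper.
print(grace_period_end(date(2025, 10, 31)))
```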

The SDK can be installed with uv, pip, or poetry package managers.

uv

uv is a fast Python package installer and resolver, designed as a drop-in replacement for pip and pip-tools. It's recommended for its speed and modern Python tooling capabilities.

uv add cribl-control-plane

PIP

PIP is the default package installer for Python, enabling easy installation and management of packages from PyPI via the command line.

pip install cribl-control-plane

Poetry

Poetry is a modern tool that simplifies dependency management and package publishing by using a single pyproject.toml file to handle project metadata and dependencies.

poetry add cribl-control-plane

Shell and script usage with uv

You can use this SDK in a Python shell with uv and the uvx command that comes with it like so:

uvx --from cribl-control-plane python

It's also possible to write a standalone Python script without needing to set up a whole project like so:

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "cribl-control-plane",
# ]
# ///

from cribl_control_plane import CriblControlPlane

sdk = CriblControlPlane(
  # SDK arguments
)

# Rest of script here...

Once that is saved to a file, you can run it with uv run script.py where script.py can be replaced with the actual file name.

IDE Support

PyCharm

Generally, the SDK will work well with most IDEs out of the box. However, when using PyCharm, you can enjoy much better integration with Pydantic by installing an additional plugin.

SDK Example Usage

Example

# Synchronous Example
from cribl_control_plane import CriblControlPlane, models
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
        "<value 1>",
        "<value 2>",
    ], bucket_name="<value>", cache_connection={
        "accelerated_fields": [
            "<value 1>",
            "<value 2>",
        ],
        "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
        "cache_ref": "<value>",
        "created_at": 7795.06,
        "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
        "migration_query_id": "<id>",
        "retention_in_days": 1466.58,
    }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
        "current_size_bytes": 6170.04,
        "metrics_date": "<value>",
    }, retention_period_in_days=456.37, search_config={
        "datatypes": [
            "<value 1>",
        ],
        "metadata": {
            "earliest": "<value>",
            "enable_acceleration": True,
            "field_list": [
                "<value 1>",
                "<value 2>",
            ],
            "latest_run_info": {
                "earliest_scanned_time": 4334.7,
                "finished_at": 6811.22,
                "latest_scanned_time": 5303.3,
                "object_count": 9489.04,
            },
            "scan_mode": models.ScanMode.DETAILED,
        },
    }, storage_location_id="<id>", view_name="<value>")

    # Handle response
    print(res)

The same SDK client can also be used to make asynchronous requests by importing asyncio.

# Asynchronous Example
import asyncio
from cribl_control_plane import CriblControlPlane, models
import os

async def main():

    async with CriblControlPlane(
        server_url="https://api.example.com",
        security=models.Security(
            bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
        ),
    ) as ccp_client:

        res = await ccp_client.lake_datasets.create_async(lake_id="<id>", id="<id>", accelerated_fields=[
            "<value 1>",
            "<value 2>",
        ], bucket_name="<value>", cache_connection={
            "accelerated_fields": [
                "<value 1>",
                "<value 2>",
            ],
            "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
            "cache_ref": "<value>",
            "created_at": 7795.06,
            "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
            "migration_query_id": "<id>",
            "retention_in_days": 1466.58,
        }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
            "current_size_bytes": 6170.04,
            "metrics_date": "<value>",
        }, retention_period_in_days=456.37, search_config={
            "datatypes": [
                "<value 1>",
            ],
            "metadata": {
                "earliest": "<value>",
                "enable_acceleration": True,
                "field_list": [
                    "<value 1>",
                    "<value 2>",
                ],
                "latest_run_info": {
                    "earliest_scanned_time": 4334.7,
                    "finished_at": 6811.22,
                    "latest_scanned_time": 5303.3,
                    "object_count": 9489.04,
                },
                "scan_mode": models.ScanMode.DETAILED,
            },
        }, storage_location_id="<id>", view_name="<value>")

        # Handle response
        print(res)

asyncio.run(main())
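
Because the asynchronous methods are coroutines, several requests can be awaited concurrently on one client. The sketch below is a minimal illustration that takes an already-initialized client as a parameter. It assumes that `health.get_async` and `lake_datasets.list_async` exist and follow the same `_async` naming as `create_async` above, and that `list_async` accepts a `lake_id` argument; check the method documentation before relying on either.

```python
import asyncio
from typing import Any, Tuple


async def fetch_health_and_datasets(ccp_client: Any, lake_id: str) -> Tuple[Any, Any]:
    """Run two read-only requests concurrently on one already-initialized client."""
    # Assumption: these methods mirror the `_async` naming of `create_async`.
    health, datasets = await asyncio.gather(
        ccp_client.health.get_async(),
        ccp_client.lake_datasets.list_async(lake_id=lake_id),
    )
    return health, datasets
```

Inside the `async with CriblControlPlane(...) as ccp_client:` block, this could be called as `health, datasets = await fetch_health_and_datasets(ccp_client, "<id>")`.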

Authentication

Except for the health.get and auth.tokens.get methods, all Cribl SDK requests require you to authenticate with a Bearer token. You must include a valid Bearer token in the configuration when initializing your SDK client. The Bearer token verifies your identity and ensures secure access to the requested resources. The SDK automatically manages the Authorization header for subsequent requests once properly authenticated.

For information about Bearer token expiration, see Token Management in the Cribl as Code documentation.

Authentication happens once, during SDK initialization. After you initialize the SDK client with authentication as shown in the authentication examples, the SDK handles authentication for all subsequent API calls, so you do not need to include authentication parameters in individual API requests. The SDK Example Usage section shows initialization and API calls together; once your client is initialized, you only need to make the API method calls themselves, without re-initializing.
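
To make the "authenticate once, call many times" pattern concrete, here is a minimal sketch of a helper that receives an initialized client and makes two calls without passing any credentials. The helper name `run_checks` is hypothetical, and the `lake_id` argument to `lake_datasets.list` is an assumption based on the signature of `lake_datasets.create`.

```python
from typing import Any, Dict


def run_checks(ccp_client: Any, lake_id: str) -> Dict[str, Any]:
    """Make several calls on one client; no per-call credentials are passed."""
    results: Dict[str, Any] = {}
    # `health.get` is one of the methods that does not require a token.
    results["health"] = ccp_client.health.get()
    # Assumption: `lake_datasets.list` takes the same `lake_id` argument as `create`.
    results["datasets"] = ccp_client.lake_datasets.list(lake_id=lake_id)
    return results
```

The client passed in would be the `ccp_client` created by `with CriblControlPlane(...) as ccp_client:`, so the credentials are supplied exactly once, at initialization.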

Per-Client Security Schemes

This SDK supports the following security schemes globally:

Name          Type    Scheme        Environment Variable
bearer_auth   http    HTTP Bearer   CRIBLCONTROLPLANE_BEARER_AUTH
client_oauth  oauth2  OAuth2 token  CRIBLCONTROLPLANE_CLIENT_OAUTH

To configure authentication on Cribl.Cloud and in hybrid deployments, use the client_oauth security scheme. The SDK uses the OAuth credentials that you provide to obtain a Bearer token and refresh the token within its expiration window using the standard OAuth2 flow.

In on-prem deployments, use the bearer_auth security scheme. The SDK uses the username/password credentials that you provide to obtain a Bearer token. Automatically refreshing the Bearer token within its expiration window requires a callback function as shown in the On-Prem Authentication Example.
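
The refresh logic behind such a callback can be approximated as a small cache that requests a new token shortly before the current one expires. The sketch below uses only the standard library. `fetch_token` is a hypothetical callable standing in for whatever login request returns a token and its lifetime in seconds, and the class name `CachedToken` is likewise illustrative. How the callback is wired into the SDK is covered by the On-Prem Authentication Example, not by this sketch.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Cache a Bearer token and refresh it shortly before it expires."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        refresh_margin_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        # `fetch_token` is a hypothetical callable returning (token, lifetime in seconds).
        self._fetch_token = fetch_token
        self._refresh_margin_s = refresh_margin_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return the cached token, fetching a new one if it is missing or near expiry."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._refresh_margin_s:
            token, lifetime_s = self._fetch_token()
            self._token = token
            self._expires_at = now + lifetime_s
        return self._token
```

Injecting the clock keeps the class easy to test, and the refresh margin avoids sending a token that expires while a request is in flight.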

Set the security scheme through the security optional parameter when initializing the SDK client instance. The SDK uses the selected scheme by default to authenticate with the API for all operations that support it.
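
The guidance above boils down to a simple mapping from deployment type to security scheme. The following sketch encodes it with the scheme and environment variable names from the table. The deployment labels (`"cloud"`, `"hybrid"`, `"on-prem"`) and the function names are assumptions chosen for illustration.

```python
from typing import Dict

# Scheme and environment variable names are taken from the table above.
SCHEME_ENV_VARS: Dict[str, str] = {
    "bearer_auth": "CRIBLCONTROLPLANE_BEARER_AUTH",
    "client_oauth": "CRIBLCONTROLPLANE_CLIENT_OAUTH",
}


def scheme_for(deployment: str) -> str:
    """Map a deployment type to the security scheme recommended above."""
    # The deployment labels are illustrative, not SDK constants.
    if deployment in ("cloud", "hybrid"):
        return "client_oauth"
    if deployment == "on-prem":
        return "bearer_auth"
    raise ValueError(f"unknown deployment type: {deployment!r}")


def env_var_for(deployment: str) -> str:
    """Return the environment variable associated with the chosen scheme."""
    return SCHEME_ENV_VARS[scheme_for(deployment)]
```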

Authentication Examples

The Cribl.Cloud and Hybrid Authentication Example demonstrates how to configure authentication on Cribl.Cloud and in hybrid deployments. Initializing with the client_oauth security scheme requires a Client ID and Client Secret. To obtain them, follow the instructions for creating an API Credential in the Cribl as Code documentation.

The On-Prem Authentication Example demonstrates how to configure authentication in on-prem deployments using your username and password.

Available Resources and Operations

Available methods
  • get - Log in and fetch an authentication token
  • list - List all Destinations
  • create - Create a Destination
  • get - Get a Destination
  • update - Update a Destination
  • delete - Delete a Destination
  • clear - Clear the persistent queue for a Destination
  • get - Get information about the latest job to clear the persistent queue for a Destination
  • get - Get sample event data for a Destination
  • create - Send sample event data to a Destination
  • list - List all Worker Groups or Edge Fleets for the specified Cribl product
  • create - Create a Worker Group or Edge Fleet for the specified Cribl product
  • get - Get a Worker Group or Edge Fleet
  • update - Update a Worker Group or Edge Fleet
  • delete - Delete a Worker Group or Edge Fleet
  • deploy - Deploy commits to a Worker Group or Edge Fleet
  • get - Get the Access Control List for a Worker Group or Edge Fleet
  • get - Get the Access Control List for teams with permissions on a Worker Group or Edge Fleet for the specified Cribl product
  • get - Get the configuration version for a Worker Group or Edge Fleet
  • get - Retrieve health status of the server
  • create - Create a Lake Dataset
  • list - List all Lake Datasets
  • delete - Delete a Lake Dataset
  • get - Get a Lake Dataset
  • update - Update a Lake Dataset
  • list - Get detailed metadata for Worker and Edge Nodes
  • count - Get a count of Worker and Edge Nodes
  • get - Get a summary of the Distributed deployment
  • list - List all Pipelines
  • create - Create a Pipeline
  • get - Get a Pipeline
  • update - Update a Pipeline
  • delete - Delete a Pipeline
  • list - List all Routes
  • get - Get a Routing table
  • update - Update a Route
  • append - Add a Route to the end of the Routing table
  • create - Add an HEC token and optional metadata to a Splunk HEC Source
  • update - Update metadata for an HEC token for a Splunk HEC Source
  • list - List all branches in the Git repository used for Cribl configuration
  • get - Get the name of the Git branch that the Cribl configuration is checked out to
  • create - Create a new commit for pending changes to the Cribl configuration
  • diff - Get the diff for a commit
  • list - List the commit history
  • push - Push local commits to the remote repository
  • revert - Revert a commit in the local repository
  • get - Get the diff and log message for a commit
  • undo - Discard uncommitted (staged) changes
  • count - Get a count of files that changed since a commit
  • list - Get the names and statuses of files that changed since a commit
  • get - Get the configuration and status for the Git integration
  • get - Get the status of the current working tree

File uploads

Certain SDK methods accept file objects as part of a request body or multi-part request. It is possible and typically recommended to upload files as a stream rather than reading the entire contents into memory. This avoids excessive memory consumption and potentially crashing with out-of-memory errors when working with very large files. The following example demonstrates how to attach a file stream to a request.

Tip

For endpoints that handle file uploads, byte arrays can also be used. However, using streams is recommended for large files.

from cribl_control_plane import CriblControlPlane, models
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.packs.upload(filename="example.file", request_body=open("example.file", "rb"))

    # Handle response
    print(res)
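
The example above leaves the file handle open after the request. A small variation, sketched below, opens the file in a `with` block so the handle is closed once the upload call returns. The helper name `upload_pack` is hypothetical; the `packs.upload` call and its `filename` and `request_body` arguments are the ones used above.

```python
from typing import Any


def upload_pack(ccp_client: Any, path: str, filename: str) -> Any:
    """Upload a file as a stream and close the file handle afterwards."""
    # Opening in binary mode gives the SDK a stream it can read incrementally.
    with open(path, "rb") as stream:
        return ccp_client.packs.upload(filename=filename, request_body=stream)
```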

Retries

Some of the endpoints in this SDK support retries. If you use the SDK without any configuration, it will fall back to the default retry strategy provided by the API. However, the default retry strategy can be overridden on a per-operation basis, or across the entire SDK.

To change the default retry strategy for a single API call, simply provide a RetryConfig object to the call:

from cribl_control_plane import CriblControlPlane, models
from cribl_control_plane.utils import BackoffStrategy, RetryConfig
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
        "<value 1>",
        "<value 2>",
    ], bucket_name="<value>", cache_connection={
        "accelerated_fields": [
            "<value 1>",
            "<value 2>",
        ],
        "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
        "cache_ref": "<value>",
        "created_at": 7795.06,
        "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
        "migration_query_id": "<id>",
        "retention_in_days": 1466.58,
    }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
        "current_size_bytes": 6170.04,
        "metrics_date": "<value>",
    }, retention_period_in_days=456.37, search_config={
        "datatypes": [
            "<value 1>",
        ],
        "metadata": {
            "earliest": "<value>",
            "enable_acceleration": True,
            "field_list": [
                "<value 1>",
                "<value 2>",
            ],
            "latest_run_info": {
                "earliest_scanned_time": 4334.7,
                "finished_at": 6811.22,
                "latest_scanned_time": 5303.3,
                "object_count": 9489.04,
            },
            "scan_mode": models.ScanMode.DETAILED,
        },
    }, storage_location_id="<id>", view_name="<value>",
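        # Pass the retry configuration by keyword: a positional argument cannot follow keyword arguments.
        retries=RetryConfig("backoff", BackoffStrategy(1, 50, 1.1, 100), False))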

    # Handle response
    print(res)
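
The four positional arguments to `BackoffStrategy` in the example above are not named. In SDKs generated in this style they typically mean initial interval, maximum interval, exponent, and maximum elapsed time, with times in milliseconds. Treat that reading as an assumption and confirm it against `cribl_control_plane.utils`. Under that assumption, the sketch below lists the waits that a capped exponential backoff would produce. It ignores jitter and is not the SDK's own implementation.

```python
from typing import List


def backoff_delays_ms(
    initial_interval: float,
    max_interval: float,
    exponent: float,
    max_elapsed_time: float,
) -> List[float]:
    """List the waits (in ms) of a capped exponential backoff, ignoring jitter."""
    if initial_interval <= 0 or max_interval <= 0 or exponent < 1:
        raise ValueError("intervals must be positive and exponent must be >= 1")
    delays: List[float] = []
    elapsed = 0.0
    attempt = 0
    while True:
        # Each wait grows by `exponent` but never exceeds `max_interval`.
        delay = min(initial_interval * exponent**attempt, max_interval)
        # Stop once another wait would exceed the total time budget.
        if elapsed + delay > max_elapsed_time:
            break
        delays.append(delay)
        elapsed += delay
        attempt += 1
    return delays


# Same numbers as the RetryConfig in the example above.
schedule = backoff_delays_ms(1, 50, 1.1, 100)
print(len(schedule), schedule[:3])
```

With an exponent as small as 1.1, the waits grow slowly, so the total time budget rather than the per-wait cap is what ends the schedule.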

If you'd like to override the default retry strategy for all operations that support retries, you can use the retry_config optional parameter when initializing the SDK:

from cribl_control_plane import CriblControlPlane, models
from cribl_control_plane.utils import BackoffStrategy, RetryConfig
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    retry_config=RetryConfig("backoff", BackoffStrategy(1, 50, 1.1, 100), False),
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
        "<value 1>",
        "<value 2>",
    ], bucket_name="<value>", cache_connection={
        "accelerated_fields": [
            "<value 1>",
            "<value 2>",
        ],
        "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
        "cache_ref": "<value>",
        "created_at": 7795.06,
        "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
        "migration_query_id": "<id>",
        "retention_in_days": 1466.58,
    }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
        "current_size_bytes": 6170.04,
        "metrics_date": "<value>",
    }, retention_period_in_days=456.37, search_config={
        "datatypes": [
            "<value 1>",
        ],
        "metadata": {
            "earliest": "<value>",
            "enable_acceleration": True,
            "field_list": [
                "<value 1>",
                "<value 2>",
            ],
            "latest_run_info": {
                "earliest_scanned_time": 4334.7,
                "finished_at": 6811.22,
                "latest_scanned_time": 5303.3,
                "object_count": 9489.04,
            },
            "scan_mode": models.ScanMode.DETAILED,
        },
    }, storage_location_id="<id>", view_name="<value>")

    # Handle response
    print(res)

Error Handling

CriblControlPlaneError is the base class for all HTTP error responses. It has the following properties:

Property          Type            Description
err.message       str             Error message
err.status_code   int             HTTP response status code, e.g., 404
err.headers       httpx.Headers   HTTP response headers
err.body          str             HTTP body. Can be an empty string if no body is returned.
err.raw_response  httpx.Response  Raw HTTP response
err.data          -               Optional. Some errors may contain structured data. See Error Classes.

Example

from cribl_control_plane import CriblControlPlane, errors, models
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:
    res = None
    try:

        res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
            "<value 1>",
            "<value 2>",
        ], bucket_name="<value>", cache_connection={
            "accelerated_fields": [
                "<value 1>",
                "<value 2>",
            ],
            "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
            "cache_ref": "<value>",
            "created_at": 7795.06,
            "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
            "migration_query_id": "<id>",
            "retention_in_days": 1466.58,
        }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
            "current_size_bytes": 6170.04,
            "metrics_date": "<value>",
        }, retention_period_in_days=456.37, search_config={
            "datatypes": [
                "<value 1>",
            ],
            "metadata": {
                "earliest": "<value>",
                "enable_acceleration": True,
                "field_list": [
                    "<value 1>",
                    "<value 2>",
                ],
                "latest_run_info": {
                    "earliest_scanned_time": 4334.7,
                    "finished_at": 6811.22,
                    "latest_scanned_time": 5303.3,
                    "object_count": 9489.04,
                },
                "scan_mode": models.ScanMode.DETAILED,
            },
        }, storage_location_id="<id>", view_name="<value>")

        # Handle response
        print(res)


    except errors.CriblControlPlaneError as e:
        # The base class for HTTP error responses
        print(e.message)
        print(e.status_code)
        print(e.body)
        print(e.headers)
        print(e.raw_response)

        # Depending on the method, different errors may be raised
        if isinstance(e, errors.Error):
            print(e.data.message)  # Optional[str]
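
Instead of printing each property separately, the properties can be folded into a single log line. The sketch below relies only on the attributes listed in the table above (`status_code`, `message`, `body`, and the optional `data`). The helper name `summarize_error` and the 200-character truncation are illustrative choices.

```python
from typing import Any


def summarize_error(err: Any) -> str:
    """Build a one-line log message from the error properties listed above."""
    # Truncate the body so that large payloads do not flood the log.
    body = (err.body or "")[:200]
    # `data` is optional, so read it defensively.
    detail = getattr(getattr(err, "data", None), "message", None)
    parts = [f"HTTP {err.status_code}", err.message]
    if detail:
        parts.append(f"detail={detail}")
    if body:
        parts.append(f"body={body}")
    return " | ".join(parts)
```

Inside the `except errors.CriblControlPlaneError as e:` block, this could be used as `print(summarize_error(e))`.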

Error Classes

Primary errors:

Less common errors (6)

Network errors:

Inherit from CriblControlPlaneError:

  • HealthServerStatusError: Healthy status. Status code 420. Applicable to 1 of 63 methods.*
  • ResponseValidationError: Type mismatch between the response data and the expected Pydantic model. Provides access to the Pydantic validation error via the cause attribute.

* Check the method documentation to see if the error is applicable.

Custom HTTP Client

The Python SDK makes API calls using the httpx HTTP library. To provide a convenient way to configure timeouts, cookies, proxies, custom headers, and other low-level settings, you can initialize the SDK client with your own HTTP client instance. Depending on whether you are using the sync or async version of the SDK, you can pass an instance of HttpClient or AsyncHttpClient, respectively. Both are Protocols that ensure the client has the methods needed to make API calls. This allows you to wrap the client with your own custom logic, such as adding custom headers, logging, or error handling. Alternatively, you can pass an instance of httpx.Client or httpx.AsyncClient directly.

For example, you could specify a header for every request that this SDK makes as follows:

from cribl_control_plane import CriblControlPlane
import httpx

http_client = httpx.Client(headers={"x-custom-header": "someValue"})
s = CriblControlPlane(client=http_client)

or you could wrap the client with your own custom logic:

from typing import Any, Optional, Union  # Needed for the type hints below

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.httpclient import AsyncHttpClient
import httpx

class CustomClient(AsyncHttpClient):
    client: AsyncHttpClient

    def __init__(self, client: AsyncHttpClient):
        self.client = client

    async def send(
        self,
        request: httpx.Request,
        *,
        stream: bool = False,
        auth: Union[
            httpx._types.AuthTypes, httpx._client.UseClientDefault, None
        ] = httpx.USE_CLIENT_DEFAULT,
        follow_redirects: Union[
            bool, httpx._client.UseClientDefault
        ] = httpx.USE_CLIENT_DEFAULT,
    ) -> httpx.Response:
        request.headers["Client-Level-Header"] = "added by client"

        return await self.client.send(
            request, stream=stream, auth=auth, follow_redirects=follow_redirects
        )

    def build_request(
        self,
        method: str,
        url: httpx._types.URLTypes,
        *,
        content: Optional[httpx._types.RequestContent] = None,
        data: Optional[httpx._types.RequestData] = None,
        files: Optional[httpx._types.RequestFiles] = None,
        json: Optional[Any] = None,
        params: Optional[httpx._types.QueryParamTypes] = None,
        headers: Optional[httpx._types.HeaderTypes] = None,
        cookies: Optional[httpx._types.CookieTypes] = None,
        timeout: Union[
            httpx._types.TimeoutTypes, httpx._client.UseClientDefault
        ] = httpx.USE_CLIENT_DEFAULT,
        extensions: Optional[httpx._types.RequestExtensions] = None,
    ) -> httpx.Request:
        return self.client.build_request(
            method,
            url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            timeout=timeout,
            extensions=extensions,
        )

s = CriblControlPlane(async_client=CustomClient(httpx.AsyncClient()))

Resource Management

The CriblControlPlane class implements the context manager protocol and registers a finalizer function to close the underlying sync and async HTTPX clients it uses under the hood. This will close HTTP connections, release memory and free up other resources held by the SDK. In short-lived Python programs and notebooks that make a few SDK method calls, resource management may not be a concern. However, in longer-lived programs, it is beneficial to create a single SDK instance via a context manager and reuse it across the application.

from cribl_control_plane import CriblControlPlane, models
import os


def main():

    with CriblControlPlane(
        server_url="https://api.example.com",
        security=models.Security(
            bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
        ),
    ) as ccp_client:
        # Rest of application here...
        pass  # Placeholder so that the block is valid Python


# Or when using async:
async def amain():

    async with CriblControlPlane(
        server_url="https://api.example.com",
        security=models.Security(
            bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
        ),
    ) as ccp_client:
        # Rest of application here...
        pass  # Placeholder so that the block is valid Python

Debugging

You can set up your SDK to emit debug logs for SDK requests and responses.

You can pass your own logger instance directly into your SDK.

from cribl_control_plane import CriblControlPlane
import logging

logging.basicConfig(level=logging.DEBUG)
s = CriblControlPlane(server_url="https://example.com", debug_logger=logging.getLogger("cribl_control_plane"))

You can also enable a default debug logger by setting an environment variable CRIBLCONTROLPLANE_DEBUG to true.
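
If the default `basicConfig` output is too noisy, a dedicated logger can be configured with the standard `logging` module and passed through the `debug_logger` parameter. The sketch below is one way to do that. The helper name `build_debug_logger` and the log format are illustrative choices, not part of the SDK.

```python
import logging
import sys


def build_debug_logger(name: str = "cribl_control_plane") -> logging.Logger:
    """Create a DEBUG-level logger that writes timestamped lines to stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # Keep records out of the root logger so lines are not printed twice.
    logger.propagate = False
    # Avoid stacking duplicate handlers if this function is called more than once.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger
```

The returned logger could then be supplied as `CriblControlPlane(server_url="https://example.com", debug_logger=build_debug_logger())`.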
