The Cribl Python SDK for the control plane provides operational control over Cribl resources and helps streamline the process of integrating with Cribl.
In addition to the usage examples in this repository, the Cribl documentation includes code examples for common use cases.
Complementary API reference documentation is available at https://docs.cribl.io/cribl-as-code/api-reference. Product documentation is available at https://docs.cribl.io.
Important
**Preview Feature:** The Cribl SDKs are Preview features that are still being developed. We do not recommend using them in a production environment, because the features might not be fully tested or optimized for performance, and related documentation could be incomplete.
Please continue to submit feedback through normal Cribl support channels, but assistance might be limited while the features remain in Preview.
Note
Python version upgrade policy
Once a Python version reaches its official end-of-life date, a 3-month grace period is provided for users to upgrade. After this grace period, the minimum Python version supported by the SDK will be updated.
The SDK can be installed with uv, pip, or poetry package managers.
uv is a fast Python package installer and resolver, designed as a drop-in replacement for pip and pip-tools. It's recommended for its speed and modern Python tooling capabilities.
```shell
uv add cribl-control-plane
```

pip is the default package installer for Python, enabling easy installation and management of packages from PyPI via the command line.

```shell
pip install cribl-control-plane
```

Poetry is a modern tool that simplifies dependency management and package publishing by using a single pyproject.toml file to handle project metadata and dependencies.

```shell
poetry add cribl-control-plane
```

You can use this SDK in a Python shell with uv and the uvx command that comes with it like so:

```shell
uvx --from cribl-control-plane python
```

It's also possible to write a standalone Python script without needing to set up a whole project like so:
```python
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
#     "cribl-control-plane",
# ]
# ///

from cribl_control_plane import CriblControlPlane

sdk = CriblControlPlane(
    # SDK arguments
)

# Rest of script here...
```

Once that is saved to a file, you can run it with `uv run script.py`, where `script.py` can be replaced with the actual file name.
Generally, the SDK will work well with most IDEs out of the box. However, when using PyCharm, you can enjoy much better integration with Pydantic by installing an additional plugin.
```python
# Synchronous Example
from cribl_control_plane import CriblControlPlane, models
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
        "<value 1>",
        "<value 2>",
    ], bucket_name="<value>", cache_connection={
        "accelerated_fields": [
            "<value 1>",
            "<value 2>",
        ],
        "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
        "cache_ref": "<value>",
        "created_at": 7795.06,
        "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
        "migration_query_id": "<id>",
        "retention_in_days": 1466.58,
    }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
        "current_size_bytes": 6170.04,
        "metrics_date": "<value>",
    }, retention_period_in_days=456.37, search_config={
        "datatypes": [
            "<value 1>",
        ],
        "metadata": {
            "earliest": "<value>",
            "enable_acceleration": True,
            "field_list": [
                "<value 1>",
                "<value 2>",
            ],
            "latest_run_info": {
                "earliest_scanned_time": 4334.7,
                "finished_at": 6811.22,
                "latest_scanned_time": 5303.3,
                "object_count": 9489.04,
            },
            "scan_mode": models.ScanMode.DETAILED,
        },
    }, storage_location_id="<id>", view_name="<value>")

    # Handle response
    print(res)
```

The same SDK client can also be used to make asynchronous requests by importing asyncio.
```python
# Asynchronous Example
import asyncio
from cribl_control_plane import CriblControlPlane, models
import os


async def main():
    async with CriblControlPlane(
        server_url="https://api.example.com",
        security=models.Security(
            bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
        ),
    ) as ccp_client:

        res = await ccp_client.lake_datasets.create_async(lake_id="<id>", id="<id>", accelerated_fields=[
            "<value 1>",
            "<value 2>",
        ], bucket_name="<value>", cache_connection={
            "accelerated_fields": [
                "<value 1>",
                "<value 2>",
            ],
            "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
            "cache_ref": "<value>",
            "created_at": 7795.06,
            "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
            "migration_query_id": "<id>",
            "retention_in_days": 1466.58,
        }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
            "current_size_bytes": 6170.04,
            "metrics_date": "<value>",
        }, retention_period_in_days=456.37, search_config={
            "datatypes": [
                "<value 1>",
            ],
            "metadata": {
                "earliest": "<value>",
                "enable_acceleration": True,
                "field_list": [
                    "<value 1>",
                    "<value 2>",
                ],
                "latest_run_info": {
                    "earliest_scanned_time": 4334.7,
                    "finished_at": 6811.22,
                    "latest_scanned_time": 5303.3,
                    "object_count": 9489.04,
                },
                "scan_mode": models.ScanMode.DETAILED,
            },
        }, storage_location_id="<id>", view_name="<value>")

        # Handle response
        print(res)


asyncio.run(main())
```

Except for the `health.get` and `auth.tokens.get` methods, all Cribl SDK requests require you to authenticate with a Bearer token. You must include a valid Bearer token in the configuration when initializing your SDK client. The Bearer token verifies your identity and ensures secure access to the requested resources. Once properly authenticated, the SDK automatically manages the Authorization header for subsequent requests.
For information about Bearer token expiration, see Token Management in the Cribl as Code documentation.
Authentication happens once during SDK initialization. After you initialize the SDK client with authentication as shown in the authentication examples, the SDK automatically handles authentication for all subsequent API calls. You do not need to include authentication parameters in individual API requests. The SDK Example Usage section shows how to initialize the SDK and make API calls, but if you've properly initialized your client as shown in the authentication examples, you only need to make the API method calls themselves without re-initializing.
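The rule described above can be pictured as a simple header-attachment step: every request except the unauthenticated health and token endpoints carries an `Authorization: Bearer <token>` header. The sketch below is SDK-independent and illustrative only; the path strings are hypothetical placeholders, not the API's real routes:

```python
from typing import Optional

# Hypothetical paths standing in for the two unauthenticated operations;
# the real API routes may differ. This only illustrates the pattern.
UNAUTHENTICATED_PATHS = {"/health", "/auth/token"}


def build_headers(path: str, token: str, base: Optional[dict] = None) -> dict:
    """Attach a Bearer token unless the path is one of the open endpoints."""
    headers = dict(base or {})
    if path not in UNAUTHENTICATED_PATHS:
        headers["Authorization"] = f"Bearer {token}"
    return headers


print(build_headers("/pipelines", "my-token"))
# {'Authorization': 'Bearer my-token'}
```

This is exactly the bookkeeping the SDK does for you after initialization, which is why individual API calls need no authentication parameters.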
This SDK supports the following security schemes globally:
| Name | Type | Scheme | Environment Variable |
| --- | --- | --- | --- |
| `bearer_auth` | http | HTTP Bearer | `CRIBLCONTROLPLANE_BEARER_AUTH` |
| `client_oauth` | oauth2 | OAuth2 token | `CRIBLCONTROLPLANE_CLIENT_OAUTH` |
To configure authentication on Cribl.Cloud and in hybrid deployments, use the client_oauth security scheme. The SDK uses the OAuth credentials that you provide to obtain a Bearer token and refresh the token within its expiration window using the standard OAuth2 flow.
In on-prem deployments, use the bearer_auth security scheme. The SDK uses the username/password credentials that you provide to obtain a Bearer token. Automatically refreshing the Bearer token within its expiration window requires a callback function as shown in the On-Prem Authentication Example.
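Independent of the SDK's actual callback signature (see the On-Prem Authentication Example for that), the pattern such a refresh callback implements is: cache the token, and fetch a fresh one whenever the cached token is within a safety margin of its expiry. The class and `fetch_token` signature below are illustrations of the pattern, not the SDK's interface:

```python
import time


class TokenCache:
    """Caches a bearer token and refreshes it shortly before expiry."""

    def __init__(self, fetch_token, refresh_margin_s: float = 60.0):
        self._fetch = fetch_token      # callable returning (token, lifetime_seconds)
        self._margin = refresh_margin_s
        self._token = None
        self._expires_at = 0.0

    def get(self, now: float = None) -> str:
        now = time.time() if now is None else now
        # Refresh when no token is cached or expiry is inside the margin.
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token


calls = []

def fake_login():
    """Stand-in for a real username/password login call."""
    calls.append(1)
    return f"token-{len(calls)}", 3600.0


cache = TokenCache(fake_login)
print(cache.get(now=0))     # fetches: token-1
print(cache.get(now=100))   # still fresh, cached: token-1
print(cache.get(now=3590))  # inside the 60 s margin, refreshed: token-2
```

The same shape works whether the token comes from a username/password login (on-prem) or an OAuth2 flow (Cribl.Cloud and hybrid).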
Set the security scheme through the security optional parameter when initializing the SDK client instance. The SDK uses the selected scheme by default to authenticate with the API for all operations that support it.
The Cribl.Cloud and Hybrid Authentication Example demonstrates how to configure authentication on Cribl.Cloud and in hybrid deployments. To obtain the Client ID and Client Secret that you'll need to initialize using the client_oauth security scheme, follow the instructions for creating an API Credential in the Cribl as Code documentation.
The On-Prem Authentication Example demonstrates how to configure authentication in on-prem deployments using your username and password.
Available methods

Authentication:
- get - Log in and fetch an authentication token

Destinations:
- list - List all Destinations
- create - Create a Destination
- get - Get a Destination
- update - Update a Destination
- delete - Delete a Destination
- clear - Clear the persistent queue for a Destination
- get - Get information about the latest job to clear the persistent queue for a Destination

Worker Groups and Edge Fleets:
- list - List all Worker Groups or Edge Fleets for the specified Cribl product
- create - Create a Worker Group or Edge Fleet for the specified Cribl product
- get - Get a Worker Group or Edge Fleet
- update - Update a Worker Group or Edge Fleet
- delete - Delete a Worker Group or Edge Fleet
- deploy - Deploy commits to a Worker Group or Edge Fleet
- get - Get the Access Control List for a Worker Group or Edge Fleet
- get - Get the Access Control List for teams with permissions on a Worker Group or Edge Fleet for the specified Cribl product
- get - Get the configuration version for a Worker Group or Edge Fleet

Health:
- get - Retrieve health status of the server

Lake Datasets:
- create - Create a Lake Dataset
- list - List all Lake Datasets
- delete - Delete a Lake Dataset
- get - Get a Lake Dataset
- update - Update a Lake Dataset

Distributed deployment:
- get - Get a summary of the Distributed deployment

Packs:
- install - Install a Pack
- list - List all Packs
- upload - Upload a Pack file
- delete - Uninstall a Pack
- get - Get a Pack
- update - Upgrade a Pack

Pipelines:
- list - List all Pipelines
- create - Create a Pipeline
- get - Get a Pipeline
- update - Update a Pipeline
- delete - Delete a Pipeline

Routes:
- list - List all Routes
- get - Get a Routing table
- update - Update a Route
- append - Add a Route to the end of the Routing table

Sources:
- list - List all Sources
- create - Create a Source
- get - Get a Source
- update - Update a Source
- delete - Delete a Source
- create - Add an HEC token and optional metadata to a Splunk HEC Source
- update - Update metadata for an HEC token for a Splunk HEC Source

Version control (Git):
- list - List all branches in the Git repository used for Cribl configuration
- get - Get the name of the Git branch that the Cribl configuration is checked out to
- create - Create a new commit for pending changes to the Cribl configuration
- diff - Get the diff for a commit
- list - List the commit history
- push - Push local commits to the remote repository
- revert - Revert a commit in the local repository
- get - Get the diff and log message for a commit
- undo - Discard uncommitted (staged) changes
- count - Get a count of files that changed since a commit
- list - Get the names and statuses of files that changed since a commit
- get - Get the configuration and status for the Git integration
- get - Get the status of the current working tree
Certain SDK methods accept file objects as part of a request body or multi-part request. It is possible and typically recommended to upload files as a stream rather than reading the entire contents into memory. This avoids excessive memory consumption and potentially crashing with out-of-memory errors when working with very large files. The following example demonstrates how to attach a file stream to a request.
Tip
For endpoints that handle file uploads, byte arrays can also be used. However, using streams is recommended for large files.
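As a standalone illustration of why streaming matters, the sketch below processes a file in fixed-size chunks, so memory use stays constant regardless of file size; reading the whole file into a bytes object would instead hold all of it in memory at once. The hashing here is just a way to consume the stream; the SDK-specific upload example follows.

```python
import hashlib
import tempfile


def stream_digest(path: str, chunk_size: int = 64 * 1024) -> str:
    """Hash a file in fixed-size chunks: memory stays bounded by chunk_size,
    which is the same reason the SDK recommends passing an open file object
    rather than file.read() for large uploads."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


# Demo with a small temporary file standing in for a Pack file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example pack contents")

print(stream_digest(tmp.name))
```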
```python
from cribl_control_plane import CriblControlPlane, models
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.packs.upload(filename="example.file", request_body=open("example.file", "rb"))

    # Handle response
    print(res)
```

Some of the endpoints in this SDK support retries. If you use the SDK without any configuration, it will fall back to the default retry strategy provided by the API. However, the default retry strategy can be overridden on a per-operation basis, or across the entire SDK.
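To make the numbers in the retry examples concrete: a configuration like `BackoffStrategy(1, 50, 1.1, 100)` describes an exponential backoff. The interpretation used below (initial interval, maximum interval, growth exponent, maximum elapsed time) is an assumption based on the examples, not an authoritative reading of the SDK; the sketch only shows what delay sequence such parameters would produce:

```python
def backoff_delays(initial: float, max_interval: float, exponent: float, max_elapsed: float):
    """Yield successive retry delays: each delay grows by `exponent` and is
    capped at `max_interval`; stop once the running total would pass
    `max_elapsed`. Units are whatever the caller's units are."""
    delay, elapsed, attempt = initial, 0.0, 0
    while elapsed + delay <= max_elapsed:
        yield delay
        elapsed += delay
        attempt += 1
        delay = min(initial * (exponent ** attempt), max_interval)


delays = list(backoff_delays(1, 50, 1.1, 100))
print(delays[:3])  # delays grow by 10% per attempt: 1, 1.1, 1.21, ...
```

The `False` flag in the examples' `RetryConfig("backoff", ..., False)` presumably toggles whether to honor `Retry-After` headers; check the SDK's RetryConfig documentation rather than relying on this sketch.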
To change the default retry strategy for a single API call, simply provide a RetryConfig object to the call:
```python
from cribl_control_plane import CriblControlPlane, models
from cribl_control_plane.utils import BackoffStrategy, RetryConfig
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
        "<value 1>",
        "<value 2>",
    ], bucket_name="<value>", cache_connection={
        "accelerated_fields": [
            "<value 1>",
            "<value 2>",
        ],
        "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
        "cache_ref": "<value>",
        "created_at": 7795.06,
        "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
        "migration_query_id": "<id>",
        "retention_in_days": 1466.58,
    }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
        "current_size_bytes": 6170.04,
        "metrics_date": "<value>",
    }, retention_period_in_days=456.37, search_config={
        "datatypes": [
            "<value 1>",
        ],
        "metadata": {
            "earliest": "<value>",
            "enable_acceleration": True,
            "field_list": [
                "<value 1>",
                "<value 2>",
            ],
            "latest_run_info": {
                "earliest_scanned_time": 4334.7,
                "finished_at": 6811.22,
                "latest_scanned_time": 5303.3,
                "object_count": 9489.04,
            },
            "scan_mode": models.ScanMode.DETAILED,
        },
    }, storage_location_id="<id>", view_name="<value>",
        retries=RetryConfig("backoff", BackoffStrategy(1, 50, 1.1, 100), False))

    # Handle response
    print(res)
```

If you'd like to override the default retry strategy for all operations that support retries, you can use the `retry_config` optional parameter when initializing the SDK:
```python
from cribl_control_plane import CriblControlPlane, models
from cribl_control_plane.utils import BackoffStrategy, RetryConfig
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    retry_config=RetryConfig("backoff", BackoffStrategy(1, 50, 1.1, 100), False),
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:

    res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
        "<value 1>",
        "<value 2>",
    ], bucket_name="<value>", cache_connection={
        "accelerated_fields": [
            "<value 1>",
            "<value 2>",
        ],
        "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
        "cache_ref": "<value>",
        "created_at": 7795.06,
        "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
        "migration_query_id": "<id>",
        "retention_in_days": 1466.58,
    }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
        "current_size_bytes": 6170.04,
        "metrics_date": "<value>",
    }, retention_period_in_days=456.37, search_config={
        "datatypes": [
            "<value 1>",
        ],
        "metadata": {
            "earliest": "<value>",
            "enable_acceleration": True,
            "field_list": [
                "<value 1>",
                "<value 2>",
            ],
            "latest_run_info": {
                "earliest_scanned_time": 4334.7,
                "finished_at": 6811.22,
                "latest_scanned_time": 5303.3,
                "object_count": 9489.04,
            },
            "scan_mode": models.ScanMode.DETAILED,
        },
    }, storage_location_id="<id>", view_name="<value>")

    # Handle response
    print(res)
```

`CriblControlPlaneError` is the base class for all HTTP error responses. It has the following properties:
| Property | Type | Description |
| --- | --- | --- |
| `err.message` | `str` | Error message |
| `err.status_code` | `int` | HTTP response status code, e.g. `404` |
| `err.headers` | `httpx.Headers` | HTTP response headers |
| `err.body` | `str` | HTTP body. Can be an empty string if no body is returned. |
| `err.raw_response` | `httpx.Response` | Raw HTTP response |
| `err.data` | | Optional. Some errors may contain structured data. See Error Classes. |
```python
from cribl_control_plane import CriblControlPlane, errors, models
import os


with CriblControlPlane(
    server_url="https://api.example.com",
    security=models.Security(
        bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
    ),
) as ccp_client:
    res = None
    try:
        res = ccp_client.lake_datasets.create(lake_id="<id>", id="<id>", accelerated_fields=[
            "<value 1>",
            "<value 2>",
        ], bucket_name="<value>", cache_connection={
            "accelerated_fields": [
                "<value 1>",
                "<value 2>",
            ],
            "backfill_status": models.CacheConnectionBackfillStatus.PENDING,
            "cache_ref": "<value>",
            "created_at": 7795.06,
            "lakehouse_connection_type": models.LakehouseConnectionType.CACHE,
            "migration_query_id": "<id>",
            "retention_in_days": 1466.58,
        }, deletion_started_at=8310.58, description="pleased toothbrush long brush smooth swiftly rightfully phooey chapel", format_=models.CriblLakeDatasetFormat.DDSS, http_da_used=True, metrics={
            "current_size_bytes": 6170.04,
            "metrics_date": "<value>",
        }, retention_period_in_days=456.37, search_config={
            "datatypes": [
                "<value 1>",
            ],
            "metadata": {
                "earliest": "<value>",
                "enable_acceleration": True,
                "field_list": [
                    "<value 1>",
                    "<value 2>",
                ],
                "latest_run_info": {
                    "earliest_scanned_time": 4334.7,
                    "finished_at": 6811.22,
                    "latest_scanned_time": 5303.3,
                    "object_count": 9489.04,
                },
                "scan_mode": models.ScanMode.DETAILED,
            },
        }, storage_location_id="<id>", view_name="<value>")

        # Handle response
        print(res)

    except errors.CriblControlPlaneError as e:
        # The base class for HTTP error responses
        print(e.message)
        print(e.status_code)
        print(e.body)
        print(e.headers)
        print(e.raw_response)

        # Depending on the method, different errors may be raised
        if isinstance(e, errors.Error):
            print(e.data.message)  # Optional[str]
```

Primary errors:
- `CriblControlPlaneError`: The base class for HTTP error responses.
- `Error`: Unexpected error. Status code `500`.
Less common errors (6)

Network errors:
- `httpx.RequestError`: Base class for request errors.
- `httpx.ConnectError`: HTTP client was unable to make a request to a server.
- `httpx.TimeoutException`: HTTP request timed out.

Inherit from `CriblControlPlaneError`:
- `HealthServerStatusError`: Healthy status. Status code `420`. Applicable to 1 of 63 methods.\*
- `ResponseValidationError`: Type mismatch between the response data and the expected Pydantic model. Provides access to the Pydantic validation error via the `cause` attribute.

\* Check the method documentation to see if the error is applicable.
The Python SDK makes API calls using the httpx HTTP library. In order to provide a convenient way to configure timeouts, cookies, proxies, custom headers, and other low-level configuration, you can initialize the SDK client with your own HTTP client instance.
Depending on whether you are using the sync or async version of the SDK, you can pass an instance of HttpClient or AsyncHttpClient respectively, which are Protocol's ensuring that the client has the necessary methods to make API calls.
This allows you to wrap the client with your own custom logic, such as adding custom headers, logging, or error handling, or you can just pass an instance of httpx.Client or httpx.AsyncClient directly.
For example, you could specify a header for every request that this SDK makes as follows:
```python
from cribl_control_plane import CriblControlPlane
import httpx


http_client = httpx.Client(headers={"x-custom-header": "someValue"})
s = CriblControlPlane(client=http_client)
```

Or you could wrap the client with your own custom logic:
```python
from typing import Any, Optional, Union

from cribl_control_plane import CriblControlPlane
from cribl_control_plane.httpclient import AsyncHttpClient
import httpx


class CustomClient(AsyncHttpClient):
    client: AsyncHttpClient

    def __init__(self, client: AsyncHttpClient):
        self.client = client

    async def send(
        self,
        request: httpx.Request,
        *,
        stream: bool = False,
        auth: Union[
            httpx._types.AuthTypes, httpx._client.UseClientDefault, None
        ] = httpx.USE_CLIENT_DEFAULT,
        follow_redirects: Union[
            bool, httpx._client.UseClientDefault
        ] = httpx.USE_CLIENT_DEFAULT,
    ) -> httpx.Response:
        request.headers["Client-Level-Header"] = "added by client"

        return await self.client.send(
            request, stream=stream, auth=auth, follow_redirects=follow_redirects
        )

    def build_request(
        self,
        method: str,
        url: httpx._types.URLTypes,
        *,
        content: Optional[httpx._types.RequestContent] = None,
        data: Optional[httpx._types.RequestData] = None,
        files: Optional[httpx._types.RequestFiles] = None,
        json: Optional[Any] = None,
        params: Optional[httpx._types.QueryParamTypes] = None,
        headers: Optional[httpx._types.HeaderTypes] = None,
        cookies: Optional[httpx._types.CookieTypes] = None,
        timeout: Union[
            httpx._types.TimeoutTypes, httpx._client.UseClientDefault
        ] = httpx.USE_CLIENT_DEFAULT,
        extensions: Optional[httpx._types.RequestExtensions] = None,
    ) -> httpx.Request:
        return self.client.build_request(
            method,
            url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            timeout=timeout,
            extensions=extensions,
        )


s = CriblControlPlane(async_client=CustomClient(httpx.AsyncClient()))
```

The CriblControlPlane class implements the context manager protocol and registers a finalizer function to close the underlying sync and async HTTPX clients it uses under the hood. This will close HTTP connections, release memory, and free up other resources held by the SDK. In short-lived Python programs and notebooks that make a few SDK method calls, resource management may not be a concern. However, in longer-lived programs, it is beneficial to create a single SDK instance via a context manager and reuse it across the application.
```python
from cribl_control_plane import CriblControlPlane, models
import os


def main():
    with CriblControlPlane(
        server_url="https://api.example.com",
        security=models.Security(
            bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
        ),
    ) as ccp_client:
        # Rest of application here...
        ...


# Or when using async:
async def amain():
    async with CriblControlPlane(
        server_url="https://api.example.com",
        security=models.Security(
            bearer_auth=os.getenv("CRIBLCONTROLPLANE_BEARER_AUTH", ""),
        ),
    ) as ccp_client:
        # Rest of application here...
        ...
```

You can set up your SDK to emit debug logs for SDK requests and responses.
You can pass your own logger class directly into your SDK.
```python
from cribl_control_plane import CriblControlPlane
import logging


logging.basicConfig(level=logging.DEBUG)
s = CriblControlPlane(server_url="https://example.com", debug_logger=logging.getLogger("cribl_control_plane"))
```

You can also enable a default debug logger by setting the environment variable `CRIBLCONTROLPLANE_DEBUG` to `true`.