Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions util/opentelemetry-util-genai/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3943](#3943))
- Add more Semconv attributes to LLMInvocation spans.
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3862](#3862))
- Limit the upload hook thread pool to 64 workers
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3944](#3944))

## Version 0.2b0 (2025-10-14)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ def __init__(

# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
# limits the number of queued tasks. If the queue is full, data will be dropped.
self._executor = ThreadPoolExecutor(max_workers=self._max_queue_size)
self._executor = ThreadPoolExecutor(
max_workers=min(self._max_queue_size, 64)
)
self._semaphore = threading.BoundedSemaphore(self._max_queue_size)

def _submit_all(self, upload_data: UploadData) -> None:
Expand Down
13 changes: 13 additions & 0 deletions util/opentelemetry-util-genai/tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,19 @@ def test_upload_after_shutdown_logs(self):
logs.output[0],
)

def test_threadpool_max_workers(self):
for max_queue_size, expect_threadpool_workers in ((10, 10), (100, 64)):
with patch(
"opentelemetry.util.genai._upload.completion_hook.ThreadPoolExecutor"
) as mock:
hook = UploadCompletionHook(
base_path=BASE_PATH, max_queue_size=max_queue_size
)
self.addCleanup(hook.shutdown)
mock.assert_called_once_with(
max_workers=expect_threadpool_workers
)


class TestUploadCompletionHookIntegration(TestBase):
def setUp(self):
Expand Down