Description
Component: BigQueryAgentAnalyticsPlugin
Severity: High (Breaks common multiprocessing patterns on Linux)
Environment: Linux (where multiprocessing defaults to fork), Python 3.10+
Summary
The BigQueryAgentAnalyticsPlugin causes deadlocks and TimeoutErrors when used with concurrent.futures.ProcessPoolExecutor on Linux.
This occurs because the plugin creates a BigQueryWriteAsyncClient (and its underlying gRPC channel) eagerly in its __init__ method. The standard Python grpc library is not fork-safe. When a process forks (the default start method on Linux), the child inherits the parent's memory, including the already-open gRPC channel. That channel is broken in the child, so any attempt to use it hangs or times out.
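The inheritance described above can be observed with the standard library alone. The sketch below is a stand-in, not gRPC: FakeChannel is a hypothetical object that records the PID of the process that created it, and run_with is a hypothetical helper. Under the fork start method (available on Linux), the child process sees the parent's module-level object instead of building its own.

```python
import multiprocessing
import os


class FakeChannel:
    """Hypothetical stand-in for a gRPC channel: remembers its creator's PID."""

    def __init__(self) -> None:
        self.creator_pid = os.getpid()


# Created at import time in the parent, like the module-level plugin in this report.
CHANNEL = FakeChannel()


def report(queue) -> None:
    # Runs in the child: compare the inherited object's creator with our own PID.
    queue.put((CHANNEL.creator_pid, os.getpid()))


def run_with(method: str):
    ctx = multiprocessing.get_context(method)
    queue = ctx.Queue()
    proc = ctx.Process(target=report, args=(queue,))
    proc.start()
    result = queue.get(timeout=10)
    proc.join()
    return result


if __name__ == "__main__":
    creator, child = run_with("fork")
    print(f"object created in PID {creator}, used in PID {child}")
```

With fork, the two PIDs differ: the child is using an object that was built in, and belongs to, the parent.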
Root Cause Analysis
- Eager initialization: BigQueryAgentAnalyticsPlugin.__init__ creates self.write_client = BigQueryWriteAsyncClient(...), which establishes a gRPC channel in the parent process.
- Fork safety: On Linux, multiprocessing.Process (and ProcessPoolExecutor) defaults to fork(). The child process inherits the parent's address space.
- gRPC deadlock: The inherited gRPC channel is in a bad state in the child process. Any attempt to use it (even just waiting for connectivity) results in a deadlock or a TimeoutError (after 30s).
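A common reason fork breaks libraries that run background threads, as gRPC does, is that fork() copies memory but only the calling thread. The sketch below is an analogy for that failure mode, not gRPC code: a lock held by a background thread in the parent is copied into the child in its locked state, and no thread in the child will ever release it. The names hold_lock, child_try_lock and demo are hypothetical; the fork start method is assumed to be available (Linux).

```python
import multiprocessing
import threading
import time

# Hypothetical stand-in for internal state that a library's background thread
# guards with a lock. This only illustrates the mechanism; it is not gRPC.
lock = threading.Lock()


def hold_lock(taken: threading.Event) -> None:
    # Background thread in the parent: owns the lock while the fork happens.
    with lock:
        taken.set()
        time.sleep(2)


def child_try_lock(queue) -> None:
    # The forked child gets a copy of the lock in its locked state, but not the
    # thread that would release it, so this acquire can only time out.
    queue.put(lock.acquire(timeout=1))


def demo() -> bool:
    taken = threading.Event()
    holder = threading.Thread(target=hold_lock, args=(taken,))
    holder.start()
    taken.wait()  # fork only after the background thread holds the lock
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    child = ctx.Process(target=child_try_lock, args=(queue,))
    child.start()
    acquired = queue.get(timeout=10)
    child.join()
    holder.join()
    return acquired


if __name__ == "__main__":
    print("child acquired lock:", demo())
```

Without the timeout, the child's acquire would block forever, which is the same shape of hang the report describes for the inherited channel.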
Why It Works on macOS / Windows
- macOS / Windows: The default multiprocessing start method is spawn.
- Spawn behavior: spawn starts a fresh Python interpreter process. The module is re-imported, and the BigQueryAgentAnalyticsPlugin is re-instantiated, creating a new, clean gRPC channel in the child process. This is safe.
- Linux: Defaults to fork. The child inherits the existing, unsafe parent channel.
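To check which start method a given interpreter uses, and to request spawn explicitly as the user-side workaround does, the standard multiprocessing API is enough. describe_start_method is a hypothetical helper name. The default it reports depends on both platform and Python version (Python 3.14 changed the Linux default from fork to forkserver).

```python
import multiprocessing
import sys


def describe_start_method():
    # Platform default: fork on Linux up to Python 3.13, spawn on macOS and Windows.
    default = multiprocessing.get_start_method()
    # An explicit context ignores the default, as in the spawn workaround.
    spawn_ctx = multiprocessing.get_context("spawn")
    return sys.platform, default, spawn_ctx.get_start_method()


if __name__ == "__main__":
    platform, default, explicit = describe_start_method()
    print(f"{platform}: default={default!r}, explicit context={explicit!r}")
```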
Reproduction Code (Broken on Linux)
This pattern is common in agent applications where plugins are instantiated globally or at the module level.
# broken_example.py
import asyncio
import concurrent.futures

from google.adk.plugins.bigquery_agent_analytics_plugin import (
    BigQueryAgentAnalyticsPlugin,
    BigQueryLoggerConfig,
)

# 1. Global/module-level instantiation creates the gRPC channel in the PARENT process
plugin = BigQueryAgentAnalyticsPlugin(
    project_id="my-project",
    dataset_id="logging",
    table_id="events",
    config=BigQueryLoggerConfig(enabled=True),
)


async def worker_task():
    # 3. Child process attempts to use the INHERITED parent plugin/channel
    # RESULT: HANGS or TIMEOUTS on Linux (fork)
    await plugin.log_event(...)


def worker_wrapper():
    asyncio.run(worker_task())


if __name__ == "__main__":
    # 2. ProcessPoolExecutor uses fork() on Linux by default
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        # .result() re-raises the child's TimeoutError instead of dropping it
        executor.submit(worker_wrapper).result()
Workaround (User Side)
Users can force Linux to use spawn instead of fork to match macOS behavior and avoid the issue.
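# fixed_example.py
import concurrent.futures
import multiprocessing

# Assumption: worker_wrapper lives in broken_example.py. Spawned children must be
# able to import it; doing so re-creates the plugin inside each child.
from broken_example import worker_wrapper

if __name__ == "__main__":
    # FORCE the 'spawn' context so each worker starts a fresh interpreter with its own channel
    mp_context = multiprocessing.get_context("spawn")
    with concurrent.futures.ProcessPoolExecutor(max_workers=2, mp_context=mp_context) as executor:
        executor.submit(worker_wrapper).result()
Recommended Library Fix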
The plugin should lazily initialize the gRPC client: do not create self.write_client in __init__. Initialize it only when it is first needed, inside the process that will actually use it.
Proposed Change:
class BigQueryAgentAnalyticsPlugin(BasePlugin):
    def __init__(self, ...):
        # self.write_client = BigQueryWriteAsyncClient(...)  <-- REMOVE THIS
        self._write_client = None

    async def _get_write_client(self):
        # Create the client (and its gRPC channel) ONLY when first needed,
        # i.e. inside the process that actually uses it.
        # Callers: client = await self._get_write_client()
        if self._write_client is None:
            self._write_client = BigQueryWriteAsyncClient(...)
        return self._write_client
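Lazy creation alone still leaves a gap: if the parent process uses the plugin before forking (for example, it logs an event during startup), the client already exists and the child inherits it again. One possible hardening, sketched here under the assumption that the plugin can safely build a second client in a child, is to remember which process created the client and rebuild it when the PID changes. ForkAwareLazy and its get_pid parameter are hypothetical names; in the plugin the factory would be something like lambda: BigQueryWriteAsyncClient(...).

```python
import os
from typing import Callable, Optional


class ForkAwareLazy:
    """Hypothetical helper: creates a value on first use in each process.

    If get() is called from a process other than the one that created the
    cached value (e.g. a forked child), a new value is built with the factory.
    """

    def __init__(self, factory: Callable[[], object],
                 get_pid: Callable[[], int] = os.getpid) -> None:
        self._factory = factory
        self._get_pid = get_pid  # injectable so the logic can be tested without forking
        self._value: object = None
        self._owner_pid: Optional[int] = None

    def get(self) -> object:
        pid = self._get_pid()
        if self._owner_pid != pid:
            # First use in this process, or an object inherited via fork:
            # build a fresh one. The inherited object is dropped, not reused.
            self._value = self._factory()
            self._owner_pid = pid
        return self._value


if __name__ == "__main__":
    holder = ForkAwareLazy(object)
    first = holder.get()
    print("reused in same process:", holder.get() is first)
```

Checking os.getpid() on each access is cheap, and unlike a one-time lazy flag it also covers the case where the client was created before the fork.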