Skip to content

Commit 27462cb

Browse files
committed
refactor(RuntimeFactory): refactor runtime and factory
This is done in order to follow single responsibility principles (SRP).
1 parent 72abea4 commit 27462cb

File tree

5 files changed

+704
-852
lines changed

5 files changed

+704
-852
lines changed

src/uipath/runtime/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from uipath.runtime.base import UiPathBaseRuntime
44
from uipath.runtime.context import UiPathRuntimeContext
55
from uipath.runtime.events import UiPathRuntimeEvent
6-
from uipath.runtime.factory import UiPathRuntimeExecutor, UiPathRuntimeFactory
6+
from uipath.runtime.factory import UiPathRuntimeFactory
77
from uipath.runtime.result import (
88
UiPathApiTrigger,
99
UiPathBreakpointResult,
@@ -16,7 +16,6 @@
1616
"UiPathRuntimeContext",
1717
"UiPathBaseRuntime",
1818
"UiPathRuntimeFactory",
19-
"UiPathRuntimeExecutor",
2019
"UiPathRuntimeResult",
2120
"UiPathRuntimeEvent",
2221
"UiPathBreakpointResult",

src/uipath/runtime/base.py

Lines changed: 152 additions & 160 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,38 @@
11
"""Base runtime class and async context manager implementation."""
22

3-
import json
43
import logging
5-
import os
64
from abc import ABC, abstractmethod
7-
from typing import AsyncGenerator
5+
from typing import (
6+
Any,
7+
AsyncGenerator,
8+
Generic,
9+
List,
10+
Optional,
11+
TypeVar,
12+
)
813

9-
from uipath.runtime.context import UiPathRuntimeContext
10-
from uipath.runtime.errors import (
11-
UiPathErrorCategory,
12-
UiPathErrorCode,
13-
UiPathErrorContract,
14-
UiPathRuntimeError,
14+
from opentelemetry import trace
15+
from opentelemetry.instrumentation.instrumentor import ( # type: ignore[attr-defined]
16+
BaseInstrumentor,
1517
)
18+
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, TracerProvider
19+
from opentelemetry.sdk.trace.export import SpanExporter
20+
from typing_extensions import override
21+
22+
from uipath.runtime.context import UiPathRuntimeContext
1623
from uipath.runtime.events import (
1724
UiPathRuntimeEvent,
1825
)
26+
from uipath.runtime.logging import UiPathRuntimeExecutionLogHandler
1927
from uipath.runtime.logging._interceptor import UiPathRuntimeLogsInterceptor
20-
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
2128
from uipath.runtime.schema import (
2229
UiPathRuntimeSchema,
2330
)
31+
from uipath.runtime.tracing import (
32+
UiPathExecutionBatchTraceProcessor,
33+
UiPathExecutionSimpleTraceProcessor,
34+
UiPathRuntimeExecutionSpanExporter,
35+
)
2436

2537
logger = logging.getLogger(__name__)
2638

@@ -37,7 +49,7 @@ class UiPathBaseRuntime(ABC):
3749
This allows using the class with 'async with' statements.
3850
"""
3951

40-
def __init__(self, context: UiPathRuntimeContext):
52+
def __init__(self, context: Optional[UiPathRuntimeContext] = None):
4153
"""Initialize the runtime with the provided context."""
4254
self.context = context
4355

@@ -48,70 +60,10 @@ async def get_schema(self) -> UiPathRuntimeSchema:
4860
"""
4961
raise NotImplementedError()
5062

51-
async def __aenter__(self):
52-
"""Async enter method called when entering the 'async with' block.
53-
54-
Initializes and prepares the runtime environment.
55-
56-
Returns:
57-
The runtime instance
58-
"""
59-
# Read the input from file if provided
60-
if self.context.input_file:
61-
_, file_extension = os.path.splitext(self.context.input_file)
62-
if file_extension != ".json":
63-
raise UiPathRuntimeError(
64-
code=UiPathErrorCode.INVALID_INPUT_FILE_EXTENSION,
65-
title="Invalid Input File Extension",
66-
detail="The provided input file must be in JSON format.",
67-
)
68-
with open(self.context.input_file) as f:
69-
self.context.input = f.read()
70-
71-
try:
72-
if isinstance(self.context.input, str):
73-
if self.context.input.strip():
74-
self.context.input = json.loads(self.context.input)
75-
else:
76-
self.context.input = {}
77-
elif self.context.input is None:
78-
self.context.input = {}
79-
# else: leave it as-is (already a dict, list, bool, etc.)
80-
except json.JSONDecodeError as e:
81-
raise UiPathRuntimeError(
82-
UiPathErrorCode.INPUT_INVALID_JSON,
83-
"Invalid JSON input",
84-
f"The input data is not valid JSON: {str(e)}",
85-
UiPathErrorCategory.USER,
86-
) from e
87-
88-
await self.validate()
89-
90-
# Intercept all stdout/stderr/logs
91-
# Write to file (runtime), stdout (debug) or log handler (if provided)
92-
self.logs_interceptor = UiPathRuntimeLogsInterceptor(
93-
min_level=self.context.logs_min_level,
94-
dir=self.context.runtime_dir,
95-
file=self.context.logs_file,
96-
job_id=self.context.job_id,
97-
execution_id=self.context.execution_id,
98-
log_handler=self.context.log_handler,
99-
)
100-
self.logs_interceptor.setup()
101-
102-
return self
103-
10463
@abstractmethod
105-
async def execute(self) -> UiPathRuntimeResult:
106-
"""Execute with the provided context.
107-
108-
Returns:
109-
Dictionary with execution results
110-
111-
Raises:
112-
RuntimeError: If execution fails
113-
"""
114-
pass
64+
async def execute(self, input: dict[str, Any]) -> Any:
65+
"""Produce the agent output."""
66+
raise NotImplementedError()
11567

11668
async def stream(
11769
self,
@@ -154,101 +106,141 @@ async def stream(
154106
# Without it, the function wouldn't match the AsyncGenerator return type
155107
yield
156108

157-
@abstractmethod
158-
async def validate(self):
159-
"""Validate runtime inputs."""
160-
pass
109+
def get_instrumentor(self) -> Optional[BaseInstrumentor]:
110+
"""Get instrumentor for this runtime. If no instrumentor is available, return None."""
111+
return None
161112

162113
@abstractmethod
163114
async def cleanup(self):
164115
"""Cleaup runtime resources."""
165116
pass
166117

167-
async def __aexit__(self, exc_type, exc_val, exc_tb):
168-
"""Async exit method called when exiting the 'async with' block.
169118

170-
Cleans up resources and handles any exceptions.
119+
T = TypeVar("T", bound=UiPathBaseRuntime)
120+
121+
122+
class TraceManager:
123+
"""Trace manager."""
124+
125+
def __init__(self):
126+
"""Initialize a trace manager."""
127+
self.tracer_provider: TracerProvider = TracerProvider()
128+
trace.set_tracer_provider(self.tracer_provider)
129+
self.tracer_span_processors: List[SpanProcessor] = []
130+
self.execution_span_exporter = UiPathRuntimeExecutionSpanExporter()
131+
self.add_span_exporter(self.execution_span_exporter)
171132

172-
Always writes output file regardless of whether execution was successful,
173-
suspended, or encountered an error.
133+
def add_span_exporter(
134+
self,
135+
span_exporter: SpanExporter,
136+
batch: bool = True,
137+
) -> "TraceManager":
138+
"""Add a span processor to the tracer provider."""
139+
span_processor: SpanProcessor
140+
if batch:
141+
span_processor = UiPathExecutionBatchTraceProcessor(span_exporter)
142+
else:
143+
span_processor = UiPathExecutionSimpleTraceProcessor(span_exporter)
144+
self.tracer_span_processors.append(span_processor)
145+
self.tracer_provider.add_span_processor(span_processor)
146+
return self
147+
148+
def get_execution_spans(
149+
self,
150+
execution_id: str,
151+
) -> List[ReadableSpan]:
152+
"""Retrieve spans for a given execution id."""
153+
return self.execution_span_exporter.get_spans(execution_id)
154+
155+
def flush_spans(self) -> None:
156+
"""Flush all span processors."""
157+
for span_processor in self.tracer_span_processors:
158+
span_processor.force_flush()
159+
160+
161+
class UiPathExecutionRuntime(UiPathBaseRuntime, Generic[T]):
162+
"""Handles runtime execution with tracing/telemetry."""
163+
164+
def __init__(
165+
self,
166+
delegate: T,
167+
trace_manager: TraceManager,
168+
root_span: str = "root",
169+
execution_id: Optional[str] = None,
170+
):
171+
"""Initialize the executor."""
172+
self.delegate = delegate
173+
self.trace_manager = trace_manager
174+
self.root_span = root_span
175+
self.execution_id = execution_id
176+
self.log_handler: Optional[UiPathRuntimeExecutionLogHandler]
177+
if execution_id is not None:
178+
self.log_handler = UiPathRuntimeExecutionLogHandler(execution_id)
179+
else:
180+
self.log_handler = None
181+
182+
async def execute(
183+
self,
184+
input: dict[str, Any],
185+
) -> Any:
186+
"""Execute runtime with context."""
187+
instrumentor = self.delegate.get_instrumentor()
188+
log_interceptor = UiPathRuntimeLogsInterceptor(
189+
execution_id=self.execution_id, log_handler=self.log_handler
190+
)
191+
log_interceptor.setup()
192+
193+
if instrumentor is not None:
194+
instrumentor.instrument(tracer_provider=self.trace_manager.tracer_provider)
195+
try:
196+
tracer = trace.get_tracer("uipath-runtime")
197+
span_attributes: dict[str, Any] = {}
198+
if self.execution_id:
199+
span_attributes["execution.id"] = self.execution_id
200+
with tracer.start_as_current_span(
201+
self.root_span, attributes=span_attributes
202+
):
203+
return await self.delegate.execute(input)
204+
finally:
205+
self.trace_manager.flush_spans()
206+
if instrumentor is not None:
207+
instrumentor.uninstrument()
208+
log_interceptor.teardown()
209+
210+
@override
211+
async def stream(
212+
self,
213+
root_span: str = "root",
214+
execution_id: Optional[str] = None,
215+
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
216+
"""Stream runtime execution with context.
217+
218+
Args:
219+
runtime: The runtime instance
220+
context: The runtime context
221+
222+
Yields:
223+
UiPathRuntimeEvent instances during execution and final UiPathRuntimeResult
224+
225+
Raises:
226+
UiPathRuntimeStreamNotSupportedError: If the runtime doesn't support streaming
174227
"""
228+
instrumentor = self.delegate.get_instrumentor()
229+
if instrumentor is not None:
230+
instrumentor.instrument(tracer_provider=self.trace_manager.tracer_provider)
175231
try:
176-
if self.context.result is None:
177-
execution_result = UiPathRuntimeResult()
178-
else:
179-
execution_result = self.context.result
180-
181-
if exc_type:
182-
# Create error info from exception
183-
if isinstance(exc_val, UiPathRuntimeError):
184-
error_info = exc_val.error_info
185-
else:
186-
# Generic error
187-
error_info = UiPathErrorContract(
188-
code=f"ERROR_{exc_type.__name__}",
189-
title=f"Runtime error: {exc_type.__name__}",
190-
detail=str(exc_val),
191-
category=UiPathErrorCategory.UNKNOWN,
192-
)
193-
194-
execution_result.status = UiPathRuntimeStatus.FAULTED
195-
execution_result.error = error_info
196-
197-
content = execution_result.to_dict()
198-
199-
# Always write output file at runtime, except for inner runtimes
200-
# Inner runtimes have execution_id
201-
if self.context.job_id and not self.context.execution_id:
202-
with open(self.context.result_file_path, "w") as f:
203-
json.dump(content, f, indent=2, default=str)
204-
205-
# Write the execution output to file if requested
206-
if self.context.output_file:
207-
with open(self.context.output_file, "w") as f:
208-
f.write(content.get("output", "{}"))
209-
210-
# Don't suppress exceptions
211-
return False
212-
213-
except Exception as e:
214-
logger.error(f"Error during runtime shutdown: {str(e)}")
215-
216-
# Create a fallback error result if we fail during cleanup
217-
if not isinstance(e, UiPathRuntimeError):
218-
error_info = UiPathErrorContract(
219-
code="RUNTIME_SHUTDOWN_ERROR",
220-
title="Runtime shutdown failed",
221-
detail=f"Error: {str(e)}",
222-
category=UiPathErrorCategory.SYSTEM,
223-
)
224-
else:
225-
error_info = e.error_info
226-
227-
# Last-ditch effort to write error output
228-
try:
229-
error_result = UiPathRuntimeResult(
230-
status=UiPathRuntimeStatus.FAULTED, error=error_info
231-
)
232-
error_result_content = error_result.to_dict()
233-
if self.context.job_id:
234-
with open(self.context.result_file_path, "w") as f:
235-
json.dump(error_result_content, f, indent=2, default=str)
236-
except Exception as write_error:
237-
logger.error(f"Failed to write error output file: {str(write_error)}")
238-
raise
239-
240-
# Re-raise as RuntimeError if it's not already a UiPathRuntimeError
241-
if not isinstance(e, UiPathRuntimeError):
242-
raise RuntimeError(
243-
error_info.code,
244-
error_info.title,
245-
error_info.detail,
246-
error_info.category,
247-
) from e
248-
raise
232+
tracer = trace.get_tracer("uipath-runtime")
233+
span_attributes: dict[str, Any] = {}
234+
if execution_id:
235+
span_attributes["execution.id"] = "exec-a"
236+
with tracer.start_as_current_span(root_span, attributes=span_attributes):
237+
async for event in self.delegate.stream():
238+
yield event
249239
finally:
250-
# Restore original logging
251-
if hasattr(self, "logs_interceptor"):
252-
self.logs_interceptor.teardown()
240+
self.trace_manager.flush_spans()
241+
if instrumentor is not None:
242+
instrumentor.uninstrument()
253243

254-
await self.cleanup()
244+
def cleanup(self) -> None:
245+
"""Close runtime resources."""
246+
pass

0 commit comments

Comments
 (0)