Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: Add the Python process framework #9363

Merged
merged 20 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
PR Feedback
  • Loading branch information
moonbox3 committed Oct 22, 2024
commit 4038579587cbb7802917ac4782b7778cb8ffed3f
60 changes: 37 additions & 23 deletions python/samples/concepts/processes/cycles_with_fan_in.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
from enum import Enum

from pydantic import Field

Expand All @@ -18,7 +19,7 @@
logging.basicConfig(level=logging.WARNING)


class CommonEvents:
class CommonEvents(Enum):
"""Common events for the sample process."""

UserInputReceived = "UserInputReceived"
Expand All @@ -33,37 +34,46 @@ class CommonEvents:
StartProcess = "StartProcess"
moonbox3 marked this conversation as resolved.
Show resolved Hide resolved


# Define a sample step that once the `on_input_event` is received,
# it will emit two events to start the A and B steps.
class KickOffStep(KernelProcessStep):
class Functions:
KickOff = "kick_off"

@kernel_function(name=Functions.KickOff)
async def print_welcome_message(self, context: KernelProcessStepContext):
await context.emit_event(KernelProcessEvent(id=CommonEvents.StartARequested, data="Get Going A"))
await context.emit_event(KernelProcessEvent(id=CommonEvents.StartBRequested, data="Get Going B"))
await context.emit_event(KernelProcessEvent(id=CommonEvents.StartARequested.value, data="Get Going A"))
await context.emit_event(KernelProcessEvent(id=CommonEvents.StartBRequested.value, data="Get Going B"))


# Define a sample `AStep` step that will emit an event after 1 second.
# The event will be sent to the `CStep` step with the data `I did A`.
class AStep(KernelProcessStep):
@kernel_function()
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(1)
await context.emit_event(KernelProcessEvent(id=CommonEvents.AStepDone, data="I did A"))
await context.emit_event(KernelProcessEvent(id=CommonEvents.AStepDone.value, data="I did A"))


# Define a sample `BStep` step that will emit an event after 2 seconds.
# The event will be sent to the `CStep` step with the data `I did B`.
class BStep(KernelProcessStep):
@kernel_function()
moonbox3 marked this conversation as resolved.
Show resolved Hide resolved
async def do_it(self, context: KernelProcessStepContext):
await asyncio.sleep(2)
await context.emit_event(KernelProcessEvent(id=CommonEvents.BStepDone, data="I did B"))
await context.emit_event(KernelProcessEvent(id=CommonEvents.BStepDone.value, data="I did B"))


# Define a sample `CStepState` that will keep track of the current cycle.
class CStepState:
current_cycle: int = 0


# Define a sample `CStep` step that will emit an `ExitRequested` event after 3 cycles.
class CStep(KernelProcessStep[CStepState]):
state: CStepState = Field(default_factory=CStepState)

# The activate method overrides the base class method to set the state in the step.
async def activate(self, state: KernelProcessStepState[TState]):
"""Activates the step and sets the state."""
self.state = state.state
Expand All @@ -73,9 +83,9 @@ async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepda
self.state.current_cycle += 1
print(f"CStep Current Cycle: {self.state.current_cycle}")
if self.state.current_cycle == 3:
await context.emit_event(process_event=KernelProcessEvent(id=CommonEvents.ExitRequested))
await context.emit_event(process_event=KernelProcessEvent(id=CommonEvents.ExitRequested.value))
return
await context.emit_event(process_event=KernelProcessEvent(id=CommonEvents.CStepDone))
await context.emit_event(process_event=KernelProcessEvent(id=CommonEvents.CStepDone.value))


kernel = Kernel()
Expand All @@ -84,37 +94,41 @@ async def do_it(self, context: KernelProcessStepContext, astepdata: str, bstepda
async def cycles_with_fan_in():
kernel.add_service(OpenAIChatCompletion(service_id="default"))

# Define the process builder
process = ProcessBuilder(name="Test Process")

# Add the step types to the builder
kickoff_step = process.add_step(step_type=KickOffStep)
myAStep = process.add_step(step_type=AStep)
myBStep = process.add_step(step_type=BStep)
myCStep = process.add_step(step_type=CStep, initial_state=CStepState())

# Define the input event and where to send it to
process.on_input_event(event_id=CommonEvents.StartProcess).send_event_to(target=kickoff_step)
process.on_input_event(event_id=CommonEvents.StartProcess.value).send_event_to(target=kickoff_step)

# Define the process flow
kickoff_step.on_event(event_id=CommonEvents.StartARequested).send_event_to(target=myAStep)
kickoff_step.on_event(event_id=CommonEvents.StartBRequested).send_event_to(target=myBStep)
myAStep.on_event(event_id=CommonEvents.AStepDone).send_event_to(target=myCStep, parameter_name="astepdata")
kickoff_step.on_event(event_id=CommonEvents.StartARequested.value).send_event_to(target=myAStep)
kickoff_step.on_event(event_id=CommonEvents.StartBRequested.value).send_event_to(target=myBStep)
myAStep.on_event(event_id=CommonEvents.AStepDone.value).send_event_to(target=myCStep, parameter_name="astepdata")

# Define the fan in behavior once both AStep and BStep are done
myBStep.on_event(event_id=CommonEvents.BStepDone).send_event_to(target=myCStep, parameter_name="bstepdata")
myCStep.on_event(event_id=CommonEvents.CStepDone).send_event_to(target=kickoff_step)
myCStep.on_event(event_id=CommonEvents.ExitRequested).stop_process()
myBStep.on_event(event_id=CommonEvents.BStepDone.value).send_event_to(target=myCStep, parameter_name="bstepdata")
myCStep.on_event(event_id=CommonEvents.CStepDone.value).send_event_to(target=kickoff_step)
myCStep.on_event(event_id=CommonEvents.ExitRequested.value).stop_process()

# Build the process
kernel_process = process.build()

process_context = await kernel_process.start(kernel=kernel, initial_event=CommonEvents.StartProcess, data="foo")

process_state = await process_context.get_state()
c_step_state: KernelProcessStepState[CStepState] = next(
(s.state for s in process_state.steps if s.state.name == "CStep"), None
)
assert c_step_state.state # nosec
assert c_step_state.state.current_cycle == 3 # nosec
print(f"CStepState current cycle: {c_step_state.state.current_cycle}")
async with await kernel_process.start(
kernel=kernel, initial_event=CommonEvents.StartProcess.value, data="foo"
) as process_context:
process_state = await process_context.get_state()
c_step_state: KernelProcessStepState[CStepState] = next(
(s.state for s in process_state.steps if s.state.name == "CStep"), None
)
assert c_step_state.state # nosec
assert c_step_state.state.current_cycle == 3 # nosec
print(f"CStepState current cycle: {c_step_state.state.current_cycle}")


if __name__ == "__main__":
Expand Down
3 changes: 3 additions & 0 deletions python/semantic_kernel/processes/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright (c) Microsoft. All rights reserved.

END_PROCESS_ID: str = "END"
moonbox3 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ async def start(
if isinstance(initial_event, str):
initial_event = KernelProcessEvent(id=initial_event, data=kwargs.get("data", None))

async with LocalKernelProcessContext(self, kernel) as process_context:
await process_context.start_with_event(initial_event)
return process_context
process_context = LocalKernelProcessContext(self, kernel)
await process_context.start_with_event(initial_event)
return process_context
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from semantic_kernel.exceptions import KernelException
from semantic_kernel.exceptions.process_exceptions import ProcessEventUndefinedException
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes.const import END_PROCESS_ID
from semantic_kernel.processes.kernel_process.kernel_process_state import KernelProcessState
from semantic_kernel.processes.kernel_process.kernel_process_step_info import KernelProcessStepInfo
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent, KernelProcessEventVisibility
Expand All @@ -30,8 +31,6 @@
class LocalProcess(LocalStep):
"""A local process that contains a collection of steps."""

_end_process_id: str = "END"

kernel: Kernel
steps: list[LocalStep] = Field(default_factory=list)
step_infos: list[KernelProcessStepInfo] = Field(default_factory=list)
Expand Down Expand Up @@ -170,7 +169,7 @@ async def internal_execute(self, max_supersteps: int = 100, keep_alive: bool = T

message_tasks = []
for message in messages_to_process:
if message.destination_id == self._end_process_id:
if message.destination_id == END_PROCESS_ID:
break

destination_step = next(step for step in self.steps if step.id == message.destination_id)
Expand Down
Loading