Skip to content

Conversation

omby8888
Copy link
Contributor

@omby8888 omby8888 commented Sep 21, 2025

User description

Description

What -

Why -

How -

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the port production environment(using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Completed a full resync from a freshly installed integration and it completed successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Enhancement


Description

  • Add Ocean execution agent infrastructure for action handling

  • Refactor GitHub webhook path management with constants

  • Implement Port client actions mixin for API operations

  • Add GitHub workflow dispatch action specification


Diagram Walkthrough

flowchart LR
  A["Ocean Core"] --> B["Execution Manager"]
  B --> C["Abstract Executor"]
  C --> D["Action Handlers"]
  E["Port Client"] --> F["Actions Mixin"]
  F --> G["API Operations"]
  H["GitHub Integration"] --> I["Webhook Registry"]
  I --> J["Workflow Dispatch"]
Loading

File Walkthrough

Relevant files
Enhancement
11 files
registry.py
Refactor webhook path to use constant                                       
+24/-23 
webhook_client.py
Import and use webhook path constant                                         
+2/-1     
main.py
Remove path parameter from webhook registration                   
+1/-1     
client.py
Add ActionsClientMixin to PortClient                                         
+3/-0     
actions.py
Create actions client mixin for API operations                     
+56/-0   
blueprints.py
Move create_action method to actions mixin                             
+0/-12   
ocean.py
Add executor registration method to context                           
+4/-0     
abstract_executor.py
Create abstract base class for executors                                 
+61/-0   
execution_manager.py
Implement execution manager for action orchestration         
+75/-0   
models.py
Add action run and execution models                                           
+36/-1   
ocean.py
Integrate execution manager into Ocean app                             
+21/-4   
Configuration changes
2 files
settings.py
Add execution agent configuration settings                             
+19/-0   
spec.yaml
Add GitHub workflow dispatch action specification               
+19/-0   

@omby8888 omby8888 requested a review from a team as a code owner September 21, 2025 13:16
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

API Change

Replacing the parameterized webhook path with a hardcoded WEBHOOK_PATH constant changes the public function signature and removes flexibility for custom paths. Verify no callers relied on passing a different path and consider keeping a defaulted parameter while centralizing the default in the constant.

WEBHOOK_PATH = "/webhook"

def register_live_events_webhooks() -> None:
    """Register all live event webhook processors."""
    ocean.add_webhook_processor(WEBHOOK_PATH, RepositoryWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, PullRequestWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, IssueWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, ReleaseWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, TagWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, BranchWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, EnvironmentWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, DeploymentWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, WorkflowRunWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, WorkflowWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, DependabotAlertWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, CodeScanningAlertWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, FolderWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, TeamWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, TeamMemberWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, UserWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, FileWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, CollaboratorMemberWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, CollaboratorMembershipWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, CollaboratorTeamWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, CheckRunValidatorWebhookProcessor)
    ocean.add_webhook_processor(WEBHOOK_PATH, SecretScanningAlertWebhookProcessor)
Typing Bug

The code imports Union from ctypes instead of typing, and mixes pydantic Literal with a local Literal import above. This will likely cause runtime/type issues. Use typing.Union and a single consistent Literal import.

from ctypes import Union
from dataclasses import dataclass, field
from enum import Enum, StrEnum
from typing import Any, Literal, TypedDict

from pydantic import BaseModel
from pydantic.fields import Field
Async Await

In register_addons the execution manager processing is started without await; method is async but not awaited, so it will not run or may raise unhandled warnings. Ensure start_processing_action_runs is awaited or explicitly scheduled as a task.

async def register_addons(self) -> None:
    if self.base_url:
        await self.webhook_manager.start_processing_event_messages()
    else:
        logger.warning("No base URL provided, skipping webhook processing")

    if self.execution_agent_enabled:
        self.execution_manager.start_processing_action_runs()
    else:
        logger.warning(
            "Execution agent is not enabled, skipping execution agent setup"
        )

Copy link
Contributor

qodo-merge-pro bot commented Sep 21, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Await an async method call
Suggestion Impact:The start_processing_action_runs method was expanded and used to initialize polling and workers, and it performs the awaited feature flag check using IntegrationFeatureFlag. Although the diff doesn't show the call site being updated to add an await, the method itself was corrected and its logic implemented, addressing the original concern about the async flow not executing.

code diff:

-    async def start_processing_action_runs(self) -> None:
-        """Start polling and processing action runs for all registered actions."""
+    async def start_processing_action_runs(self):
+        """
+        Start polling and processing action runs for all registered actions.
+        """
         flags = await ocean.port_client.get_organization_feature_flags()
-        if "OCEAN_EXECUTION_AGENT_ELIGIBLE" not in flags:
+        if IntegrationFeatureFlag.OCEAN_EXECUTION_AGENT_ELIGIBLE not in flags:
             logger.warning(
                 "Execution agent is not allowed for your organization, skipping execution agent setup"
             )
             return
 
+        self._polling_task = asyncio.create_task(self._poll_action_runs())
+        self._timeout_task = asyncio.create_task(self._background_timeout_check())
+
+        workers_count = max(1, ocean.config.execution_agent.workers_count)
+        for worker_index in range(workers_count):
+            task = asyncio.create_task(self._process_actions(worker_index))
+            self._workers_pool.add(task)
+            task.add_done_callback(self._workers_pool.discard)
+

The start_processing_action_runs method is an async function but is not awaited
when called in ocean.py. This prevents the feature flag check from executing.
The call should be awaited.

port_ocean/core/handlers/actions/execution_manager.py [63-70]

 async def start_processing_action_runs(self) -> None:
     """Start polling and processing action runs for all registered actions."""
     flags = await ocean.port_client.get_organization_feature_flags()
     if "OCEAN_EXECUTION_AGENT_ELIGIBLE" not in flags:
         logger.warning(
             "Execution agent is not allowed for your organization, skipping execution agent setup"
         )
         return
 
+    # TODO: Implement polling for action runs
+    logger.info("Execution agent is enabled and eligible, starting to process action runs.")
+

[Suggestion processed]

Suggestion importance[1-10]: 10

__

Why: The suggestion correctly identifies a critical bug where an async method call is not awaited, causing the execution agent setup logic, including a feature flag check, to never run.

High
Correct inverted validation logic

Reverse the logic in the validate_execution_agent validator. The validator
should raise an error if the agent is enabled in the configuration but not
supported in the integration's spec.yaml, instead of the other way around.

port_ocean/config/settings.py [208-218]

 @validator("execution_agent")
 def validate_execution_agent(
     cls, execution_agent: ExecutionAgentSettings
 ) -> ExecutionAgentSettings:
-    spec = get_spec_file()
-    if spec and spec.get("execution_agent", {}).get("enabled", None):
-        raise ValueError(
-            "Serving as an execution agent is not currently supported for this integration."
-        )
+    if execution_agent.enabled:
+        spec = get_spec_file()
+        if not spec or not spec.get("execution_agent", {}).get("enabled"):
+            raise ValueError(
+                "Execution agent is enabled in the configuration, but not supported by this integration's spec.yaml."
+            )
 
     return execution_agent
  • Apply / Chat
Suggestion importance[1-10]: 9

__

Why: The suggestion correctly identifies a logical flaw in the validate_execution_agent validator, which incorrectly prevents the execution agent from being enabled. The proposed fix corrects this critical bug.

High
Return empty list instead of placeholder
Suggestion Impact:The placeholder implementation returning [{}] was removed; the method signature was changed and the body no longer returns [{}], preventing the problematic placeholder. Although it now has a TODO/pass, it no longer returns [{}], aligning with the suggestion’s intent to avoid that placeholder.

code diff:

-    async def get_pending_runs(self) -> list[dict[str, Any]]:
-        # response = await self.client.get(
-        #     f"{self.auth.api_url}/runs/pending?installation_id={self.auth.integration_identifier}",
-        #     headers=await self.auth.headers(),
-        # )
-        # handle_port_status_code(response)
-        # return response.json()
-        return [{}]
+    async def get_pending_runs(self, limit: int = 50) -> list[ActionRun]:
+        # TODO: Implement
+        pass

Modify the placeholder implementation of get_pending_runs to return an empty
list [] instead of [{}]. This prevents potential KeyError exceptions in
downstream code.

port_ocean/clients/port/mixins/actions.py [41-48]

 async def get_pending_runs(self) -> list[dict[str, Any]]:
     # response = await self.client.get(
     #     f"{self.auth.api_url}/runs/pending?installation_id={self.auth.integration_identifier}",
     #     headers=await self.auth.headers(),
     # )
     # handle_port_status_code(response)
     # return response.json()
-    return [{}]
+    return []

[Suggestion processed]

Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that returning [{}] from the placeholder function get_pending_runs can lead to KeyError exceptions in consumer code, and proposes the safer alternative of returning an empty list [].

Low
High-level
Complete the execution agent implementation

The execution agent infrastructure is incomplete. Key methods in
ExecutionManager and ActionsClientMixin are placeholders, making the feature
non-operational and lacking a proper shutdown process.

Examples:

port_ocean/core/handlers/actions/execution_manager.py [63-70]
    async def start_processing_action_runs(self) -> None:
        """Start polling and processing action runs for all registered actions."""
        flags = await ocean.port_client.get_organization_feature_flags()
        if "OCEAN_EXECUTION_AGENT_ELIGIBLE" not in flags:
            logger.warning(
                "Execution agent is not allowed for your organization, skipping execution agent setup"
            )
            return
port_ocean/clients/port/mixins/actions.py [41-48]
    async def get_pending_runs(self) -> list[dict[str, Any]]:
        # response = await self.client.get(
        #     f"{self.auth.api_url}/runs/pending?installation_id={self.auth.integration_identifier}",
        #     headers=await self.auth.headers(),
        # )
        # handle_port_status_code(response)
        # return response.json()
        return [{}]

Solution Walkthrough:

Before:

class ActionsClientMixin:
    async def get_pending_runs(self):
        # ... implementation is commented out
        return [{}] # returns dummy data

class ExecutionManager:
    async def start_processing_action_runs(self):
        # Checks for a feature flag
        # ... but does not start any processing loop
        return

    async def shutdown(self):
        # Logs a shutdown message
        pass # No actual cleanup logic

After:

class ActionsClientMixin:
    async def get_pending_runs(self):
        # Fetches pending action runs from the Port API
        response = await self.client.get(f"{self.auth.api_url}/runs/pending?...")
        handle_port_status_code(response)
        return response.json()

class ExecutionManager:
    async def start_processing_action_runs(self):
        # Starts a polling loop to fetch and process pending runs
        # Dispatches runs to the correct executor
        # Manages worker tasks
        ...

    async def shutdown(self):
        # Gracefully stops the polling loop
        # Waits for active worker tasks to complete
        # Cancels pending tasks
        ...
Suggestion importance[1-10]: 9

__

Why: The suggestion correctly identifies that the core logic for the new execution agent feature is incomplete, with key methods like start_processing_action_runs and get_pending_runs being stubbed out, rendering the feature non-functional.

High
  • Update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant