
Conversation

@dennis-bilson-port (Member) commented Sep 18, 2025

User description

Description

What - Added support for Datadog Service Dependency Live Events in the Port Datadog integration. This enables ingestion and processing of live dependency data for services, expanding observability within Port.

Why - Previously, the integration only supported static service dependency data. By introducing live event handling, teams can now track real-time changes in service dependencies, improving incident response, dependency mapping, and operational visibility.

How -
• Extended the Datadog integration to subscribe to and process live service dependency events.
• Implemented event transformation and mapping logic to Port’s internal service dependency model.
• Added error handling, retries, and logging to ensure robust ingestion of live events.
• Updated tests and documentation to cover new live event functionality.

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Completed a full resync from a freshly installed integration and it completed successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Enhancement


Description

  • Added live event support for Datadog service dependencies

  • Implemented service dependency webhook processor for real-time updates

  • Enhanced client with single service dependency fetching capability

  • Refactored webhook authentication to shared abstract base class


Diagram Walkthrough

flowchart LR
  A["Webhook Event"] --> B["ServiceDependencyWebhookProcessor"]
  B --> C["DatadogClient.get_single_service_dependency"]
  C --> D["Port Entity Update"]
  E["AbstractWebhookProcessor"] --> F["Shared Authentication"]
  F --> B
  F --> G["MonitorWebhookProcessor"]

File Walkthrough

Relevant files

Enhancement (5 files)
  • client.py: Enhanced client with service dependency fetching (+30/-4)
  • main.py: Added service dependency webhook processor registration (+11/-8)
  • _abstract_webhook_processor.py: Created shared webhook authentication base class (+45/-0)
  • monitor_webhook_processor.py: Refactored to use abstract webhook processor (+4/-26)
  • service_dependency_webhook_processor.py: Implemented service dependency webhook event handling (+86/-0)

Formatting (1 file)
  • test_client.py: Updated client test formatting (+3/-1)

Tests (2 files)
  • test_monitor_webhook_processor.py: Refactored monitor webhook processor tests (+12/-16)
  • test_service_dependency_webhook_processor.py: Added comprehensive service dependency webhook tests (+113/-0)

Configuration changes (5 files)
  • port-app-config.yaml: Added service dependency resource configuration (+15/-0)
  • spec.yaml: Added service dependency to feature list (+1/-0)
  • blueprints.json: Updated blueprints with service dependency schema (+93/-1)
  • mappings.yaml: Added service dependency mapping configuration (+16/-0)
  • pyproject.toml: Bumped version to 0.3.21 (+1/-1)

Documentation (1 file)
  • CHANGELOG.md: Added version 0.3.21 release notes (+8/-0)

dennis-bilson and others added 30 commits July 30, 2025 13:23
…ove webhook processing

- Refactored service dependency fetching methods to utilize the new fetch interval constant.
- Removed the create_service_dependency_webhook method and adjusted webhook processing logic to streamline event handling.
- Updated tests to cover new service dependency functionality and ensure correct API request parameters.
@dennis-bilson-port dennis-bilson-port self-assigned this Sep 18, 2025
@dennis-bilson-port dennis-bilson-port changed the title Port 15408 add support for datadog service dependency live events [Integration][DataDog]Add Support for Live Events Sep 18, 2025
@dennis-bilson-port dennis-bilson-port changed the title [Integration][DataDog]Add Support for Live Events [Integration][DataDog] Add Support for Live Events Sep 18, 2025
@dennis-bilson-port dennis-bilson-port marked this pull request as ready for review September 23, 2025 15:02
@dennis-bilson-port dennis-bilson-port requested a review from a team as a code owner September 23, 2025 15:02
PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

In get_single_service_dependency the meaning of start_time is altered (multiplied by FETCH_WINDOW_TIME_IN_SECONDS and subtracted from now), which may not match the selector semantics and can produce unexpected time windows. Confirm intended units and naming to avoid incorrect API queries.

async def get_single_service_dependency(
    self, env: str, start_time: float, service_id: str
) -> dict[str, Any] | None:
    end_time = int(time.time())

    start_time = time.time() - (FETCH_WINDOW_TIME_IN_SECONDS * start_time)

    url = f"{self.api_url}/api/v1/service_dependencies/{service_id}"
    result: dict[str, Any] = await self._send_api_request(
        url,
        params={"env": env, "start": int(start_time), "end": end_time},
    )
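The concern above can be made concrete. In a sketch (hypothetical names, not the PR's code), renaming the parameter to reflect that it is a window multiplier makes the computed query range unambiguous:

```python
import time

# Hypothetical stand-in for the integration's FETCH_WINDOW_TIME_IN_SECONDS constant.
FETCH_WINDOW_TIME_IN_SECONDS = 600


def compute_query_window(window_multiplier: float) -> tuple[int, int]:
    """Return (start, end) epoch seconds for a dependency query.

    The current code passes the selector's `start_time` here, but it is used
    as a multiplier on the fetch window, not as a timestamp; naming it for
    what it does removes the ambiguity the reviewer points out.
    """
    end = int(time.time())
    start = int(end - FETCH_WINDOW_TIME_IN_SECONDS * window_multiplier)
    return start, end


start, end = compute_query_window(window_multiplier=2)
assert end - start == 2 * FETCH_WINDOW_TIME_IN_SECONDS
```

If the selector is genuinely meant to carry a timestamp, the multiplication should be dropped instead; either way the unit should be explicit in the name.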
Robustness

handle_event assumes payload['tags'] exists and is iterable; although validate_payload checks presence in some cases, relying on it without defensive access may raise KeyError if upstream changes. Consider using payload.get('tags', []) consistently.

async def handle_event(
    self, payload: EventPayload, resource_config: ResourceConfig
) -> WebhookEventRawResults:
    service_ids: list[str] = []
    for tag in payload["tags"]:
        if tag.startswith("service:"):
            service_ids.append(tag.split(":")[1])

    config = cast(
        Union[ResourceConfig, ServiceDependencyResourceConfig], resource_config
    )
    selector = cast(DatadogServiceDependencySelector, config.selector)
    dd_client = init_client()

    tasks = [
        dd_client.get_single_service_dependency(
            service_id=service_id,
            env=selector.environment,
            start_time=selector.start_time,
        )
        for service_id in service_ids
    ]
API Method Consistency

_webhook_exists dropped explicit method="GET" and now calls _send_api_request with only url. Ensure default method is GET; otherwise, behavior may differ and cause false negatives.

async def _webhook_exists(self, webhook_url: str) -> bool:
    try:
        webhook = await self._send_api_request(url=webhook_url)
        return bool(webhook)

qodo-merge-pro bot commented Sep 23, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Possible issue
Enrich service dependency API response
Suggestion Impact: The commit enriched the API response by adding result["name"] = service_id, exactly as suggested, including a comment noting consistency with get_service_dependencies.

code diff:

+        
+        # enrich with service name for consistency with get_service_dependencies
+        result["name"] = service_id

Update get_single_service_dependency to add a name key to the result dictionary,
using the service_id as its value, to ensure data consistency with the resync
logic.

integrations/datadog/client.py [675-694]

 async def get_single_service_dependency(
     self, env: str, start_time: float, service_id: str
 ) -> dict[str, Any] | None:
     end_time = int(time.time())
 
     start_time = time.time() - (FETCH_WINDOW_TIME_IN_SECONDS * start_time)
 
     url = f"{self.api_url}/api/v1/service_dependencies/{service_id}"
     result: dict[str, Any] = await self._send_api_request(
         url,
         params={"env": env, "start": int(start_time), "end": end_time},
     )
 
     if not result:
         logger.warning(
             f"No service dependencies found for service {service_id} in environment {env}"
         )
         return None
 
+    result["name"] = service_id
     return result

[Suggestion processed]

Suggestion importance[1-10]: 9


Why: The suggestion correctly identifies that the get_single_service_dependency response is missing the name field, which is inconsistent with get_service_dependencies and would cause mapping failures for webhook events.

High
Improve validation for service tags
Suggestion Impact: The commit updated validate_payload to filter service tags and verify they have non-empty values after "service:", aligning with the suggested robust validation.

code diff:

+        if "tags" in payload:
+            service_tags = [tag for tag in payload["tags"] if tag.startswith("service:")]
+            if service_tags:
+                if all(len(tag.split(":", 1)) > 1 and tag.split(":", 1)[1] for tag in service_tags):
+                    has_service_info = True

Improve the validate_payload method to check that all service: tags are
correctly formatted (e.g., service:my-service) to prevent an IndexError during
event handling.

integrations/datadog/webhook_processors/service_dependency_webhook_processor.py [73-86]

 async def validate_payload(self, payload: EventPayload) -> bool:
     has_event_info = "event_type" in payload
 
     has_service_info = False
-    for tag in payload.get("tags", []):
-        has_service_info = tag.startswith("service:")
-        if has_service_info:
-            break
+    if "tags" in payload:
+        service_tags = [tag for tag in payload["tags"] if tag.startswith("service:")]
+        if service_tags:
+            # Ensure all service tags are valid and not empty, e.g. "service:"
+            if all(len(tag.split(":", 1)) > 1 and tag.split(":", 1)[1] for tag in service_tags):
+                has_service_info = True
 
     is_valid = has_service_info and has_event_info
     if not is_valid:
         logger.warning(f"Invalid webhook payload for service dependency: {payload}")
 
     return is_valid

[Suggestion processed]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a potential IndexError in handle_event due to insufficiently strict validation in validate_payload for malformed service: tags, and proposes a robust fix.

Medium
High-level
Webhook triggers are overly broad

The webhook processor for service dependencies is triggered by a broad set of
generic events, potentially causing excessive API calls. It is recommended to
narrow the event list or make it user-configurable to improve efficiency.

Examples:

integrations/datadog/webhook_processors/service_dependency_webhook_processor.py [20-35]
    async def should_process_event(self, event: WebhookEvent) -> bool:
        """Only process events that are related to service dependencies."""
        event_type = event.payload["event_type"]

        # Event types that may indicate changes in service behavior or dependencies
        service_related_events = [
            "service_check",
            "query_alert_monitor",
            "metric_slo_alert",
            "monitor_slo_alert",

 ... (clipped 6 lines)

Solution Walkthrough:

Before:

# integrations/datadog/webhook_processors/service_dependency_webhook_processor.py

class ServiceDependencyWebhookProcessor(_AbstractDatadogWebhookProcessor):
    async def should_process_event(self, event: WebhookEvent) -> bool:
        event_type = event.payload["event_type"]
        
        # Hardcoded, broad list of events
        service_related_events = [
            "service_check",
            "query_alert_monitor",
            "metric_slo_alert",
            "monitor_slo_alert",
            "outlier_monitor",
            "event_v2_alert",
        ]

        return event_type in service_related_events

After:

# integrations/datadog/.port/spec.yaml (example of making it configurable)
# ...
# configurations:
#   - name: dependencyWebhookEvents
#     type: array
#     default: ["service_check", "query_alert_monitor", ...]

# integrations/datadog/webhook_processors/service_dependency_webhook_processor.py
class ServiceDependencyWebhookProcessor(_AbstractDatadogWebhookProcessor):
    async def should_process_event(self, event: WebhookEvent) -> bool:
        event_type = event.payload["event_type"]
        
        # Narrower list, or better, configurable by the user
        configured_events = ocean.integration_config.get("dependencyWebhookEvents")

        return event_type in configured_events
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a significant design flaw in the new ServiceDependencyWebhookProcessor where a broad, hardcoded list of event types could cause excessive, unnecessary API calls, impacting performance and potentially hitting rate limits.

Medium
General
Avoid redundant API calls for services

Modify handle_event to use a set for service_ids instead of a list to ensure
each service is processed only once, avoiding redundant API calls for duplicate
service tags in the payload.

integrations/datadog/webhook_processors/service_dependency_webhook_processor.py [40-71]

 async def handle_event(
     self, payload: EventPayload, resource_config: ResourceConfig
 ) -> WebhookEventRawResults:
-    service_ids: list[str] = []
+    service_ids: set[str] = set()
     for tag in payload["tags"]:
         if tag.startswith("service:"):
-            service_ids.append(tag.split(":")[1])
+            service_ids.add(tag.split(":", 1)[1])
+
+    if not service_ids:
+        return WebhookEventRawResults(updated_raw_results=[], deleted_raw_results=[])
 
     config = cast(
         Union[ResourceConfig, ServiceDependencyResourceConfig], resource_config
     )
     selector = cast(DatadogServiceDependencySelector, config.selector)
     dd_client = init_client()
 
     tasks = [
         dd_client.get_single_service_dependency(
             service_id=service_id,
             env=selector.environment,
             start_time=selector.start_time,
         )
         for service_id in service_ids
     ]
 
     results: list[dict[str, Any] | None] = await asyncio.gather(*tasks)
     service_dependencies = [
         service_dependency for service_dependency in results if service_dependency
     ]
 
     return WebhookEventRawResults(
         updated_raw_results=service_dependencies,
         deleted_raw_results=[],
     )
Suggestion importance[1-10]: 6


Why: The suggestion provides a valid optimization by using a set to collect unique service IDs, which avoids redundant API calls if the webhook payload contains duplicate service: tags.

Low

Comment on lines 693 to 694

return result
Contributor

Suggestion: Enrich service dependency API response

Suggested change
-    return result
+    result["name"] = service_id
+    return result

Member

Is this the only inconsistent property?

Contributor

Ask: Is this the only inconsistent property?

Answer:

No. There are two inconsistencies introduced:

  • In examples/blueprints.json, the datadogService blueprint defines a relation named "dependency" targeting "datadogServiceDependency", while the .port/resources/port-app-config.yaml and examples/mappings.yaml create entities of kind serviceDependency that set relations.dependencies (plural) on the datadogServiceDependency entities. These don’t align:
    • datadogService has relation "dependency"
    • datadogServiceDependency entities set "dependencies" relation (plural) pointing to targets, which implies that the datadogServiceDependency blueprint should own a "dependencies" relation, not the service blueprint—or the service blueprint should expose "dependencies" matching the mapping output. Decide which entity owns the relation and make the naming consistent.
  • The ServiceDependency selector/mapping use properties named environment and startTime (camelCase), while the new client method get_single_service_dependency expects parameters env and start_time (snake_case). That’s fine internally, but in tests and processor you cast to DatadogServiceDependencySelector where you reference selector.environment and selector.start_time (snake_case in code vs startTime in config/mappings). Ensure the selector schema and code use the same casing (either both camelCase or both snake_case) to avoid runtime attribute errors.

Suggested fixes:

  • Harmonize relation naming and ownership:
    • Option A: On datadogService blueprint, rename relation "dependency" to "dependencies" and ensure mappings for serviceDependency set the reverse relation appropriately, or
    • Option B: Keep relations on datadogServiceDependency, but then don’t define "dependency" on datadogService, or add the reverse relation with matching name.
  • Align selector fields:
    • If config uses startTime, map it to start_time in code when parsing, or rename in config to start_time. Similarly ensure environment casing matches. Update tests, processor (selector.start_time), and examples to match.
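The casing fix can be sketched without any framework (class name is hypothetical; the real selector presumably uses pydantic, where `Field(alias="startTime")` achieves the same): normalize the config's camelCase keys once at parse time so code only ever sees snake_case attributes:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ServiceDependencySelectorSketch:
    environment: str
    start_time: float  # always snake_case in code, regardless of config casing

    # Map config keys (camelCase, as used in mappings.yaml) to attribute names.
    _ALIASES = {"startTime": "start_time", "environment": "environment"}

    @classmethod
    def from_config(cls, raw: dict[str, Any]) -> "ServiceDependencySelectorSketch":
        kwargs = {cls._ALIASES[k]: v for k, v in raw.items() if k in cls._ALIASES}
        return cls(**kwargs)


selector = ServiceDependencySelectorSketch.from_config(
    {"environment": "prod", "startTime": 2}
)
assert selector.environment == "prod"
assert selector.start_time == 2
```

Doing the translation in exactly one place keeps `selector.start_time` safe to reference in the processor and tests, whichever casing the config settles on.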

Signed-off-by: Dennis Bilson <dennis.bilson@port.io>
Signed-off-by: Dennis Bilson <dennis.bilson@port.io>
@mk-armah (Member) left a comment

Left some good comments

Comment on lines +585 to +590
dd_webhook_url = (
    f"{self.api_url}/api/v1/integration/webhooks/configuration/webhooks"
)

try:
-    if await self._webhook_exists(dd_webhook_url):
+    if await self._webhook_exists(f"{dd_webhook_url}/{webhook_name}"):
Member

🤷🏽‍♂️

Member

Revert this change; I think constructing the full URL at a single point is much cleaner.


Comment on lines +17 to +27
webhook_secret = ocean.integration_config.get("webhook_secret")
if not webhook_secret:
    logger.info("No webhook secret found. Skipping authentication")
    return True

authorization = headers.get("authorization")
if not authorization:
    logger.warning(
        "Webhook authentication failed due to missing Authorization header in the event"
    )
    return False
Member

What happens if there is an existing Authorization header, but the user does not provide a webhook secret to the integration?

Comment on lines +74 to +92
has_event_info = "event_type" in payload

has_service_info = False
if "tags" in payload:
    service_tags = [
        tag for tag in payload["tags"] if tag.startswith("service:")
    ]
    if service_tags:
        if all(
            len(tag.split(":", 1)) > 1 and tag.split(":", 1)[1]
            for tag in service_tags
        ):
            has_service_info = True

is_valid = has_service_info and has_event_info
if not is_valid:
    logger.warning(f"Invalid webhook payload for service dependency: {payload}")

return is_valid
Member

Can you explain why we look for tags starting with the service: keyword specifically?
And how is this consistent with resync?
