Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to only send metrics as part of the log fowarder #95

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions logs_ingest/dynatrace_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import time
import aiohttp
import asyncio
import urllib.parse
import requests

from typing import List, Dict, Tuple, NamedTuple
from urllib.error import HTTPError
Expand All @@ -41,6 +43,39 @@ class LogBatch(NamedTuple):
serialized_batch: str
number_of_logs_in_batch: int

async def get_entity_id_by_name(dynatrace_url: str, dynatrace_token: str, entity_name: str):
# API endpoint for querying entities

headers = {
'Authorization': f'Api-Token {dynatrace_token}',
'Content-Type': 'application/json'
}
params = {
# Modify the entitySelector as needed for your use case
'entitySelector': f'type("CUSTOM_DEVICE"),caseSensitive(entityName.equals("{entity_name}"))',
'fields': '+properties' # Adjust based on the required fields
}

entities_url = urlparse(dynatrace_url.rstrip("/") + "/api/v2/entities").geturl()
# logging.info("get_entity_id_by_name() :: EntityName = " + entity_name + ", Url = " + entities_url )

response = requests.get(entities_url, headers=headers, params=params, verify=False)

# Check if the request was successful
if response.status_code == 200:
data = response.json()
entities = data.get('entities', [])
if entities:
# Assuming the first entity is the one we're looking for
entity_id = entities[0].get('entityId')
logging.info(f"Entity ID for '{entity_name}' is {entity_id}")
return entity_id
else:
logging.warning(f"No entity found with name '{entity_name}'", "entity_not_found-level-warning")
return None
else:
logging.error(f"Failed to retrieve entity. Status code: {response.status_code}", "entity_not_found-level-error")
return None

async def send_logs(dynatrace_url: str, dynatrace_token: str, logs: List[Dict], self_monitoring: SelfMonitoring):
start_time = time.perf_counter()
Expand Down
22 changes: 16 additions & 6 deletions logs_ingest/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
RESOURCE_ID_ATTRIBUTE

GLOBAL = "global"
FILTER_NAMES_PREFIXES = ["filter.resource_type.min_log_level.", "filter.resource_type.contains_pattern.",
"filter.resource_id.min_log_level.","filter.resource_id.contains_pattern."]
FILTER_NAMES_PREFIXES = ["filter.resource_type.min_log_level.", "filter.resource_type.contains_pattern.", "filter.resource_type.only_metrics.",
"filter.resource_id.min_log_level.","filter.resource_id.contains_pattern.", "filter.resource_id.only_metrics."]


class LogFilter:
Expand Down Expand Up @@ -68,6 +68,10 @@ def _prepare_filters_dict(self) -> Dict:
contains_pattern_filter = self._create_contains_pattern_filter(filter_value)
filters_to_apply.append(contains_pattern_filter)
parsed_filters_to_log.append(filter_name)
if "only_metrics" in filter_name:
only_metrics_filter = self._create_only_metrics_filter(filter_value)
filters_to_apply.append(only_metrics_filter)
parsed_filters_to_log.append(filter_name)
if filters_to_apply:
filters_to_apply_dict[k] = filters_to_apply
logging.info(f"Successfully parsed filters: {parsed_filters_to_log}")
Expand All @@ -81,11 +85,15 @@ def _group_filters(self) -> Dict:

@staticmethod
def _create_log_level_filter(log_levels: Set):
return lambda severity, record: severity in log_levels
return lambda severity, isMetric, record: severity in log_levels

@staticmethod
def _create_contains_pattern_filter(pattern: str):
return lambda severity, record: fnmatch.fnmatch(record, pattern)
return lambda severity, isMetric, record: fnmatch.fnmatch(record, pattern)

@staticmethod
def _create_only_metrics_filter(only_metrics: str):
return lambda severity, isMetric, record: isMetric == only_metrics

def should_filter_out_record(self, parsed_record: Dict) -> bool:
if not self.filters_dict:
Expand All @@ -95,9 +103,11 @@ def should_filter_out_record(self, parsed_record: Dict) -> bool:
resource_id = parsed_record.get(RESOURCE_ID_ATTRIBUTE, "").casefold()
resource_type = parsed_record.get(RESOURCE_TYPE_ATTRIBUTE, "").casefold()
content = parsed_record.get("content", "")

is_metric = parsed_record.get("isMetric", 'False')

log_filters = self._get_filters(resource_id, resource_type)
return not all(log_filter(severity, content) for log_filter in log_filters)

return not all(log_filter(severity, is_metric, content) for log_filter in log_filters)

def _get_filters(self, resource_id, resource_type):
filters = self.filters_dict.get(resource_id, [])
Expand Down
11 changes: 10 additions & 1 deletion logs_ingest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from dateutil import parser

from . import logging
from .dynatrace_client import send_logs
from .dynatrace_client import send_logs, get_entity_id_by_name
from .filtering import LogFilter
from .mapping import extract_resource_id_attributes, extract_severity, azure_properties_names
from .metadata_engine import MetadataEngine
Expand Down Expand Up @@ -157,9 +157,18 @@ def parse_record(record: Dict, self_monitoring: SelfMonitoring):
if "resourceId" in record:
extract_resource_id_attributes(parsed_record, record["resourceId"])

if "metricName" in record:
parsed_record['isMetric'] = "True"

if log_filter.should_filter_out_record(parsed_record):
return None

source_entity = asyncio.run(get_entity_id_by_name(os.environ[DYNATRACE_URL], os.environ[DYNATRACE_ACCESS_KEY], parsed_record["azure.resource.name"]))
if source_entity is not None:
parsed_record['dynatrace_source_entity'] = source_entity
else:
logging.warning(f"No dynatrace_source_entity found for " + parsed_record["azure.resource.name"], "source_entity_found-level-warning" )

metadata_engine.apply(record, parsed_record)
convert_date_format(parsed_record)
category = record.get("category", "").lower()
Expand Down
2 changes: 1 addition & 1 deletion logs_ingest/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.2
0.0.3
13 changes: 12 additions & 1 deletion tests/integration/events.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,16 @@
"EventStampName": "waws-prod-am2-373",
"Host": "lw1sdlwk0007DK",
"EventIpAddress": "10.51.0.105"
}
},
{
"count": 2,
"total": 19,
"minimum": 9,
"maximum": 10,
"average": 9.5,
"resourceId": "/SUBSCRIPTIONS/C85913AE-8C54-47FA-84E0-FF171B8D3579/RESOURCEGROUPS/DYNATRACERG/PROVIDERS/MICROSOFT.WEB/SITES/DYNATRACELOGFW-FUNCTION",
"time": "2024-02-03T08:29:00.0000000Z",
"metricName": "Requests",
"timeGrain": "PT1M"
}
]
2 changes: 1 addition & 1 deletion tests/integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_main_success(monkeypatch: MonkeyPatchFixture, init_events, self_monitor
assert Counter(self_monitoring.dynatrace_connectivities) == {DynatraceConnectivity.Ok: 3}
assert self_monitoring.processing_time > 0
assert self_monitoring.sending_time > 0
assert self_monitoring.sent_log_entries == 16
assert self_monitoring.sent_log_entries == 20


def test_main_expired_token(monkeypatch: MonkeyPatchFixture, init_events, self_monitoring):
Expand Down
71 changes: 71 additions & 0 deletions tests/unit/test_filtering_only_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2021 Dynatrace LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import NewType, Any

from logs_ingest.filtering import LogFilter

MonkeyPatchFixture = NewType("MonkeyPatchFixture", Any)

parsed_record = {
"cloud.provider": "Azure",
"severity": "Informational",
"azure.resource.id": "/SUBSCRIPTIONS/C85913AE-8C54-47FA-84E0-FF171B8D3579/RESOURCEGROUPS/DYNATRACERG/PROVIDERS/MICROSOFT.WEB/SITES/DYNATRACELOGFW-FUNCTION",
"azure.subscription": "C85913AE-8C54-47FA-84E0-FF171B8D3579",
"azure.resource.group": "DYNATRACERG",
"azure.resource.name": "DYNATRACELOGFW-FUNCTION",
"azure.resource.type": "MICROSOFT.WEB/SITES",
"isMetric": "True"
}


def test_resource_id_not_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.RESOURCE_ID.ONLY_METRICS./SUBSCRIPTIONS/C85913AE-8C54-47FA-84E0-FF171B8D3579/RESOURCEGROUPS/DYNATRACERG/PROVIDERS/MICROSOFT.WEB/SITES/DYNATRACELOGFW-FUNCTION=True"
log_filter = LogFilter()
assert not log_filter.should_filter_out_record(parsed_record)

def test_resource_id_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.RESOURCE_ID.ONLY_METRICS./SUBSCRIPTIONS/C85913AE-8C54-47FA-84E0-FF171B8D3579/RESOURCEGROUPS/DYNATRACERG/PROVIDERS/MICROSOFT.WEB/SITES/DYNATRACELOGFW-FUNCTION=False"
log_filter = LogFilter()
assert log_filter.should_filter_out_record(parsed_record)

def test_resource_type_not_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.RESOURCE_TYPE.ONLY_METRICS.MICROSOFT.WEB/SITES=True"
log_filter = LogFilter()
assert not log_filter.should_filter_out_record(parsed_record)

def test_resource_type_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.RESOURCE_TYPE.ONLY_METRICS.MICROSOFT.WEB/SITES=False"
log_filter = LogFilter()
assert log_filter.should_filter_out_record(parsed_record)

def test_all_filters_not_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.RESOURCE_ID.ONLY_METRICS./SUBSCRIPTIONS/C85913AE-8C54-47FA-84E0-FF171B8D3579/RESOURCEGROUPS/DYNATRACERG/PROVIDERS/MICROSOFT.WEB/SITES/DYNATRACELOGFW-FUNCTION=True;FILTER.RESOURCE_TYPE.ONLY_METRICS.MICROSOFT.WEB/SITES=True"
log_filter = LogFilter()
assert not log_filter.should_filter_out_record(parsed_record)

def test_all_filters_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.RESOURCE_ID.ONLY_METRICS./SUBSCRIPTIONS/C85913AE-8C54-47FA-84E0-FF171B8D3579/RESOURCEGROUPS/DYNATRACERG/PROVIDERS/MICROSOFT.WEB/SITES/DYNATRACELOGFW-FUNCTION=False;FILTER.RESOURCE_TYPE.ONLY_METRICS.MICROSOFT.WEB/SITES=False"
log_filter = LogFilter()
assert log_filter.should_filter_out_record(parsed_record)

def test_global_filters_not_filter_out_only_metrics():
os.environ["FILTER_CONFIG"] = "FILTER.GLOBAL.ONLY_METRICS=True"
log_filter = LogFilter()
assert not log_filter.should_filter_out_record(parsed_record)

def test_global_filters_filter_out_v2():
os.environ["FILTER_CONFIG"] = "FILTER.GLOBAL.ONLY_METRICS=False"
log_filter = LogFilter()
assert log_filter.should_filter_out_record(parsed_record)