Skip to content

Commit 88dd325

Browse files
authored
[Armis Event Collector] Update pagination handling in fetch-events (#40682)
* initial logic * fix: remove duplicate debug logging * fix: reorder pagination time limit condition and change log level to debug * refactor: improve debug logging and reduce pagination timeout in Armis event collector * Update debug log * refactor: remove debug-log prefix from debug messages in ArmisEventCollector * refactor: remove info-log prefix from debug messages in ArmisEventCollector * chore: bump Armis pack to version 1.2.2 * test: add pagination timeout test for ArmisEventCollector fetch_by_aql_query * Update UT * test: improve pagination timeout test using freeze_time decorator and simplified response handling
1 parent 05f113b commit 88dd325

File tree

5 files changed

+108
-36
lines changed

5 files changed

+108
-36
lines changed

Packs/Armis/Integrations/ArmisEventCollector/ArmisEventCollector.py

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
import demistomock as demisto
2-
from CommonServerPython import *
3-
import urllib3
4-
from typing import Any
51
import itertools
2+
from typing import Any
3+
4+
import urllib3
65
from dateutil import parser
76

7+
import demistomock as demisto
8+
from CommonServerPython import *
9+
810
# Disable insecure warnings
911
urllib3.disable_warnings()
1012

1113
EVENT_TYPE_ALERTS = "alerts"
1214
EVENT_TYPE_ACTIVITIES = "activity"
1315
EVENT_TYPE_DEVICES = "devices"
16+
MAX_PAGINATION_DURATION_SECONDS = 180
1417

1518

1619
class EVENT_TYPE:
@@ -72,7 +75,9 @@ def __init__(self, base_url, api_key, access_token, verify=False, proxy=False):
7275
self._api_key = api_key
7376
super().__init__(base_url=base_url, verify=verify, proxy=proxy)
7477
if not access_token or not self.is_valid_access_token(access_token):
78+
demisto.debug("Invalid access token was used, attempting to get new access token.")
7579
access_token = self.get_access_token()
80+
demisto.debug("New access token was successfully generated.")
7681
self.update_access_token(access_token)
7782

7883
def update_access_token(self, access_token=None):
@@ -89,13 +94,14 @@ def perform_fetch(self, params):
8994
)
9095
except Exception as e:
9196
if "Invalid access token" in str(e):
92-
demisto.debug("debug-log: Invalid access token")
97+
demisto.debug("Expired or invalid access token, attempting to update access token.")
9398
self.update_access_token()
99+
demisto.debug("Access token successfully updated.")
94100
raw_response = self._http_request(
95101
url_suffix="/search/", method="GET", params=params, headers=self._headers, timeout=API_TIMEOUT
96102
)
97103
else:
98-
demisto.debug(f"debug-log: Error occurred while fetching events: {e}")
104+
demisto.debug(f"Error occurred while fetching events: {e}")
99105
raise e
100106
return raw_response
101107

@@ -121,6 +127,7 @@ def fetch_by_aql_query(
121127
order_by: str = "time",
122128
from_param: None | int = None,
123129
before: Optional[datetime] = None,
130+
event_type: str = "",
124131
):
125132
"""Fetches events using AQL query.
126133
@@ -137,7 +144,7 @@ def fetch_by_aql_query(
137144
aql_query = f"{aql_query} after:{after.strftime(DATE_FORMAT)}" # noqa: E231
138145
if before:
139146
aql_query = f"{aql_query} before:{before.strftime(DATE_FORMAT)}" # noqa: E231
140-
demisto.info(f"info-log: Fetching events until {before}.")
147+
demisto.info(f"Fetching events until {before}.")
141148
params: dict[str, Any] = {"aql": aql_query, "includeTotal": "false", "length": max_fetch, "orderBy": order_by}
142149
if from_param:
143150
params["from"] = from_param
@@ -147,6 +154,7 @@ def fetch_by_aql_query(
147154
# perform pagination if needed (until max_fetch limit), cycle through all pages and add results to results list.
148155
# The response's 'next' attribute carries the index to start the next request in the
149156
# pagination (using the 'from' request parameter), or null if there are no more pages left.
157+
start_time = datetime.now()
150158
try:
151159
while next and (len(results) < max_fetch):
152160
if len(results) < max_fetch:
@@ -156,9 +164,19 @@ def fetch_by_aql_query(
156164
next = raw_response.get("data", {}).get("next") or 0
157165
current_results = raw_response.get("data", {}).get("results", [])
158166
results.extend(current_results)
159-
demisto.info(f"info-log: fetched {len(current_results)} results, total is {len(results)}, and {next=}.")
167+
demisto.info(f"fetched {len(current_results)} results, total is {len(results)}, and {next=}.")
168+
169+
total_seconds = (datetime.now() - start_time).total_seconds()
170+
demisto.debug(f"total {total_seconds} seconds so far")
171+
if next and total_seconds >= MAX_PAGINATION_DURATION_SECONDS:
172+
demisto.debug(
173+
f"Reached pagination time limit of {MAX_PAGINATION_DURATION_SECONDS}s for {event_type}, "
174+
f"breaking early with {next=} to avoid timeout. Pagination will resume in the next fetch cycle."
175+
)
176+
break
177+
160178
except Exception as e:
161-
demisto.info(f"info-log: caught an exception during pagination:\n{str(e)}") # noqa: E231
179+
demisto.info(f"caught an exception during pagination:\n{str(e)}") # noqa: E231
162180

163181
return results, next
164182

@@ -251,7 +269,7 @@ def calculate_fetch_start_time(
251269
# case 1
252270
if last_fetch_time:
253271
if isinstance(last_fetch_time, str):
254-
demisto.info(f"info-log: calculating_fetch_time for {last_fetch_time=}")
272+
demisto.info(f"calculating_fetch_time for {last_fetch_time=}")
255273
last_fetch_datetime = arg_to_datetime(last_fetch_time)
256274
else:
257275
last_fetch_datetime = last_fetch_time
@@ -264,7 +282,7 @@ def calculate_fetch_start_time(
264282
if after_time:
265283
after_time = after_time.replace(tzinfo=None)
266284
if not after_time or after_time >= before_time:
267-
demisto.info("info-log: last run time is later than before time, overwriting after time.")
285+
demisto.info("last run time is later than before time, overwriting after time.")
268286
after_time = before_time - timedelta(minutes=1)
269287
return after_time, before_time
270288

@@ -323,7 +341,7 @@ def dedup_events(events: list[dict], events_last_fetch_ids: list[str], unique_id
323341
"""
324342
# case 1
325343
if not events:
326-
demisto.debug("debug-log: Dedup case 1 - Empty event list (no new events received from API response).")
344+
demisto.debug("Dedup case 1 - Empty event list (no new events received from API response).")
327345
return [], events_last_fetch_ids
328346

329347
new_events: list[dict] = [event for event in events if event.get(unique_id_key) not in events_last_fetch_ids]
@@ -337,7 +355,7 @@ def dedup_events(events: list[dict], events_last_fetch_ids: list[str], unique_id
337355
and latest_event_datetime
338356
and are_two_datetime_equal_by_second(latest_event_datetime, earliest_event_datetime)
339357
):
340-
demisto.debug("debug-log: Dedup case 2 - All events from the current fetch cycle have the same timestamp.")
358+
demisto.debug("Dedup case 2 - All events from the current fetch cycle have the same timestamp.")
341359
new_ids = [event.get(unique_id_key, "") for event in new_events]
342360
events_last_fetch_ids.extend(new_ids)
343361
return new_events, events_last_fetch_ids
@@ -346,7 +364,7 @@ def dedup_events(events: list[dict], events_last_fetch_ids: list[str], unique_id
346364
else:
347365
# Note that the following timestamps comparison are made between strings and assume
348366
# the following timestamp format from the response: "YYYY-MM-DDTHH:MM:SS.fffff+Z"
349-
demisto.debug("debug-log: Dedup case 3 - Most recent event has later timestamp then other events in the response.")
367+
demisto.debug("Dedup case 3 - Most recent event has later timestamp then other events in the response.")
350368

351369
latest_event_timestamp = events[-1].get(event_order_by, "")[:19]
352370
# itertools.takewhile is used to iterate over the list of events (from latest time to earliest)
@@ -385,11 +403,11 @@ def fetch_by_event_type(
385403
last_fetch_time_field = f"{event_type.type}_last_fetch_time"
386404
last_fetch_next_field = f"{event_type.type}_last_fetch_next_field"
387405

388-
demisto.debug(f"debug-log: handling event-type: {event_type.type}")
406+
demisto.debug(f"handling event-type: {event_type.type}")
389407
if last_fetch_time := last_run.get(last_fetch_time_field):
390-
demisto.debug(f"debug-log: last run of type: {event_type.type} time is: {last_fetch_time}")
408+
demisto.debug(f"last run of type: {event_type.type} time is: {last_fetch_time}")
391409
last_fetch_next = last_run.get(last_fetch_next_field, 0)
392-
demisto.debug(f"debug-log: last run of type: {event_type.type} next is: {last_fetch_next}")
410+
demisto.debug(f"last run of type: {event_type.type} next is: {last_fetch_next}")
393411
event_type_fetch_start_time, before_time = calculate_fetch_start_time(last_fetch_time, fetch_start_time, fetch_delay)
394412
response, next = client.fetch_by_aql_query(
395413
aql_query=event_type.aql_query,
@@ -398,16 +416,17 @@ def fetch_by_event_type(
398416
order_by=event_type.order_by,
399417
from_param=last_fetch_next,
400418
before=before_time,
419+
event_type=event_type.type,
401420
)
402421
new_events: list[dict] = []
403-
demisto.debug(f"debug-log: fetched {len(response)} {event_type.type} from API")
422+
demisto.debug(f"fetched {len(response)} {event_type.type} from API")
404423
if response:
405424
new_events, next_run[last_fetch_ids] = dedup_events(
406425
response, last_run.get(last_fetch_ids, []), event_type.unique_id_key, event_type.order_by
407426
)
408427
events.setdefault(event_type.dataset_name, []).extend(new_events)
409-
demisto.debug(f"debug-log: overall {len(new_events)} {event_type.dataset_name} (after dedup)")
410-
demisto.debug(f"debug-log: last {event_type.dataset_name} in list: {new_events[-1] if new_events else {}}")
428+
demisto.debug(f"overall {len(new_events)} {event_type.dataset_name} (after dedup)")
429+
demisto.debug(f"last {event_type.dataset_name} in list: {new_events[-1] if new_events else {}}")
411430

412431
if not next: # we wish to update the time only in case the next is 0 because the next is relative to the time.
413432
event_type_fetch_start_time = new_events[-1].get(event_type.order_by) if new_events else last_fetch_time
@@ -416,7 +435,7 @@ def fetch_by_event_type(
416435
if isinstance(event_type_fetch_start_time, datetime):
417436
event_type_fetch_start_time = event_type_fetch_start_time.strftime(DATE_FORMAT)
418437
next_run[last_fetch_time_field] = event_type_fetch_start_time
419-
demisto.debug(f"debug-log: updated next_run for event type {event_type.type} with {next=} and {event_type_fetch_start_time=}")
438+
demisto.debug(f"updated next_run for event type {event_type.type} with {next=} and {event_type_fetch_start_time=}")
420439

421440

422441
def fetch_events_for_specific_alert_ids(client: Client, alert, aql_alert_id):
@@ -431,14 +450,14 @@ def fetch_events_for_specific_alert_ids(client: Client, alert, aql_alert_id):
431450
None: Alert dict is updated in-place with activitiesData and devicesData.
432451
433452
"""
434-
demisto.debug(f"debug-log: Fetching Activities and Devices for specific alert IDs: {aql_alert_id}")
453+
demisto.debug(f"Fetching Activities and Devices for specific alert IDs: {aql_alert_id}")
435454
activities_aql_query = f'{EVENT_TYPES["Activities"].aql_query} {aql_alert_id}'
436455
devices_aql_query = f'{EVENT_TYPES["Devices"].aql_query} {aql_alert_id}'
437456
activities_response = client.fetch_by_ids_in_aql_query(
438457
aql_query=activities_aql_query, order_by=EVENT_TYPES["Activities"].order_by
439458
)
440459
devices_response = client.fetch_by_ids_in_aql_query(aql_query=devices_aql_query, order_by=EVENT_TYPES["Devices"].order_by)
441-
demisto.debug(f"debug-log: fetch by alert ids\
460+
demisto.debug(f"fetch by alert ids\
442461
fetched {len(activities_response)} Activities and {len(devices_response)} Devices")
443462
alert["activitiesData"] = activities_response if activities_response else {}
444463
alert["devicesData"] = devices_response if devices_response else {}
@@ -471,7 +490,7 @@ def fetch_events(
471490
events: dict[str, list[dict]] = {}
472491
next_run: dict[str, list | str] = {}
473492
if "Devices" in event_types_to_fetch and not should_run_device_fetch(last_run, device_fetch_interval, datetime.now()):
474-
demisto.debug("debug-log: skipping Devices fetch as it is not yet reached the device interval.")
493+
demisto.debug("skipping Devices fetch as it is not yet reached the device interval.")
475494
event_types_to_fetch.remove("Devices")
476495

477496
if "Alerts" in event_types_to_fetch:
@@ -500,7 +519,7 @@ def fetch_events(
500519

501520
next_run["access_token"] = client._access_token
502521

503-
demisto.debug(f"debug-log: events: {events}")
522+
demisto.debug(f"events: {events}")
504523
return events, next_run
505524

506525

@@ -547,18 +566,18 @@ def handle_fetched_events(events: dict[str, list[dict[str, Any]]], next_run: dic
547566
if events:
548567
for event_type, events_list in events.items():
549568
if not events_list:
550-
demisto.debug(f"debug-log: No events of type: {event_type} fetched from API.")
569+
demisto.debug(f"No events of type: {event_type} fetched from API.")
551570
else:
552571
add_time_to_events(events_list, event_type)
553-
demisto.debug(f"debug-log: {len(events_list)} events of type: {event_type} are about to be sent to XSIAM.")
572+
demisto.debug(f"{len(events_list)} events of type: {event_type} are about to be sent to XSIAM.")
554573
product = f"{PRODUCT}_{event_type}" if event_type != EVENT_TYPE_ALERTS else PRODUCT
555574
send_events_to_xsiam(events_list, vendor=VENDOR, product=product)
556-
demisto.debug(f"debug-log: {len(events)} events were sent to XSIAM.")
575+
demisto.debug(f"{len(events)} events were sent to XSIAM.")
557576
else:
558-
demisto.debug("debug-log: No new events fetched. Sending 0 to XSIAM.")
577+
demisto.debug("No new events fetched. Sending 0 to XSIAM.")
559578
send_events_to_xsiam(events=[], vendor=VENDOR, product=PRODUCT)
560579

561-
demisto.debug(f"debug-log: setting {next_run=}")
580+
demisto.debug(f"setting {next_run=}")
562581
demisto.setLastRun(next_run)
563582

564583

@@ -651,7 +670,7 @@ def main(): # pragma: no cover
651670

652671
if not last_run: # initial fetch - update last fetch time values to current time
653672
set_last_run_for_last_minute(last_run)
654-
demisto.debug("debug-log: Initial fetch - updating last fetch time value to current time for each event type.")
673+
demisto.debug("Initial fetch - updating last fetch time value to current time for each event type.")
655674

656675
if command == "armis-get-events":
657676
event_type_name = args.get("event_type")
@@ -676,7 +695,7 @@ def main(): # pragma: no cover
676695
fetch_delay=fetch_delay,
677696
)
678697
for key, value in events.items():
679-
demisto.debug(f"debug-log: {len(value)} events of type: {key} fetched from armis api")
698+
demisto.debug(f"{len(value)} events of type: {key} fetched from armis api")
680699

681700
if should_push_events:
682701
handle_fetched_events(events, next_run)

Packs/Armis/Integrations/ArmisEventCollector/ArmisEventCollector.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ script:
110110
script: '-'
111111
type: python
112112
subtype: python3
113-
dockerimage: demisto/python3:3.12.8.3296088
113+
dockerimage: demisto/python3:3.12.11.4417419
114114
marketplaces:
115115
- marketplacev2
116116
- platform

Packs/Armis/Integrations/ArmisEventCollector/ArmisEventCollector_test.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from ArmisEventCollector import Client, datetime, timedelta, DemistoException, arg_to_datetime, EVENT_TYPE, EVENT_TYPES, Any
21
import pytest
2+
from ArmisEventCollector import EVENT_TYPE, EVENT_TYPES, Any, Client, DemistoException, arg_to_datetime, datetime, timedelta
33
from freezegun import freeze_time
44

55

@@ -111,6 +111,53 @@ def test_continues_fetch_by_aql_query(self, mocker, dummy_client):
111111

112112
mocked_http_request.assert_called_with(**expected_args)
113113

114+
def test_fetch_by_aql_query_pagination_timeout(self, mocker, dummy_client):
115+
"""
116+
Test fetch_by_aql_query function behavior when pagination duration exceeds the limit.
117+
118+
Given:
119+
- A fetch operation with multiple pages of results.
120+
- The time taken to fetch pages exceeds MAX_PAGINATION_DURATION_SECONDS.
121+
When:
122+
- Fetching events using fetch_by_aql_query.
123+
Then:
124+
- The fetch operation should break early.
125+
- It should return the results fetched so far.
126+
- It should return the 'next' pointer for the subsequent fetch.
127+
"""
128+
with freeze_time("2023-01-01T01:00:00") as frozen_time:
129+
first_response = {
130+
"data": {"next": 1, "results": [{"unique_id": "1", "time": "2023-01-01T01:00:10.123456+00:00"}], "total": "Many"}
131+
}
132+
133+
second_response = {
134+
"data": {"next": 2, "results": [{"unique_id": "2", "time": "2023-01-01T01:00:20.123456+00:00"}], "total": "Many"}
135+
}
136+
137+
third_response = {
138+
"data": {"next": 3, "results": [{"unique_id": "3", "time": "2023-01-01T01:00:30.123456+00:00"}], "total": "Many"}
139+
}
140+
141+
call_count = 0
142+
responses = [first_response, second_response, third_response]
143+
144+
def advance_time_and_return_response(*args, **kwargs):
145+
nonlocal call_count
146+
response = responses[call_count]
147+
call_count += 1
148+
frozen_time.tick(delta=timedelta(seconds=200))
149+
return response
150+
151+
mocked_http_request = mocker.patch.object(Client, "_http_request", side_effect=advance_time_and_return_response)
152+
153+
results, next_page = dummy_client.fetch_by_aql_query(
154+
aql_query="example_query", max_fetch=10, after=(datetime.now() - timedelta(minutes=1))
155+
)
156+
157+
assert mocked_http_request.call_count == 2
158+
assert len(results) == 2
159+
assert next_page == 2
160+
114161

115162
class TestHelperFunction:
116163
date_1 = "2023-01-01T01:00:00"
@@ -361,7 +408,7 @@ def test_events_to_command_results(self):
361408
Then:
362409
- A command result with readable output will be printed to the war-room.
363410
"""
364-
from ArmisEventCollector import CommandResults, VENDOR, PRODUCT, events_to_command_results, tableToMarkdown
411+
from ArmisEventCollector import PRODUCT, VENDOR, CommandResults, events_to_command_results, tableToMarkdown
365412

366413
events_fetched = {
367414
"events": [

Packs/Armis/ReleaseNotes/1_2_2.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
#### Integrations
3+
4+
##### Armis Event Collector
5+
- Updated the Docker image to: *demisto/python3:3.12.11.4417419*.
6+
- Improved the event collection process by optimizing the pagination timeout handling.

Packs/Armis/pack_metadata.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "Armis",
33
"description": "Agentless and passive security platform that sees, identifies, and classifies every device, tracks behavior, identifies threats, and takes action automatically to protect critical information and systems",
44
"support": "partner",
5-
"currentVersion": "1.2.1",
5+
"currentVersion": "1.2.2",
66
"author": "Armis Corporation",
77
"url": "https://support.armis.com/",
88
"email": "support@armis.com",

0 commit comments

Comments (0)