feat(eda): add ability to start stream from the latest event (#552)
* fix(eda): fixed refresh stream timeout error

Added additional buffer time to the aiohttp.ClientTimeout to not collide
with the same timeout interval set for refreshing the stream.

Also added some better exception handling surrounding potential timeout
issues.
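The collision described above can be sketched in a few lines. This is an illustrative helper, not code from the commit (and note the diff below ultimately sidesteps the issue entirely by passing `total=None` to `aiohttp.ClientTimeout`):

```python
def request_timeout(refresh_interval: int, buffer: float = 10.0) -> float:
    """Return a client timeout padded beyond the stream refresh interval.

    A timeout equal to the refresh interval can expire at the same moment
    the session refresh fires; adding buffer time avoids the collision.
    """
    return float(refresh_interval) + buffer
```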

* feat(eda): add ability to start stream from the latest event

Fixes #526
Fixes #549

This option is mutually exclusive with offset, and if selected, will
allow the event stream to always start from the latest event. This is
useful for not having to manage the offset.
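Choosing between the two modes reduces to a one-line decision when building the stream URL. A minimal sketch using the `whence=2` query parameter that appears in the diff below (the helper name is illustrative):

```python
def build_offset_filter(latest: bool, offset: int) -> str:
    """Build the query fragment that positions the event stream."""
    # whence=2 asks the Falcon event stream to start at the latest event;
    # otherwise resume from an explicit numeric offset.
    return "&whence=2" if latest else f"&offset={offset}"
```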

* chore: add changelog fragment

* lint(eda): updated tox and fixed some lint issues

* fix(eda): issue with timeout and session

* chore(eda): updated offset and latest logic + docs
carlosmmatos authored Sep 1, 2024
1 parent f8ee0d8 commit e895fc1
Showing 4 changed files with 40 additions and 19 deletions.
5 changes: 5 additions & 0 deletions changelogs/fragments/eda-updates.yml
@@ -0,0 +1,5 @@
minor_changes:
- eventsource - add support for starting stream from latest event (https://github.com/CrowdStrike/ansible_collection_falcon/pull/552)

bugfixes:
- eventsource - fix issue with refreshinterval causing timeout (https://github.com/CrowdStrike/ansible_collection_falcon/pull/552)
6 changes: 4 additions & 2 deletions docs/crowdstrike.falcon.eventstream.md
@@ -23,7 +23,8 @@ An ansible-rulebook event source plugin for generating events from the Falcon Ev
| **stream_name**</br><font color=purple>string</font> | Label that identifies your connection.</br>**Max:** 32 alphanumeric characters (a-z, A-Z, 0-9)</br><font color=blue>**Default:** eda</font> |
| **include_event_types**</br><font color=purple>list</font> | List of event types to include. Otherwise all event types are included.</br>Refer to the [Streaming API Event Dictionary](https://falcon.crowdstrike.com/documentation/62/streaming-api-event-dictionary).</br><font color=blue>**Default:** None.</font> |
| **exclude_event_types**</br><font color=purple>list</font> | List of event types to exclude.</br>Refer to the [Streaming API Event Dictionary](https://falcon.crowdstrike.com/documentation/62/streaming-api-event-dictionary).</br><font color=blue>**Default:** None.</font> |
| **offset**</br><font color=purple>int</font> | The offset to start streaming from.</br><font color=blue>**Default:** 0.</font> |
| **offset**</br><font color=purple>int</font> | Specifies where in the event stream you want to begin processing. This is useful if you have a mechanism to track the latest offset processed.</br>*This option is mutually exclusive with* `latest`.</br><font color=blue>**Default:** None.</font> |
| **latest**</br><font color=purple>bool</font> | Start the stream from the latest event. By default, if `offset` is not set, the stream will start from the beginning of all events.</br>*This option is mutually exclusive with* `offset`.</br><font color=blue>**Default:** false.</font> |
| **delay**</br><font color=purple>float</font> | Introduce a delay between each event.</br><font color=blue>**Default:** 0.</font> |
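The defaults in the table can be made concrete with a short sketch of how the two positioning parameters might be read from the event source arguments (`parse_stream_args` is a hypothetical helper for illustration, not part of the plugin):

```python
from typing import Any


def parse_stream_args(args: dict[str, Any]) -> tuple[int, bool]:
    """Apply the documented defaults: offset None -> 0, latest False."""
    offset = int(args.get("offset") or 0)  # None-safe: missing means 0
    latest = bool(args.get("latest", False))
    return offset, latest
```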

## Example Rulebook
@@ -37,7 +38,8 @@ An ansible-rulebook event source plugin for generating events from the Falcon Ev
falcon_client_id: "{{ FALCON_CLIENT_ID }}"
falcon_client_secret: "{{ FALCON_CLIENT_SECRET }}"
falcon_cloud: "us-2"
# offset: 12345
# start stream from specified offset
offset: 12345
stream_name: "eda-example"
include_event_types:
- "DetectionSummaryEvent"
46 changes: 30 additions & 16 deletions extensions/eda/plugins/event_source/eventstream.py
@@ -16,7 +16,8 @@
Max: 32 alphanumeric characters. Default: eda
include_event_types: List of event types to filter on. Default: None.
exclude_event_types: List of event types to exclude. Default: None.
offset: The offset to start streaming from. Default: 0.
offset: The offset to start streaming from. Default: None.
latest: Start stream at the latest event. Default: False.
delay: Introduce a delay between each event. Default: float(0).
@@ -233,6 +234,7 @@ def __init__(
client: AIOFalconAPI,
stream_name: str,
offset: int,
latest: bool,
include_event_types: list[str],
stream: dict,
) -> None:
@@ -246,6 +248,8 @@ def __init__(
A label identifying the connection.
offset: int
The offset to start streaming from.
latest: bool
Start stream at the latest event.
include_event_types: List[str]
A list of event types to filter on.
stream: dict
@@ -254,13 +258,14 @@
"""
logger.info("Initializing Stream: %s", stream_name)
self.client: AIOFalconAPI = client
self.session: aiohttp.ClientSession = client.session
self.stream_name: str = stream_name
self.data_feed: str = stream["dataFeedURL"]
self.token: str = stream["sessionToken"]["token"]
self.token_expires: str = stream["sessionToken"]["expiration"]
self.refresh_url: str = stream["refreshActiveSessionURL"]
self.partition: str = re.findall(r"v1/(\d+)", self.refresh_url)[0]
self.offset: int = offset
self.offset: int = offset if offset else 0
self.latest: bool = latest
self.include_event_types: list[str] = include_event_types
self.epoch: int = int(time.time())
self.refresh_interval: int = int(stream["refreshActiveSessionInterval"])
@@ -322,24 +327,24 @@ async def open_stream(self: "Stream") -> aiohttp.ClientResponse:
if not self.include_event_types
else f"&eventType={','.join(self.include_event_types)}"
)
offset_filter = f"&offset={self.offset}"
offset_filter = "&whence=2" if self.latest else f"&offset={self.offset}"

kwargs = {
"url": f"{self.data_feed}{offset_filter}{event_type_filter}",
"headers": {
"Authorization": f"Token {self.token}",
},
"raise_for_status": True,
"timeout": aiohttp.ClientTimeout(total=float(self.refresh_interval)),
"timeout": aiohttp.ClientTimeout(total=None),
}

session = aiohttp.ClientSession()
self.spigot: aiohttp.ClientResponse = await session.get(**kwargs)
self.spigot: aiohttp.ClientResponse = await self.session.get(**kwargs)
logger.info(
"Successfully opened stream %s:%s",
self.stream_name,
self.partition,
)
logger.debug("Stream URL: %s", kwargs["url"])
return self.spigot

async def stream_events(
@@ -408,13 +413,11 @@ def is_valid_event(
Returns
-------
bool
Returns False if the event_type is in the exclude_event_types list,
otherwise returns True.
Returns True if the event_type is not in the exclude_event_types list,
otherwise returns False.
"""
if event_type in exclude_event_types:
return False
return True
return event_type not in exclude_event_types
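The simplified check above is a plain membership test; extracted on its own it behaves like:

```python
def is_valid_event(event_type: str, exclude_event_types: list[str]) -> bool:
    # An event passes the filter only when its type is not excluded.
    return event_type not in exclude_event_types
```

For example, with `["DetectionSummaryEvent"]` excluded, an `IncidentSummaryEvent` passes the filter while a `DetectionSummaryEvent` does not.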


# pylint: disable=too-many-locals
@@ -438,7 +441,8 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
falcon_client_secret: str = str(args.get("falcon_client_secret"))
falcon_cloud: str = str(args.get("falcon_cloud", "us-1"))
stream_name: str = str(args.get("stream_name", "eda")).lower()
offset: int = int(args.get("offset", 0))
offset: int = int(args.get("offset") or 0)
latest: bool = bool(args.get("latest", False))
delay: float = float(args.get("delay", 0))
include_event_types: list[str] = list(args.get("include_event_types", []))
exclude_event_types: list[str] = list(args.get("exclude_event_types", []))
@@ -447,6 +451,11 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
msg = f"Invalid falcon_cloud: {falcon_cloud}, must be one of {list(REGIONS.keys())}"
raise ValueError(msg)

# Offset and latest are mutually exclusive
if offset and latest:
msg = "'offset' and 'latest' are mutually exclusive parameters."
raise ValueError(msg)
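Extracted as a standalone function, the guard above looks like this (an illustrative wrapper around the check in the diff, not plugin code):

```python
def validate_stream_position(offset: int, latest: bool) -> None:
    """Reject conflicting stream-position parameters."""
    # One parameter resumes from a saved position, the other always starts
    # at the newest event; setting both is ambiguous.
    if offset and latest:
        msg = "'offset' and 'latest' are mutually exclusive parameters."
        raise ValueError(msg)
```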

falcon = AIOFalconAPI(
client_id=falcon_client_id,
client_secret=falcon_client_secret,
@@ -463,7 +472,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
return

streams: list[Stream] = [
Stream(falcon, stream_name, offset, include_event_types, stream)
Stream(falcon, stream_name, offset, latest, include_event_types, stream)
for stream in available_streams["resources"]
]

@@ -473,15 +482,20 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
async for event in stream.stream_events(exclude_event_types):
await queue.put(event)
await asyncio.sleep(delay)
except asyncio.TimeoutError:
logger.exception("Timeout occurred while streaming events.")
except aiohttp.ClientError:
logger.exception("Client error occurred while streaming events.")
except Exception: # pylint: disable=broad-except
logger.exception("Uncaught Plugin Task Error")
logger.exception("Uncaught Plugin Task Error.")
else:
logger.info("All streams processed successfully.")
finally:
logger.info("Plugin Task Finished..cleaning up")
# Close the stream and API session outside the loop
for stream in streams:
await stream.spigot.close()
if stream.spigot:
await stream.spigot.close()
await falcon.close()


2 changes: 1 addition & 1 deletion tox.ini
@@ -14,7 +14,7 @@ commands =
[testenv:ruff]
deps = ruff
commands =
bash -c 'ruff check --exclude .tox --select ALL --ignore INP001,FA102,UP001,UP010,I001,FA100,PLR0913,E501 -q extensions/eda/plugins'
bash -c 'ruff check --exclude .tox --select ALL --ignore INP001,FA102,UP001,UP010,I001,FA100,PLR0913,E501,FBT001,C901 -q extensions/eda/plugins'

[testenv:darglint]
deps = darglint
