Skip to content

Commit

Permalink
feat(eda): add ability to start stream from the latest event
Browse files Browse the repository at this point in the history
Fixes CrowdStrike#526
Fixes CrowdStrike#549

This option is mutually exclusive with offset, and if selected, will
allow the event stream to always start from the latest event. This is
useful for not having to manage the offset.
  • Loading branch information
carlosmmatos committed Aug 31, 2024
1 parent 8696dc3 commit 37de8a4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
3 changes: 2 additions & 1 deletion docs/crowdstrike.falcon.eventstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ An ansible-rulebook event source plugin for generating events from the Falcon Ev
| **stream_name**</br><font color=purple>string</font> | Label that identifies your connection.</br>**Max:** 32 alphanumeric characters (a-z, A-Z, 0-9)</br><font color=blue>**Default:** eda</font> |
| **include_event_types**</br><font color=purple>list</font> | List of event types to include. Otherwise all event types are included.</br>Refer to the [Streaming API Event Dictionary](https://falcon.crowdstrike.com/documentation/62/streaming-api-event-dictionary).</br><font color=blue>**Default:** None.</font> |
| **exclude_event_types**</br><font color=purple>list</font> | List of event types to exclude.</br>Refer to the [Streaming API Event Dictionary](https://falcon.crowdstrike.com/documentation/62/streaming-api-event-dictionary).</br><font color=blue>**Default:** None.</font> |
| **offset**</br><font color=purple>int</font> | The offset to start streaming from.</br><font color=blue>**Default:** 0.</font> |
| **offset**</br><font color=purple>int</font> | The offset to start streaming from. The default (0) is to begin the stream from the start of all events.</br>*This option is mutually exclusive with* `latest`.</br><font color=blue>**Default:** 0.</font> |
| **latest**</br><font color=purple>bool</font> | Start the stream from the newest/latest event.</br>*This option is mutually exclusive with* `offset`.</br><font color=blue>**Default:** false.</font> |
| **delay**</br><font color=purple>float</font> | Introduce a delay between each event.</br><font color=blue>**Default:** 0.</font> |

## Example Rulebook
Expand Down
17 changes: 15 additions & 2 deletions extensions/eda/plugins/event_source/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
include_event_types: List of event types to filter on. Defaults.
exclude_event_types: List of event types to exclude. Default: None.
offset: The offset to start streaming from. Default: 0.
latest: Start stream at the latest event. Default: False.
delay: Introduce a delay between each event. Default: float(0).
Expand Down Expand Up @@ -233,6 +234,7 @@ def __init__(
client: AIOFalconAPI,
stream_name: str,
offset: int,
latest: bool,
include_event_types: list[str],
stream: dict,
) -> None:
Expand Down Expand Up @@ -261,6 +263,7 @@ def __init__(
self.refresh_url: str = stream["refreshActiveSessionURL"]
self.partition: str = re.findall(r"v1/(\d+)", self.refresh_url)[0]
self.offset: int = offset
self.latest: bool = latest
self.include_event_types: list[str] = include_event_types
self.epoch: int = int(time.time())
self.refresh_interval: int = int(stream["refreshActiveSessionInterval"])
Expand Down Expand Up @@ -322,7 +325,10 @@ async def open_stream(self: "Stream") -> aiohttp.ClientResponse:
if not self.include_event_types
else f"&eventType={','.join(self.include_event_types)}"
)
offset_filter = f"&offset={self.offset}"
if self.latest:
offset_filter = "&whence=2"
else:
offset_filter = f"&offset={self.offset}"

kwargs = {
"url": f"{self.data_feed}{offset_filter}{event_type_filter}",
Expand All @@ -340,6 +346,7 @@ async def open_stream(self: "Stream") -> aiohttp.ClientResponse:
self.stream_name,
self.partition,
)
logger.debug("Stream URL: %s", kwargs["url"])
return self.spigot

async def stream_events(
Expand Down Expand Up @@ -439,6 +446,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
falcon_cloud: str = str(args.get("falcon_cloud", "us-1"))
stream_name: str = str(args.get("stream_name", "eda")).lower()
offset: int = int(args.get("offset", 0))
latest: bool = bool(args.get("latest", False))
delay: float = float(args.get("delay", 0))
include_event_types: list[str] = list(args.get("include_event_types", []))
exclude_event_types: list[str] = list(args.get("exclude_event_types", []))
Expand All @@ -447,6 +455,11 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
msg = f"Invalid falcon_cloud: {falcon_cloud}, must be one of {list(REGIONS.keys())}"
raise ValueError(msg)

# Offset and latest are mutually exclusive
if offset > 0 and latest:
msg = "offset and latest are mutually exclusive arguments."
raise ValueError(msg)

falcon = AIOFalconAPI(
client_id=falcon_client_id,
client_secret=falcon_client_secret,
Expand All @@ -463,7 +476,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
return

streams: list[Stream] = [
Stream(falcon, stream_name, offset, include_event_types, stream)
Stream(falcon, stream_name, offset, latest, include_event_types, stream)
for stream in available_streams["resources"]
]

Expand Down

0 comments on commit 37de8a4

Please sign in to comment.