Skip to content

Commit

Permalink
[worker] Introduce new mecanic to split bundle and requeue (#8020)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-julien authored and SamuelHassine committed Aug 19, 2024
1 parent de04476 commit 84120d6
Showing 1 changed file with 42 additions and 5 deletions.
47 changes: 42 additions & 5 deletions opencti-worker/src/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from pika.adapters.blocking_connection import BlockingChannel
from prometheus_client import start_http_server
from pycti import OpenCTIApiClient
from pycti import OpenCTIApiClient, OpenCTIStix2Splitter
from pycti.connector.opencti_connector_helper import (
create_mq_ssl_context,
get_config_variable,
Expand Down Expand Up @@ -229,10 +229,47 @@ def data_handler( # pylint: disable=too-many-statements, too-many-locals
try:
if event_type == "bundle":
content = base64.b64decode(data["content"]).decode("utf-8")
update = data["update"] if "update" in data else False
imported_items = self.api.stix2.import_bundle_from_json(
content, update, types, work_id
)
content_json = json.loads(content)
if "objects" not in content_json or len(content_json["objects"]) == 0:
raise ValueError("JSON data type is not a STIX2 bundle")
if len(content_json["objects"]) == 1:
update = data["update"] if "update" in data else False
imported_items = self.api.stix2.import_bundle_from_json(
content, update, types, work_id
)
else:
# As bundle is received as complete, split and requeue
# Create a specific channel to push the split bundles
push_pika_connection = pika.BlockingConnection(self.pika_parameters)
push_channel = push_pika_connection.channel()
try:
push_channel.confirm_delivery()
except Exception as err: # pylint: disable=broad-except
self.worker_logger.warning(str(err))
# Instance spliter and split the big bundle
event_version = (
content_json["x_opencti_event_version"]
if "x_opencti_event_version" in content_json
else None
)
stix2_splitter = OpenCTIStix2Splitter()
bundles = stix2_splitter.split_bundle(content_json, False, event_version)
# For each split bundle, send it to the same queue
for bundle in bundles:
text_bundle = json.dumps(bundle)
data["content"] = base64.b64encode(text_bundle.encode("utf-8", "escape")).decode(
"utf-8"
)
push_channel.basic_publish(
exchange=self.connector["config"]["push_exchange"],
routing_key=self.connector["config"]["push_routing"],
body=json.dumps(data),
properties=pika.BasicProperties(
delivery_mode=2, content_encoding="utf-8" # make message persistent
),
)
push_channel.close()

elif event_type == "event":
event = base64.b64decode(data["content"]).decode("utf-8")
event_content = json.loads(event)
Expand Down

0 comments on commit 84120d6

Please sign in to comment.