@@ -776,6 +776,9 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
776776 self .connect_id = get_config_variable (
777777 "CONNECTOR_ID" , ["connector" , "id" ], config
778778 )
779+ self .queue_protocol = get_config_variable (
780+ "QUEUE_PROTOCOL" , ["connector" , "queue_protocol" ], config , default = "amqp"
781+ )
779782 self .connect_type = get_config_variable (
780783 "CONNECTOR_TYPE" , ["connector" , "type" ], config
781784 )
@@ -994,7 +997,6 @@ def __init__(self, config: Dict, playbook_compatible=False) -> None:
994997
995998 # Start ping thread
996999 if not self .connect_run_and_terminate :
997-
9981000 is_run_and_terminate = False
9991001 if self .connect_duration_period == 0 :
10001002 is_run_and_terminate = True
@@ -1689,10 +1691,11 @@ def send_stix2_bundle(self, bundle: str, **kwargs) -> list:
16891691 expectations_number = len (json .loads (bundle )["objects" ])
16901692 else :
16911693 stix2_splitter = OpenCTIStix2Splitter ()
1692- expectations_number , bundles = (
1693- stix2_splitter .split_bundle_with_expectations (
1694- bundle , True , event_version
1695- )
1694+ (
1695+ expectations_number ,
1696+ bundles ,
1697+ ) = stix2_splitter .split_bundle_with_expectations (
1698+ bundle , True , event_version
16961699 )
16971700
16981701 if len (bundles ) == 0 :
@@ -1704,42 +1707,53 @@ def send_stix2_bundle(self, bundle: str, **kwargs) -> list:
17041707 self .api .work .add_expectations (work_id , expectations_number )
17051708 if entities_types is None :
17061709 entities_types = []
1707- pika_credentials = pika .PlainCredentials (
1708- self .connector_config ["connection" ]["user" ],
1709- self .connector_config ["connection" ]["pass" ],
1710- )
1711- pika_parameters = pika .ConnectionParameters (
1712- host = self .connector_config ["connection" ]["host" ],
1713- port = self .connector_config ["connection" ]["port" ],
1714- virtual_host = self .connector_config ["connection" ]["vhost" ],
1715- credentials = pika_credentials ,
1716- ssl_options = (
1717- pika .SSLOptions (
1718- create_mq_ssl_context (self .config ),
1719- self .connector_config ["connection" ]["host" ],
1710+ if self .queue_protocol == "amqp" :
1711+ pika_credentials = pika .PlainCredentials (
1712+ self .connector_config ["connection" ]["user" ],
1713+ self .connector_config ["connection" ]["pass" ],
1714+ )
1715+ pika_parameters = pika .ConnectionParameters (
1716+ host = self .connector_config ["connection" ]["host" ],
1717+ port = self .connector_config ["connection" ]["port" ],
1718+ virtual_host = self .connector_config ["connection" ]["vhost" ],
1719+ credentials = pika_credentials ,
1720+ ssl_options = (
1721+ pika .SSLOptions (
1722+ create_mq_ssl_context (self .config ),
1723+ self .connector_config ["connection" ]["host" ],
1724+ )
1725+ if self .connector_config ["connection" ]["use_ssl" ]
1726+ else None
1727+ ),
1728+ )
1729+ pika_connection = pika .BlockingConnection (pika_parameters )
1730+ channel = pika_connection .channel ()
1731+ try :
1732+ channel .confirm_delivery ()
1733+ except Exception as err : # pylint: disable=broad-except
1734+ self .connector_logger .warning (str (err ))
1735+ self .connector_logger .info (
1736+ self .connect_name + " sending bundle to queue"
1737+ )
1738+ for sequence , bundle in enumerate (bundles , start = 1 ):
1739+ self ._send_bundle (
1740+ channel ,
1741+ bundle ,
1742+ work_id = work_id ,
1743+ entities_types = entities_types ,
1744+ sequence = sequence ,
1745+ update = update ,
17201746 )
1721- if self .connector_config ["connection" ]["use_ssl" ]
1722- else None
1723- ),
1724- )
1725- pika_connection = pika .BlockingConnection (pika_parameters )
1726- channel = pika_connection .channel ()
1727- try :
1728- channel .confirm_delivery ()
1729- except Exception as err : # pylint: disable=broad-except
1730- self .connector_logger .warning (str (err ))
1731- self .connector_logger .info (self .connect_name + " sending bundle to queue" )
1732- for sequence , bundle in enumerate (bundles , start = 1 ):
1733- self ._send_bundle (
1734- channel ,
1735- bundle ,
1736- work_id = work_id ,
1737- entities_types = entities_types ,
1738- sequence = sequence ,
1739- update = update ,
1747+ channel .close ()
1748+ pika_connection .close ()
1749+ elif self .queue_protocol == "api" :
1750+ self .api .send_bundle_to_api (
1751+ connector_id = self .connector_id , bundle = bundle
1752+ )
1753+ else :
1754+ raise ValueError (
1755+ f"{ self .queue_protocol } : this queue protocol is not supported"
17401756 )
1741- channel .close ()
1742- pika_connection .close ()
17431757
17441758 return bundles
17451759
0 commit comments