Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions pycti/api/opencti_api_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,43 @@ class OpenCTIApiConnector:
def __init__(self, api):
self.api = api

def read(self, connector_id: str) -> Dict:
"""Reading the connector and its details

:return: return all the connector details
:rtype: dict
"""
self.api.app_logger.info("[INFO] Getting connector details ...")
query = """
query GetConnector($id: String!) {
connector(id: $id) {
id
name
active
auto
only_contextual
connector_type
connector_scope
connector_state
connector_queue_details {
messages_number
messages_size
}
updated_at
created_at
config {
listen
listen_exchange
push
push_exchange
}
built_in
}
}
"""
result = self.api.query(query, {"id": connector_id})
return result["data"]["connector"]

def list(self) -> Dict:
"""list available connectors

Expand Down Expand Up @@ -41,27 +78,44 @@ def list(self) -> Dict:
result = self.api.query(query)
return result["data"]["connectorsForWorker"]

def ping(self, connector_id: str, connector_state: Any) -> Dict:
def ping(
self, connector_id: str, connector_state: Any, connector_info: Dict
) -> Dict:
"""pings a connector by id and state

:param connector_id: the connectors id
:type connector_id: str
:param connector_state: state for the connector
:type connector_state:
:param connector_info: all details connector
:type connector_info: Dict
:return: the response pingConnector data dict
:rtype: dict
"""

query = """
mutation PingConnector($id: ID!, $state: String) {
pingConnector(id: $id, state: $state) {
mutation PingConnector($id: ID!, $state: String, $connectorInfo: ConnectorInfoInput ) {
pingConnector(id: $id, state: $state, connectorInfo: $connectorInfo) {
id
connector_state
connector_info {
run_and_terminate
buffering
queue_threshold
queue_messages_size
next_run_datetime
last_run_datetime
}
}
}
"""
result = self.api.query(
query, {"id": connector_id, "state": json.dumps(connector_state)}
query,
{
"id": connector_id,
"state": json.dumps(connector_state),
"connectorInfo": connector_info,
},
)
return result["data"]["pingConnector"]

Expand Down
Loading