Skip to content

Cyberwatch integration 1.0.4 #40695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 359 additions & 0 deletions Packs/Cyberwatch/Integrations/Cyberwatch/Cyberwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,126 @@ def get_one_asset(self, params, namespace="vulnerabilities"):

return response

def get_sysadmin_assets(self, params):
"""
Retrieve servers from the /assets/servers route (a.k.a. \"Sysadmin\" view).
Behaviour (pagination, logging) intentionally mirrors get_assets() but
returns the raw payload from the /api/v3/assets/servers endpoint without
any field normalization.
"""
path = "/api/v3/assets/servers"

# if a page is given in params, we will fetch only the provided page
if "page" in params:
response = self._http_request(method="GET", url_suffix=path, params=params)
# if no page is given, we will fetch all the assets
else:
demisto.info('Fetching all Sysadmin Assets by default. You can override this by specifying the "page" parameter')
response = []
curr_page = 1
params["page"] = curr_page
raw_response = self._http_request(
method="GET",
url_suffix=path,
resp_type="response", # used to get the headers of the response
params=params,
)
response += raw_response.json()
paginate_objperpage = int(raw_response.headers.get("x-per-page", len(response) or 1))
paginate_total_results = int(raw_response.headers.get("x-total", len(response)))
while curr_page * paginate_objperpage < paginate_total_results:
curr_page += 1
params["page"] = curr_page
raw_response = self._http_request(
method="GET",
url_suffix=path,
resp_type="response",
params=params,
)
response += raw_response.json()

return response

def get_sysadmin_one_asset(self, params):
"""
Retrieve a single server from the /assets/servers/<ID> route (Sysadmin view).
Expects: params['id'] (int).
"""

path = f"/api/v3/assets/servers/{str(params.get('id'))}"
response = self._http_request(method="GET", url_suffix=path, params=params)
return response

def get_compliance_assets(self, params):
"""
Retrieve assets from the /compliance/assets route (Compliance view).
Behaviour mirrors get_assets() but returns native /compliance payloads.
"""
path = "/api/v3/compliance/assets"

# if a page is given in params, we will fetch only the provided page
if "page" in params:
response = self._http_request(method="GET", url_suffix=path, params=params)
# if no page is given, we will fetch all the assets
else:
demisto.info('Fetching all Compliance Assets by default. You can override this by specifying the "page" parameter')
response = []
curr_page = 1
params["page"] = curr_page
raw_response = self._http_request(
method="GET",
url_suffix=path,
resp_type="response",
params=params,
)
response += raw_response.json()
paginate_objperpage = int(raw_response.headers.get("x-per-page", len(response) or 1))
paginate_total_results = int(raw_response.headers.get("x-total", len(response)))
while curr_page * paginate_objperpage < paginate_total_results:
curr_page += 1
params["page"] = curr_page
raw_response = self._http_request(
method="GET",
url_suffix=path,
resp_type="response",
params=params,
)
response += raw_response.json()

return response

def get_compliance_one_asset(self, params):
"""
Retrieve a single asset from the /compliance/servers/<ID> route (Compliance view).
Expects: params['id'] (int).
Returns: JSON payload from the /api/v3/compliance/servers/{id} endpoint.
"""

path = f"/api/v3/compliance/servers/{params.get('id')}"
response = self._http_request(method="GET", url_suffix=path, params=params)
return response

def upload_declarative_data(self, output_text: str, timeout: int = 90):
"""
Upload Declarative Data (airgap scan result text) via v2 endpoint.
Mirrors the official CLI behaviour.
"""
path = "/api/v2/cbw_scans/scripts"
return self._http_request(
method="POST",
url_suffix=path,
json_data={"output": output_text},
timeout=timeout,
)

def get_declarative_data(self, server_id: int):
"""
Read Declarative Data for a server (GET /api/v3/servers/{id}/info).
Returns the raw text blob.
"""
path = f"/api/v3/servers/{server_id}/info"
return self._http_request(method="GET", url_suffix=path, resp_type="text")

def get_security_issues(self, params):
"""
Send the request for list_security_issues_command.
Expand Down Expand Up @@ -589,6 +709,87 @@ def test_module(client: Client) -> str:
""" HELPERS """


def _build_declarative_payload(hostname: str, data_json: str) -> str:
"""
Build a minimal Declarative Data text blob from hostname + JSON string.
Example:
hostname=Test
data_json='{"metadata": "metadate", "foo": "bar"}'
=>
HOSTNAME:Test
METADATA:metadate
FOO:bar
"""
try:
extra = json.loads(data_json) if data_json else {}
except Exception as e:
raise DemistoException(f"Invalid JSON in 'data' argument: {e!s}")

lines = [f"HOSTNAME:{hostname}"]
for k, v in extra.items():
# simple upper-case key mapping
lines.append(f"{str(k).upper()}:{v}")
return "\n".join(lines) + "\n"


def send_declarative_data_asset_command(client: Client, args: Dict[str, Any]):
"""
Upload Declarative Data to Cyberwatch.
Required args: hostname (str), data (JSON string)
"""
hostname: str = cast(str, args["hostname"])

data_json: str = str(args.get("data", "{}"))

# Pre‑check to avoid creating a new server
matches = client.get_assets({"hostname": hostname, "per_page": 1, "page": 1})
if not matches:
raise DemistoException(f"Hostname '{hostname}' not found in Cyberwatch. Upload cancelled to avoid auto creation.")
server_id_known: int = matches[0]["id"]

blob = _build_declarative_payload(hostname, data_json)
result: dict[str, Any] = client.upload_declarative_data(blob) or {}
result.setdefault("matched_server_id", server_id_known)

readable = {
"hostname": hostname,
"server_id": result.get("server_id"),
"matched_server_id": server_id_known,
"status": result.get("status") or "submitted",
"message": result.get("message") or "",
}

return CommandResults(
outputs=result,
outputs_prefix="Cyberwatch.DeclarativeDataUpload",
raw_response=result,
readable_output=tableToMarkdown(
"Cyberwatch Declarative Data Upload",
readable,
["hostname", "server_id", "matched_server_id", "status", "message"],
removeNull=True,
),
)


def get_declarative_data_asset_command(client: Client, args: Dict[str, Any]):
"""
Retrieve Declarative Data for a server.
Required: id
"""
server_id: int = int(args["id"])
raw_text = client.get_declarative_data(int(server_id)) # this is a string now

readable_output = f"### Cyberwatch Declarative Data (server {server_id})\n```\n{raw_text}\n```"

return CommandResults(
outputs={"id": int(server_id), "raw": raw_text},
outputs_prefix="Cyberwatch.DeclarativeData",
raw_response=raw_text,
readable_output=readable_output,
)


def iso8601_to_human(iso8601_str, default_value=""):
"""
Convert ISO8601 string to human readable date time.
Expand Down Expand Up @@ -1189,6 +1390,158 @@ def fetch_security_issue_command(client: Client, args: Dict[str, Any]):
)


def list_sysadmin_assets_command(client: Client, args: Dict[str, Any]):
assets = client.get_sysadmin_assets(args)

if len(assets) == 0:
return_results("No Sysadmin assets found.")

# minimal readable table: id, hostname, last_communication, category
readable_headers = ["id", "hostname", "last_communication", "category"]
readable_assets = [
{
"id": str(a.get("id")),
"hostname": a.get("hostname"),
"last_communication": iso8601_to_human(a.get("last_communication")),
"category": a.get("category"),
}
for a in assets
]

return CommandResults(
outputs=createContext(assets, removeNull=True),
outputs_prefix="Cyberwatch.SysadminAsset",
raw_response=assets,
outputs_key_field="id",
readable_output=tableToMarkdown(
"Cyberwatch Sysadmin Assets",
readable_assets,
readable_headers,
removeNull=False,
is_auto_json_transform=True,
date_fields=["last_communication"],
),
)


def fetch_sysadmin_asset_command(client: Client, args: Dict[str, Any]):
asset = client.get_sysadmin_one_asset(args)

if len(asset) == 0:
return_results(f"Sysadmin asset with {args.get('id')} not found.")

readable_headers = [
"id",
"hostname",
"description",
"last_communication",
"category",
"deploying_period_id",
"rebooting_period_id",
"policy_id",
"ignoring_policy_id",
]
readable_asset = {
"id": str(asset.get("id")),
"hostname": asset.get("hostname"),
"description": str(asset.get("description")),
"last_communication": iso8601_to_human(asset.get("last_communication")),
"category": str(asset.get("category")),
"deploying_period_id": str(asset.get("deploying_period_id")),
"rebooting_period_id": str(asset.get("rebooting_period_id")),
"policy_id": str(asset.get("policy_id")),
"ignoring_policy_id": str(asset.get("ignoring_policy_id")),
}

return CommandResults(
outputs=createContext(asset, removeNull=False),
outputs_prefix="Cyberwatch.SysadminAsset",
raw_response=asset,
outputs_key_field="id",
readable_output=tableToMarkdown(
"Cyberwatch Sysadmin Asset",
readable_asset,
readable_headers,
removeNull=False,
is_auto_json_transform=True,
date_fields=["last_communication"],
),
)


def list_compliance_assets_command(client: Client, args: Dict[str, Any]):
assets = client.get_compliance_assets(args)

if len(assets) == 0:
return_results("No Compliance assets found.")

readable_headers = ["id", "hostname", "status", "compliance_rules_failed_count", "compliance_rules_succeed_count"]
readable_assets = [
{
"id": str(a.get("id")),
"hostname": a.get("hostname"),
"status": a.get("status"),
"compliance_rules_failed_count": str(a.get("compliance_rules_failed_count")),
"compliance_rules_succeed_count": str(a.get("compliance_rules_succeed_count")),
}
for a in assets
]

return CommandResults(
outputs=createContext(assets, removeNull=True),
outputs_prefix="Cyberwatch.ComplianceAsset",
raw_response=assets,
outputs_key_field="id",
readable_output=tableToMarkdown(
"Cyberwatch Compliance Assets",
readable_assets,
readable_headers,
removeNull=False,
is_auto_json_transform=True,
),
)


def fetch_compliance_asset_command(client: Client, args: Dict[str, Any]):
asset = client.get_compliance_one_asset(args)

if len(asset) == 0:
return_results(f"Compliance asset with {args.get('id')} not found.")

readable_headers = [
"id",
"hostname",
"status",
"compliance_rules_count",
"compliance_rules_failed_count",
"compliance_rules_succeed_count",
"compliance_repositories",
]
readable_asset = {
"id": str(asset.get("id")),
"hostname": asset.get("hostname"),
"status": asset.get("status"),
"compliance_rules_count": str(asset.get("compliance_rules_count")),
"compliance_rules_failed_count": str(asset.get("compliance_rules_failed_count")),
"compliance_rules_succeed_count": str(asset.get("compliance_rules_succeed_count")),
"compliance_repositories": [c.get("name") for c in asset.get("compliance_repositories", [])],
}

return CommandResults(
outputs=createContext(asset, removeNull=False),
outputs_prefix="Cyberwatch.ComplianceAsset",
raw_response=asset,
outputs_key_field="id",
readable_output=tableToMarkdown(
"Cyberwatch Compliance Asset",
readable_asset,
readable_headers,
removeNull=False,
is_auto_json_transform=True,
),
)


def main(): # pragma: no cover
params = demisto.params()
command = demisto.command()
Expand All @@ -1215,6 +1568,12 @@ def main(): # pragma: no cover
"cyberwatch-fetch-cve": fetch_cve_command,
"cyberwatch-list-securityissues": list_security_issues_command,
"cyberwatch-fetch-securityissue": fetch_security_issue_command,
"cyberwatch-list-sysadmin-assets": list_sysadmin_assets_command,
"cyberwatch-fetch-sysadmin-asset": fetch_sysadmin_asset_command,
"cyberwatch-list-compliance-assets": list_compliance_assets_command,
"cyberwatch-fetch-compliance-asset": fetch_compliance_asset_command,
"cyberwatch-send-declarative-data-asset": send_declarative_data_asset_command,
"cyberwatch-get-declarative-data-asset": get_declarative_data_asset_command,
}

try:
Expand Down
Loading
Loading