Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/falconpy/_error/_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,45 @@ class SSLDisabledWarning(SDKWarning):


class NoContentWarning(SDKWarning):
"""No content was received."""
"""No content was received or the content could not be parsed.

When non-JSON text content is received from the API, the original
response body is preserved in this warning so callers can still
access the raw error text (Issue #1154).
"""

_code = 204
_message = "No content was received for this request."

def __init__(self,
code: int = None,
message: str = None,
headers: dict = None,
response_body: str = None,
content_type: str = None
):
"""Construct an instance of the warning.

Keyword arguments:
response_body -- The raw text body received from the API.
content_type -- The Content-Type header from the response.
"""
self._response_body = response_body
self._content_type = content_type
if response_body and not message:
message = response_body.strip()
super().__init__(code=code, message=message, headers=headers)

@property
def response_body(self) -> str:
"""Return the raw response body text, if available."""
return self._response_body

@property
def content_type(self) -> str:
"""Return the Content-Type of the original response."""
return self._content_type


class NoAuthenticationMechanism(SDKWarning):
"""No authentication mechanism was specified when creating this class."""
Expand Down
21 changes: 21 additions & 0 deletions src/falconpy/_result/_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ def __init__(self,
self.headers = Headers(_headers)
self._parse_body(body_rcv=body)

@staticmethod
def _is_text_error_body(body: dict) -> bool:
"""Detect a structured text error body from _build_text_error_body.

These bodies contain only "errors" and "resources" keys with no
"meta" or "access_token", indicating they were generated from a
non-JSON text response rather than returned directly by the API.
"""
if not isinstance(body, dict):
return False
keys = set(body.keys())
return keys == {"errors", "resources"} and "meta" not in keys

def _parse_body(self, body_rcv: Dict[str, Union[str, dict, list, int, float, bytes]]):
if isinstance(body_rcv, list):
# Specific to report_executions_download_get returning raw
Expand All @@ -97,6 +110,14 @@ def _parse_body(self, body_rcv: Dict[str, Union[str, dict, list, int, float, byt
self.meta = Meta()
self.resources = Resources()
self.errors = Errors()
elif self._is_text_error_body(body_rcv):
# Structured text error body produced by _build_text_error_body.
# Contains only "errors" and "resources" keys (no "meta" or
# "access_token"). Route through the standard Meta / Errors /
# Resources path so downstream consumers work uniformly.
self.meta = Meta()
self.errors = Errors(body_rcv.get("errors", []))
self.resources = Resources(body_rcv.get("resources", []))
elif body_rcv.get("access_token", None):
# Authentication response
self.raw = RawBody(body_rcv)
Expand Down
3 changes: 2 additions & 1 deletion src/falconpy/_util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
deprecated_operation,
deprecated_class,
params_to_keywords,
_build_text_error_body,
_ALLOWED_METHODS
)
from ._service import service_override_payload
Expand All @@ -79,5 +80,5 @@
"_ALLOWED_METHODS", "login_payloads", "logout_payloads", "sanitize_dictionary",
"calc_content_return", "log_class_startup", "service_override_payload",
"deprecated_operation", "deprecated_class", "review_provided_credentials",
"params_to_keywords"
"params_to_keywords", "_build_text_error_body"
]
40 changes: 35 additions & 5 deletions src/falconpy/_util/_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ def service_request(caller: ServiceClass = None, **kwargs) -> Union[Dict[str, Un
)


def _build_text_error_body(text: str, status_code: int) -> dict:
"""Build a standard error body from a non-JSON text response.

When the API returns a text/plain or text/html response that is not
valid JSON, wrap the raw text in the standard CrowdStrike error
format so downstream code (error logging, pythonic mode, Result
parsing) behaves consistently.
"""
return {
"errors": [{"code": status_code, "message": text.strip()}],
"resources": []
}


# pylint: disable=R0912 # I don't disagree, but this will work for now.
def calc_content_return(resp: requests.Response,
contain: bool,
Expand Down Expand Up @@ -289,11 +303,25 @@ def calc_content_return(resp: requests.Response,
head_request=bool(api_method == "HEAD")
).full_return
elif returned_content_type.startswith("text/plain"):
# Assuming UTF-8 for now
returned = Result(resp.status_code,
resp.headers,
loads(resp.content.decode("utf-8"))
).full_return
# Attempt to parse as JSON first for backward compatibility
# with text/plain responses that carry valid JSON payloads.
# Fall back to wrapping raw text in a standard error body
# so the actual message is surfaced to the caller (Issue #1154).
text_body = resp.content.decode("utf-8")
try:
json_body = loads(text_body)
except (JSONDecodeError, ValueError):
json_body = _build_text_error_body(text_body, resp.status_code)
returned = Result(resp.status_code, resp.headers, json_body).full_return
elif returned_content_type.startswith("text/html"):
# Some proxy and WAF error pages arrive as HTML.
# Surface the raw HTML in the standard error format.
html_body = resp.content.decode("utf-8", errors="replace")
returned = Result(
resp.status_code,
resp.headers,
_build_text_error_body(html_body, resp.status_code)
).full_return
elif contain:
returned = Result(resp.status_code, resp.headers, resp.json()).full_return
else:
Expand Down Expand Up @@ -527,6 +555,8 @@ def log_api_activity(content_return: Union[dict, bytes], content_type: str, api:
api.log_util.debug("RESULT: %s", content_return)
elif content_type.startswith("text/plain"):
api.log_util.debug("RESULT: %s", content_return)
elif content_type.startswith("text/html"):
api.log_util.debug("RESULT: %s", content_return)
else:
api.log_util.debug("RESULT: binary response received from API")

Expand Down
10 changes: 9 additions & 1 deletion src/falconpy/api_complete/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
args_to_params,
return_preferred_default,
autodiscover_region,
_build_text_error_body,
)
from .._enum import BaseURL, ContainerBaseURL, TokenFailReason
from .._constant import PREFER_IDS_IN_BODY, MOCK_OPERATIONS
Expand Down Expand Up @@ -220,7 +221,14 @@ def authenticate(self: object) -> bool:
self._token_fail_headers = result["headers"]
if "errors" in result["body"]:
if result["body"]["errors"]:
self.token_fail_reason = result["body"]["errors"][0]["message"]
err = result["body"]["errors"][0]
# Text error bodies from _build_text_error_body
# use a "code" key alongside "message".
if isinstance(err, dict):
self.token_fail_reason = err.get("message",
str(err))
else:
self.token_fail_reason = str(err)
else: # pragma: no cover
self.authenticated = False
self.token_fail_reason = TokenFailReason["UNEXPECTED"].value
Expand Down
Loading