Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions pycti/api/opencti_api_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# coding: utf-8
import atexit
import base64
import datetime
import io
import json
import os
import shutil
import signal
import tempfile
import threading
from typing import Dict, Tuple, Union

import magic
Expand Down Expand Up @@ -78,6 +84,12 @@
from pycti.utils.opencti_stix2 import OpenCTIStix2
from pycti.utils.opencti_stix2_utils import OpenCTIStix2Utils

# Global singleton variables for proxy certificate management
_PROXY_CERT_BUNDLE = None
_PROXY_CERT_DIR = None
_PROXY_CERT_LOCK = threading.Lock()
_PROXY_SIGNAL_HANDLERS_REGISTERED = False


def build_request_headers(token: str, custom_headers: str, app_logger):
headers_dict = {
Expand Down Expand Up @@ -166,6 +178,9 @@ def __init__(
self.app_logger = self.logger_class("api")
self.admin_logger = self.logger_class("admin")

# Setup proxy certificates if provided
self._setup_proxy_certificates()

# Define API
self.api_token = token
self.api_url = url + "/graphql"
Expand Down Expand Up @@ -249,6 +264,147 @@ def __init__(
"OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..."
)

def _setup_proxy_certificates(self):
"""Setup HTTPS proxy certificates from environment variable.

Detects HTTPS_CA_CERTIFICATES environment variable and combines
proxy certificates with system certificates for SSL verification.
Supports both inline certificate content and file paths.

Uses a singleton pattern to ensure only one certificate bundle is created
across all instances, avoiding resource leaks and conflicts.
"""
global _PROXY_CERT_BUNDLE, _PROXY_CERT_DIR, _PROXY_SIGNAL_HANDLERS_REGISTERED

https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES")
if not https_ca_certificates:
return

# Thread-safe check and setup
with _PROXY_CERT_LOCK:
# If already configured, reuse existing bundle
if _PROXY_CERT_BUNDLE is not None:
self.ssl_verify = _PROXY_CERT_BUNDLE
self.app_logger.debug(
"Reusing existing proxy certificate bundle",
{"cert_bundle": _PROXY_CERT_BUNDLE},
)
return

# First initialization - create the certificate bundle
try:
# Create secure temporary directory
cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_")

# Determine if HTTPS_CA_CERTIFICATES contains inline content or file path
cert_content = self._get_certificate_content(https_ca_certificates)

# Write proxy certificate to temp file
proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt")
with open(proxy_cert_file, "w") as f:
f.write(cert_content)

# Find system certificates
system_cert_paths = [
"/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu
"/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS
"/etc/ssl/cert.pem", # Alpine/BSD
]

# Create combined certificate bundle
combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt")
with open(combined_cert_file, "w") as combined:
# Add system certificates first
for system_path in system_cert_paths:
if os.path.exists(system_path):
with open(system_path, "r") as sys_certs:
combined.write(sys_certs.read())
combined.write("\n")
break

# Add proxy certificate
combined.write(cert_content)

# Update global singleton variables
_PROXY_CERT_BUNDLE = combined_cert_file
_PROXY_CERT_DIR = cert_dir
self.ssl_verify = combined_cert_file

# Set environment variables for urllib and other libraries
os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file
os.environ["SSL_CERT_FILE"] = combined_cert_file

# Register cleanup handlers only once
atexit.register(_cleanup_proxy_certificates)

# Register signal handlers only once
if not _PROXY_SIGNAL_HANDLERS_REGISTERED:
signal.signal(signal.SIGTERM, _signal_handler_proxy_cleanup)
signal.signal(signal.SIGINT, _signal_handler_proxy_cleanup)
_PROXY_SIGNAL_HANDLERS_REGISTERED = True

self.app_logger.info(
"Proxy certificates configured",
{"cert_bundle": combined_cert_file},
)

except Exception as e:
self.app_logger.error(
"Failed to setup proxy certificates", {"error": str(e)}
)
raise

def _get_certificate_content(self, https_ca_certificates):
"""Extract certificate content from environment variable.

Supports both inline certificate content (PEM format) and file paths.

:param https_ca_certificates: Content from HTTPS_CA_CERTIFICATES env var
:type https_ca_certificates: str
:return: Certificate content in PEM format
:rtype: str
:raises ValueError: If the certificate content is invalid or cannot be read
"""
# Strip whitespace once at the beginning
stripped_https_ca_certificates = https_ca_certificates.strip()

# Check if it's inline certificate content (starts with PEM header)
if stripped_https_ca_certificates.startswith("-----BEGIN CERTIFICATE-----"):
self.app_logger.debug(
"HTTPS_CA_CERTIFICATES contains inline certificate content"
)
return https_ca_certificates

# Check if it's a file path
if os.path.isfile(stripped_https_ca_certificates):
cert_file_path = stripped_https_ca_certificates
try:
with open(cert_file_path, "r") as f:
cert_content = f.read()
# Validate it's actually a certificate
if "-----BEGIN CERTIFICATE-----" in cert_content:
self.app_logger.debug(
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
{"file_path": cert_file_path},
)
return cert_content
else:
raise ValueError(
f"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate: {cert_file_path}"
)
except ValueError:
# Re-raise ValueError from certificate validation
raise
except Exception as e:
raise ValueError(
f"Failed to read certificate file at {cert_file_path}: {str(e)}"
)

# Neither inline content nor valid file path
raise ValueError(
f"HTTPS_CA_CERTIFICATES is not a valid certificate or file path: {https_ca_certificates[:50]}..."
)

def set_applicant_id_header(self, applicant_id):
self.request_headers["opencti-applicant-id"] = applicant_id

Expand Down Expand Up @@ -884,3 +1040,33 @@ def get_attribute_in_mitre_extension(key, object) -> any:
"extension-definition--322b8f77-262a-4cb8-a915-1e441e00329b"
][key]
return None


# Global cleanup functions for proxy certificates singleton
def _cleanup_proxy_certificates():
"""Clean up temporary certificate directory for proxy certificates.

This function is called on normal program exit via atexit.
"""
global _PROXY_CERT_DIR
if _PROXY_CERT_DIR and os.path.exists(_PROXY_CERT_DIR):
try:
shutil.rmtree(_PROXY_CERT_DIR)
except Exception:
# Silently fail cleanup - best effort
pass
finally:
_PROXY_CERT_DIR = None


def _signal_handler_proxy_cleanup(signum, frame):
"""Handle termination signals (SIGTERM/SIGINT) for proxy certificate cleanup.

Performs cleanup and then raises SystemExit to allow
normal shutdown procedures to complete.

:param signum: Signal number
:param frame: Current stack frame
"""
_cleanup_proxy_certificates()
raise SystemExit(0)
1 change: 1 addition & 0 deletions tests/01-unit/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Unit tests for API client functionality
Loading