|
3 | 3 | import datetime |
4 | 4 | import io |
5 | 5 | import json |
| 6 | +import os |
| 7 | +import tempfile |
6 | 8 | from typing import Dict, Tuple, Union |
7 | 9 |
|
8 | 10 | import magic |
@@ -166,6 +168,9 @@ def __init__( |
166 | 168 | self.app_logger = self.logger_class("api") |
167 | 169 | self.admin_logger = self.logger_class("admin") |
168 | 170 |
|
| 171 | + # Setup proxy certificates if provided |
| 172 | + self._setup_proxy_certificates() |
| 173 | + |
169 | 174 | # Define API |
170 | 175 | self.api_token = token |
171 | 176 | self.api_url = url + "/graphql" |
@@ -249,6 +254,62 @@ def __init__( |
249 | 254 | "OpenCTI API is not reachable. Waiting for OpenCTI API to start or check your configuration..." |
250 | 255 | ) |
251 | 256 |
|
| 257 | + def _setup_proxy_certificates(self): |
| 258 | + """Setup HTTPS proxy certificates from environment variable. |
| 259 | + |
| 260 | + Detects HTTPS_CA_CERTIFICATES environment variable and combines |
| 261 | + proxy certificates with system certificates for SSL verification. |
| 262 | + """ |
| 263 | + https_ca_certificates = os.getenv("HTTPS_CA_CERTIFICATES") |
| 264 | + if not https_ca_certificates: |
| 265 | + return |
| 266 | + |
| 267 | + try: |
| 268 | + # Create secure temporary directory |
| 269 | + cert_dir = tempfile.mkdtemp(prefix="opencti_proxy_certs_") |
| 270 | + |
| 271 | + # Write proxy certificate to temp file |
| 272 | + proxy_cert_file = os.path.join(cert_dir, "proxy-ca.crt") |
| 273 | + with open(proxy_cert_file, "w") as f: |
| 274 | + f.write(https_ca_certificates) |
| 275 | + |
| 276 | + # Find system certificates |
| 277 | + system_cert_paths = [ |
| 278 | + "/etc/ssl/certs/ca-certificates.crt", # Debian/Ubuntu |
| 279 | + "/etc/pki/tls/certs/ca-bundle.crt", # RHEL/CentOS |
| 280 | + "/etc/ssl/cert.pem", # Alpine/BSD |
| 281 | + ] |
| 282 | + |
| 283 | + # Create combined certificate bundle |
| 284 | + combined_cert_file = os.path.join(cert_dir, "combined-ca-bundle.crt") |
| 285 | + with open(combined_cert_file, "w") as combined: |
| 286 | + # Add system certificates first |
| 287 | + for system_path in system_cert_paths: |
| 288 | + if os.path.exists(system_path): |
| 289 | + with open(system_path, "r") as sys_certs: |
| 290 | + combined.write(sys_certs.read()) |
| 291 | + combined.write("\n") |
| 292 | + break |
| 293 | + |
| 294 | + # Add proxy certificate |
| 295 | + combined.write(https_ca_certificates) |
| 296 | + |
| 297 | + # Update ssl_verify to use combined certificate bundle |
| 298 | + self.ssl_verify = combined_cert_file |
| 299 | + |
| 300 | + # Set environment variables for urllib and other libraries |
| 301 | + os.environ["REQUESTS_CA_BUNDLE"] = combined_cert_file |
| 302 | + os.environ["SSL_CERT_FILE"] = combined_cert_file |
| 303 | + |
| 304 | + self.app_logger.info("Proxy certificates configured", { |
| 305 | + "cert_bundle": combined_cert_file |
| 306 | + }) |
| 307 | + |
| 308 | + except Exception as e: |
| 309 | + self.app_logger.warning("Failed to setup proxy certificates", { |
| 310 | + "error": str(e) |
| 311 | + }) |
| 312 | + |
252 | 313 | def set_applicant_id_header(self, applicant_id): |
253 | 314 | self.request_headers["opencti-applicant-id"] = applicant_id |
254 | 315 |
|
|
0 commit comments