Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 62 additions & 8 deletions web-agent/app/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import time
from urllib.parse import unquote
import zipfile

# Global variables
letters: str = string.ascii_letters
Expand All @@ -20,6 +21,7 @@
log_folder: str = '/tmp/armorcode/log'
output_file_folder: str = '/tmp/armorcode/output_files'
output_file: str = f"{output_file_folder}/large_output_file{rand_string}.txt"
output_file_zip: str = f"{output_file_folder}/large_output_file{rand_string}.zip"

max_file_size: int = 1024 * 100 # max_size data that would be sent in payload, more than that will send via s3
logger: Optional[logging.Logger] = None
Expand All @@ -40,9 +42,11 @@
# throttling to 25 requests per seconds to avoid rate limit errors
rate_limiter = None

upload_to_ac = False


def main() -> None:
global api_key, server_url, logger, exponential_time_backoff, verify_cert, timeout, rate_limiter, inward_proxy, outgoing_proxy
global api_key, server_url, logger, exponential_time_backoff, verify_cert, timeout, rate_limiter, inward_proxy, outgoing_proxy, upload_to_ac

parser = argparse.ArgumentParser()
parser.add_argument("--serverUrl", required=False, help="Server Url")
Expand All @@ -57,6 +61,7 @@ def main() -> None:

parser.add_argument("--outgoingProxyHttps", required=False, help="Pass outgoing Https proxy", default=None)
parser.add_argument("--outgoingProxyHttp", required=False, help="Pass outgoing Http proxy", default=None)
parser.add_argument("--uploadToAc", action="store_true", help="Upload to Armorcode instead of s3 (default: False)", default=False)

args = parser.parse_args()

Expand All @@ -66,6 +71,7 @@ def main() -> None:
timeout_cmd = args.timeout
verify_cmd = args.verify
debug_cmd = args.debugMode
upload_to_ac = args.uploadToAc

inward_proxy_https = args.inwardProxyHttps
inward_proxy_http = args.inwardProxyHttp
Expand Down Expand Up @@ -191,6 +197,11 @@ def process() -> None:
os.remove(output_file)
except OSError as e:
logger.error("Error removing output file: %s", e)
if os.path.exists(output_file_zip):
try:
os.remove(output_file_zip)
except OSError as e:
logger.error("Error removing output file: %s", e)


def update_task(task: Dict[str, Any], count: int = 0) -> None:
Expand Down Expand Up @@ -283,15 +294,12 @@ def process_task(task: Dict[str, Any]) -> Dict[str, Any]:
file_size: int = os.path.getsize(output_file)
logger.info("file size %s", file_size)
is_s3_upload: bool = file_size > max_file_size # if size is greater than max_size, upload data to s3
isZipped = False
if is_s3_upload:
s3_upload_url, s3_signed_get_url = get_s3_upload_url(taskId)
if s3_upload_url is None:
logger.warning("Failed to get S3 upload URL for URL %s", url)
else:
upload_success = upload_s3(s3_upload_url)
s3_signed_get_url, isZipped = upload_response()

# update task with the output
_update_task_with_response(task, response, s3_signed_get_url)
_update_task_with_response(task, response, s3_signed_get_url, isZipped)

logger.info("Task %s processed successfully.", taskId)
return task
Expand All @@ -306,15 +314,61 @@ def process_task(task: Dict[str, Any]) -> Dict[str, Any]:
return task


def zip_response():
try:
with zipfile.ZipFile(output_file_zip, 'w') as zipf:
zipf.write(output_file)
except Exception as e:
logger.error("Unable to upload file to armorcode: %s", e)
raise e

def upload_response(taskId: str, url: str):
if upload_to_ac:
try:
rate_limiter.throttle()
zip_response()
headers: Dict[str, str] = {
"Authorization": f"Bearer {api_key}",
}
files = {
# 'fileFieldName' is the name of the form field expected by the server
"file": (f"{taskId}_{uuid.uuid4().hex}.zip", open(output_file_zip, "rb"), "application/zip")
# If you have multiple files, you can add them here as more entries
}
upload_result: requests.Response = requests.post(
f"{server_url}/api/http-teleport/upload-result/{taskId}",
headers=headers,
timeout=50, verify=verify_cert, proxies=outgoing_proxy, files=files
)
upload_result.raise_for_status()

data: Optional[Dict[str, str]] = upload_result.json().get('data', None)
if data is not None:
return data.get('getUrl'), True
logger.warning("No data returned when uploading the data to s3")
return None
except Exception as e:
logger.error("Unable to upload file to armorcode: %s", e)
raise e
else:
s3_upload_url, s3_signed_get_url = get_s3_upload_url(taskId)
if s3_upload_url is None:
logger.warning("Failed to get S3 upload URL for URL %s", url)
else:
upload_success = upload_s3(s3_upload_url)
return s3_signed_get_url, False


def check_and_update_encode_url(headers, url: str):
if "/cxrestapi/auth/identity/connect/token" in url:
headers["Content-Type"] = "application/x-www-form-urlencoded"


def _update_task_with_response(task: Dict[str, Any], response: requests.Response,
s3_signed_get_url: Optional[str]) -> None:
s3_signed_get_url: Optional[str], isZipped=False) -> None:
task['responseHeaders'] = dict(response.headers)
task['statusCode'] = response.status_code
task['zippedResponse'] = str(isZipped).lower()
if s3_signed_get_url is None: # check if needs to send data or fileURL
with open(output_file, 'r') as file:
task['output'] = file.read()
Expand Down
Loading