Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/access.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@
"public.cps.za.dlchange": [
"FooUser",
"BarUser"
],
"public.cps.za.test": [
"TestUser"
]
}
2 changes: 1 addition & 1 deletion conf/topic_runs.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"type": "string",
"description": "Identifier of the job in it’s respective system (e.g. Spark Application Id, Glue Job Id, EMR Step Id, etc)."
},
"tenant_id ": {
"tenant_id": {
"type": "string",
"description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)"
},
Expand Down
30 changes: 30 additions & 0 deletions conf/topic_test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"type": "object",
"properties": {
"event_id": {
"type": "string",
"description": "Unique identifier for the event (GUID), generated for each unique event, for de-duplication purposes"
},
"tenant_id": {
"type": "string",
"description": "Application ID (4 letter code) or ServiceNow identifier related to the pipeline/domain/process owner (tenant of the tool)"
},
"source_app": {
"type": "string",
"description": "Standardized source application name (aqueduct, unify, lum, etc)"
},
"environment": {
"type": "string",
"description": "Environment (dev, uat, pre-prod, prod, test or others)"
},
"timestamp": {
"type": "number",
"description": "Timestamp of the event in epoch milliseconds"
},
"additional_info": {
"type": "object",
"description": "Optional additional fields structured as an inner JSON"
}
},
"required": ["event_id", "tenant_id", "source_app", "environment", "timestamp"]
}
4 changes: 3 additions & 1 deletion src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
TOPICS["public.cps.za.runs"] = json.load(file)
with open("conf/topic_dlchange.json", "r") as file:
TOPICS["public.cps.za.dlchange"] = json.load(file)
with open("conf/topic_test.json", "r") as file:
TOPICS["public.cps.za.test"] = json.load(file)
logger.debug("Loaded TOPICS")

with open("conf/config.json", "r") as file:
with open("conf/config.dev.json", "r") as file:
CONFIG = json.load(file)
logger.debug("Loaded main CONFIG")

Expand Down
2 changes: 1 addition & 1 deletion src/writer_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def write(topicName, message):
_logger.error(str(response))
return False
except Exception as e:
_logger.error(str(e))
_logger.error(f'The EventBridge writer failed with unknown error: {str(e)}')
return False

return True
2 changes: 1 addition & 1 deletion src/writer_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def write(topicName, message):
_logger.error(str(error))
return False
except Exception as e:
_logger.error(str(e))
_logger.error(f'The Kafka writer failed with unknown error: {str(e)}')
return False

return True
56 changes: 47 additions & 9 deletions src/writer_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

import boto3
from botocore.exceptions import ClientError
import psycopg2
try:
import psycopg2 # noqa: F401
except ImportError: # pragma: no cover - environment without psycopg2
psycopg2 = None

def init(logger):
global _logger
Expand Down Expand Up @@ -64,10 +67,10 @@ def postgres_edla_write(cursor, table, message):
message["timestamp_event"],
message["catalog_id"],
message["operation"],
message["location"] if "location" in message else None,
message.get("location"),
message["format"],
json.dumps(message["format_options"]) if "format_options" in message else None,
json.dumps(message["additional_info"]) if "additional_info" in message else None
json.dumps(message.get("format_options")) if "format_options" in message else None,
json.dumps(message.get("additional_info")) if "additional_info" in message else None
)
)

Expand All @@ -79,7 +82,7 @@ def postgres_run_write(cursor, table_runs, table_jobs, message):
event_id,
job_ref,
tenant_id,
soure_app,
source_app,
source_app_version,
environment,
timestamp_start,
Expand Down Expand Up @@ -134,16 +137,49 @@ def postgres_run_write(cursor, table_runs, table_jobs, message):
job["status"],
job["timestamp_start"],
job["timestamp_end"],
job["message"] if "message" in job else None,
json.dumps(job["additional_info"]) if "additional_info" in job else None
job.get("message"),
json.dumps(job.get("additional_info")) if "additional_info" in job else None
)
)


def postgres_test_write(cursor, table, message):
    """Insert one validated public.cps.za.test event into Postgres.

    Args:
        cursor: Open psycopg2 cursor used to run the INSERT (commit is the
            caller's responsibility).
        table: Destination table name. NOTE(review): interpolated directly
            into the SQL text because identifiers cannot be bound as
            parameters; call sites pass hard-coded names, so this is not
            untrusted input — keep it that way.
        message: Event payload already validated against topic_test.json;
            must contain event_id, tenant_id, source_app, environment and
            timestamp. additional_info is optional and stored as JSON text
            (NULL when absent).

    Raises:
        KeyError: if a required field is missing from *message*.
    """
    _logger.debug(f"Sending to Postgres - {table}")
    cursor.execute(f"""
        INSERT INTO {table}
        (
            event_id,
            tenant_id,
            source_app,
            environment,
            timestamp_event,
            additional_info
        )
        VALUES
        (
            %s,
            %s,
            %s,
            %s,
            %s,
            %s
        )""", (
            message["event_id"],
            message["tenant_id"],
            message["source_app"],
            message["environment"],
            # Schema field is named "timestamp"; the column is timestamp_event.
            message["timestamp"],
            # Guarded subscript instead of .get(): the containment check already
            # guarantees the key exists, so the extra lookup was redundant.
            json.dumps(message["additional_info"]) if "additional_info" in message else None
        )
    )

def write(topicName, message):
try:
if not POSTGRES["database"]:
_logger.debug("No Postgres - skipping")
return True
if psycopg2 is None:
_logger.debug("psycopg2 not available - skipping actual Postgres write")
return True

with psycopg2.connect(
database=POSTGRES["database"],
Expand All @@ -157,13 +193,15 @@ def write(topicName, message):
postgres_edla_write(cursor, "public_cps_za_dlchange", message)
elif topicName == "public.cps.za.runs":
postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message)
elif topicName == "public.cps.za.test":
postgres_test_write(cursor, "public_cps_za_test", message)
else:
_logger.error(f"unknown topic for postgres {topicName}")
return False

connection.commit()
except Exception as e:
_logger.error(str(e))
_logger.error(f'The Postgres writer with failed unknown error: {str(e)}')
return False

return True
66 changes: 66 additions & 0 deletions tests/test_conf_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import json
import unittest
from glob import glob

# Absolute path to the repository's conf/ directory (sibling of tests/).
CONF_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "conf")

# Top-level keys every deployable config*.json file must define.
REQUIRED_CONFIG_KEYS = {
    "access_config",
    "token_provider_url",
    "token_public_key_url",
    "kafka_bootstrap_server",
    "event_bus_arn",
}

def load_json(path):
    """Read the file at *path* and return its parsed JSON content."""
    with open(path, "r") as handle:
        return json.load(handle)

class TestConfigFiles(unittest.TestCase):
    """Structural sanity checks for the JSON files shipped under conf/."""

    def test_config_files_have_required_keys(self):
        """Every config*.json file defines all REQUIRED_CONFIG_KEYS."""
        # glob("config*.json") can never match access.json or topic_*.json,
        # so the previous basename exclusion filter was dead code — removed.
        config_files = glob(os.path.join(CONF_DIR, "config*.json"))
        self.assertTrue(config_files, "No config files found matching pattern config*.json")
        for path in config_files:
            with self.subTest(config=path):
                data = load_json(path)
                missing = REQUIRED_CONFIG_KEYS - data.keys()
                self.assertFalse(missing, f"Config {path} missing keys: {missing}")

    def test_access_json_structure(self):
        """access.json maps each topic name (str) to a list of user-name strings."""
        path = os.path.join(CONF_DIR, "access.json")
        data = load_json(path)
        self.assertIsInstance(data, dict, "access.json must contain an object mapping topic -> list[user]")
        for topic, users in data.items():
            with self.subTest(topic=topic):
                self.assertIsInstance(topic, str)
                self.assertIsInstance(users, list, f"Topic {topic} value must be a list of users")
                self.assertTrue(all(isinstance(u, str) for u in users), f"All users for topic {topic} must be strings")

    def test_topic_json_schemas_basic(self):
        """Each topic_*.json is a minimal, internally consistent JSON Schema."""
        topic_files = glob(os.path.join(CONF_DIR, "topic_*.json"))
        self.assertTrue(topic_files, "No topic_*.json files found")
        for path in topic_files:
            with self.subTest(topic_file=path):
                schema = load_json(path)
                # Root must describe an object with a properties map and a
                # required list.
                self.assertEqual(schema.get("type"), "object", "Schema root 'type' must be 'object'")
                props = schema.get("properties")
                self.assertIsInstance(props, dict, "Schema must define 'properties' object")
                required = schema.get("required")
                self.assertIsInstance(required, list, "Schema must define 'required' list")
                # Every required field must actually be declared in properties.
                missing_props = [r for r in required if r not in props]
                self.assertFalse(missing_props, f"Required fields missing in properties: {missing_props}")
                # And every declared property must at least specify a type.
                for name, definition in props.items():
                    self.assertIsInstance(definition, dict, f"Property {name} definition must be an object")
                    self.assertIn("type", definition, f"Property {name} must specify a 'type'")

if __name__ == "__main__": # pragma: no cover
unittest.main()

112 changes: 112 additions & 0 deletions tests/test_writer_postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import json
import logging
import unittest

from src import writer_postgres


class MockCursor:
    """Minimal stand-in for a psycopg2 cursor that records execute() calls.

    Each call is stored as a (whitespace-stripped SQL, params) tuple so
    tests can inspect the generated statements without a real database.
    """

    def __init__(self):
        # Recorded (sql, params) tuples, in call order.
        self.executions = []

    def execute(self, sql, params):
        """Capture the statement text (trimmed) together with its bind params."""
        entry = (sql.strip(), params)
        self.executions.append(entry)

class TestWriterPostgres(unittest.TestCase):
    """Unit tests for the SQL-building helpers in src.writer_postgres.

    A MockCursor records each execute() call, so assertions inspect the
    generated SQL text and positional bind parameters without any real
    Postgres connection. Parameter indices below follow the column order of
    each INSERT statement in writer_postgres.
    """

    @classmethod
    def setUpClass(cls):
        # Initialize logger and module (will skip DB because no env vars)
        writer_postgres.init(logging.getLogger("test"))

    def test_postgres_edla_write_with_optional_fields(self):
        """EDLA insert with all optional fields present binds all 12 params,
        JSON-encoding the dict-valued ones."""
        cur = MockCursor()
        message = {
            "event_id": "e1",
            "tenant_id": "t1",
            "source_app": "app",
            "source_app_version": "1.0.0",
            "environment": "dev",
            "timestamp_event": 111,
            "catalog_id": "db.tbl",
            "operation": "append",
            "location": "s3://bucket/path",
            "format": "parquet",
            "format_options": {"compression": "snappy"},
            "additional_info": {"foo": "bar"}
        }
        writer_postgres.postgres_edla_write(cur, "table_a", message)
        self.assertEqual(len(cur.executions), 1)
        sql, params = cur.executions[0]
        # Ensure we inserted 12 params per columns list
        self.assertEqual(len(params), 12)
        self.assertEqual(params[0], "e1")
        self.assertEqual(params[8], "s3://bucket/path") # location
        self.assertEqual(params[9], "parquet")
        # Dict-valued optionals are serialized to JSON strings before binding.
        self.assertEqual(json.loads(params[10]), {"compression": "snappy"})
        self.assertEqual(json.loads(params[11]), {"foo": "bar"})

    def test_postgres_edla_write_missing_optional(self):
        """Absent optional fields are bound as None (SQL NULL)."""
        cur = MockCursor()
        message = {
            "event_id": "e2",
            "tenant_id": "t2",
            "source_app": "app",
            "source_app_version": "1.0.1",
            "environment": "dev",
            "timestamp_event": 222,
            "catalog_id": "db.tbl2",
            "operation": "overwrite",
            "format": "delta"
        }
        writer_postgres.postgres_edla_write(cur, "table_a", message)
        sql, params = cur.executions[0]
        # location, format_options, additional_info -> None
        self.assertIsNone(params[8])
        self.assertEqual(params[9], "delta")
        self.assertIsNone(params[10])
        self.assertIsNone(params[11])

    def test_postgres_run_write(self):
        """Run insert emits one row for the run plus one row per job entry."""
        cur = MockCursor()
        message = {
            "event_id": "r1",
            "job_ref": "job-123",
            "tenant_id": "ten",
            "source_app": "runapp",
            "source_app_version": "2.0.0",
            "environment": "dev",
            "timestamp_start": 1000,
            "timestamp_end": 2000,
            "jobs": [
                {"catalog_id": "c1", "status": "succeeded", "timestamp_start": 1100, "timestamp_end": 1200},
                {"catalog_id": "c2", "status": "failed", "timestamp_start": 1300, "timestamp_end": 1400, "message": "err", "additional_info": {"k": "v"}}
            ]
        }
        writer_postgres.postgres_run_write(cur, "runs_table", "jobs_table", message)
        # 1 insert for run + 2 inserts for jobs
        self.assertEqual(len(cur.executions), 3)
        run_sql, run_params = cur.executions[0]
        self.assertIn("source_app_version", run_sql) # ensure fixed column name present implicitly
        self.assertEqual(run_params[3], "runapp") # source_app param
        # Second job carries the optional message and additional_info fields.
        job2_sql, job2_params = cur.executions[2]
        self.assertEqual(job2_params[5], "err")
        self.assertEqual(json.loads(job2_params[6]), {"k": "v"})

    def test_postgres_test_write(self):
        """Test-topic insert binds the six columns, JSON-encoding additional_info."""
        cur = MockCursor()
        message = {
            "event_id": "t1",
            "tenant_id": "tenant-x",
            "source_app": "test",
            "environment": "dev",
            "timestamp": 999,
            "additional_info": {"a": 1}
        }
        writer_postgres.postgres_test_write(cur, "table_test", message)
        self.assertEqual(len(cur.executions), 1)
        sql, params = cur.executions[0]
        self.assertEqual(params[0], "t1")
        self.assertEqual(params[1], "tenant-x")
        self.assertEqual(json.loads(params[5]), {"a": 1})

if __name__ == '__main__':
unittest.main()