main.py: 195 changes (160 additions, 35 deletions)
@@ -40,6 +40,11 @@
timeplus_password = os.getenv("TIMEPLUS_PASSWORD") or "timeplus@t+"
timeplus_port = int(os.getenv("TIMEPLUS_PORT", "8463"))

+# Configuration for metadata storage
+# Options: 'sqlite' (default) or 'mutable_stream'
+METADATA_STORAGE = os.getenv("METADATA_STORAGE", "sqlite").lower()
+SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH", "pipelines.db")

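Note that only the exact value `sqlite` selects the new backend; any other value, including a typo, silently falls through to the legacy mutable-stream path. A minimal guard (a sketch, not part of this diff) could fail fast instead:

```python
# Sketch: reject unknown backends at startup rather than silently
# falling back to the legacy mutable-stream path.
_VALID_BACKENDS = {"sqlite", "mutable_stream"}
if METADATA_STORAGE not in _VALID_BACKENDS:
    raise ValueError(
        f"Unsupported METADATA_STORAGE '{METADATA_STORAGE}'; "
        f"expected one of {sorted(_VALID_BACKENDS)}"
    )
```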
def wait_for_timeplus_connection(max_retries=30, retry_delay=2):
    """
    Wait for Timeplus server to be available by pinging it with 'SELECT 1'
@@ -302,7 +307,7 @@ def generate_pipeline(self):

class PipelineManager:
    def __init__(self):
-        db_logger.info("Initializing PipelineManager with SQLite metadata storage")
+        db_logger.info(f"Initializing PipelineManager with metadata storage: {METADATA_STORAGE}")
        db_logger.info(f"Connecting to Timeplus: {timeplus_user}@{timeplus_host}:{timeplus_port}")

        # Initialize Timeplus client for stream operations
@@ -318,12 +323,41 @@ def __init__(self):
            db_logger.error(f"Failed to connect to Timeplus: {e}")
            raise

-        # Initialize SQLite manager for metadata
+        # Initialize metadata storage based on configuration
+        self.use_sqlite = METADATA_STORAGE == "sqlite"
+
+        if self.use_sqlite:
Contributor: this code should be moved into the method _init_pipeline_metadata.
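One possible shape for this suggestion, as a sketch rather than the author's implementation (it assumes `_init_pipeline_metadata` becomes the single setup hook for both backends, with everything else as in this diff):

```python
def __init__(self):
    # ... Timeplus client setup as above ...
    self.use_sqlite = METADATA_STORAGE == "sqlite"
    self._init_pipeline_metadata()

def _init_pipeline_metadata(self):
    """Set up whichever metadata backend the configuration selects."""
    if self.use_sqlite:
        self.metadata_manager = SQLitePipelineManager(SQLITE_DB_PATH)
    else:
        self.pipeline_stream_name = "synthetic_data_pipelines"
        self.client.execute(f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
            id string,
            name string,
            pipeline string
        )
        PRIMARY KEY (id)
        """)
```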

+            # Initialize SQLite manager for metadata
+            try:
+                self.metadata_manager = SQLitePipelineManager(SQLITE_DB_PATH)
+                db_logger.info(f"SQLite metadata manager initialized with database: {SQLITE_DB_PATH}")
+            except Exception as e:
+                db_logger.error(f"Failed to initialize SQLite manager: {e}")
+                raise
+        else:
+            # Initialize mutable stream for metadata (legacy mode)
+            self.pipeline_stream_name = "synthetic_data_pipelines"
Contributor: why do both settings use SQLite for metadata? It seems the mutable-stream-based metadata handling is not here.
+            self._init_pipeline_metadata()

+    def _init_pipeline_metadata(self):
+        """Initialize mutable stream for metadata storage (legacy mode)"""
+        db_logger.info(f"Initializing pipeline metadata stream: {self.pipeline_stream_name}")
+
        try:
-            self.metadata_manager = SQLitePipelineManager("pipelines.db")
-            db_logger.info("SQLite metadata manager initialized")
+            create_sql = f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
+                id string,
+                name string,
+                pipeline string
+            )
+            PRIMARY KEY (id)
+            """
+
+            db_logger.debug(f"Executing DDL: {create_sql}")
+            self.client.execute(create_sql)
+            db_logger.info("Pipeline metadata stream initialized successfully")

        except Exception as e:
-            db_logger.error(f"Failed to initialize SQLite manager: {e}")
+            db_logger.error(f"Failed to initialize pipeline metadata stream: {e}")
            raise

    def create(self, pipeline, name):
@@ -371,26 +405,39 @@ def create(self, pipeline, name):
            db_logger.error(f"Failed DDL might be: {pipeline.get('write_to_kafka_mv', {}).get('ddl', 'N/A')}")
            raise RuntimeError(f"Failed to create pipeline: {e}")

-        # Save metadata to SQLite
+        # Save metadata using the configured backend
        try:
-            db_logger.info("Saving pipeline metadata to SQLite...")
-            saved_id = self.metadata_manager.create(pipeline, name)
-            db_logger.info("Pipeline metadata saved successfully")
+            db_logger.info(f"Saving pipeline metadata using {METADATA_STORAGE}...")
+
+            if self.use_sqlite:
+                # Save to SQLite
+                saved_id = self.metadata_manager.create(pipeline, name)
+                db_logger.info("Pipeline metadata saved successfully to SQLite")
+            else:
+                # Save to mutable stream (legacy mode)
+                pipeline_json = json.dumps(pipeline, indent=2)
+                insert_sql = f"INSERT INTO {self.pipeline_stream_name} (id, name, pipeline) VALUES"
+                values = [[pipeline_id, name, pipeline_json]]
+
+                db_logger.debug(f"Executing insert: {insert_sql}")
+                db_logger.debug(f"Values: id={pipeline_id}, name={name}, pipeline_length={len(pipeline_json)}")
+
+                self.client.execute(insert_sql, values)
+                saved_id = pipeline_id
+                db_logger.info("Pipeline metadata saved successfully to mutable stream")

        except Exception as e:
            db_logger.error(f"Failed to save pipeline metadata: {e}")
            raise RuntimeError(f"Failed to save pipeline metadata: {e}")

        db_logger.info(f"Pipeline creation completed successfully with ID: {saved_id}")
        return saved_id

-    def _get_pipeline_write_count(self, pipeline_json):
-        query_sql = f"SELECT COUNT(*) FROM table({pipeline_json['write_to_kafka_mv']['name']}) WHERE _tp_time > earliest_ts()"
-        db_logger.debug(f"Executing query: {query_sql}")
-
+    def _get_pipeline_write_count(self, pipeline_data):
+        """Get write count from Timeplus materialized view"""
        try:
+            mv_name = pipeline_data['write_to_kafka_mv']['name']
+            query_sql = f"SELECT COUNT(*) FROM table({mv_name}) WHERE _tp_time > earliest_ts()"
+            db_logger.debug(f"Executing write count query: {query_sql}")
+
            result = self.client.execute(query_sql)
            db_logger.debug(f"Query result: {len(result) if result else 0} rows")

@@ -406,17 +453,53 @@ def get(self, pipeline_id):
        db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}")

        try:
-            # Get pipeline from SQLite
-            pipeline_info = self.metadata_manager.get(pipeline_id)
-
-            # Get live write count from Timeplus
-            live_count = self._get_pipeline_write_count(pipeline_info['pipeline'])
-
-            # Update the write count
-            pipeline_info['write_count'] = live_count
-
-            db_logger.info(f"Successfully retrieved pipeline: {pipeline_info['name']} (writes: {live_count})")
-            return pipeline_info
+            if self.use_sqlite:
Contributor: this logic should be wrapped in the metadata manager.
+                # Get from SQLite
+                pipeline_info = self.metadata_manager.get(pipeline_id)
+
+                # Get live write count from Timeplus
+                live_count = self._get_pipeline_write_count(pipeline_info['pipeline'])
+
+                # Update the write count
+                pipeline_info['write_count'] = live_count
+
+                db_logger.info(f"Successfully retrieved pipeline from SQLite: {pipeline_info['name']} (writes: {live_count})")
+                return pipeline_info
+            else:
+                # Get from mutable stream (legacy mode)
+                query_sql = f"SELECT name, pipeline FROM table({self.pipeline_stream_name}) WHERE id = '{pipeline_id}'"
+                db_logger.debug(f"Executing query: {query_sql}")
+
+                result = self.client.execute(query_sql)
+                db_logger.debug(f"Query result: {len(result) if result else 0} rows")
+
+                if result:
+                    name = result[0][0]
+                    pipeline_json = result[0][1]
+
+                    db_logger.debug(f"Found pipeline: name={name}, json_length={len(pipeline_json)}")
+
+                    # Get Pipeline Stats
+                    count = self._get_pipeline_write_count(json.loads(pipeline_json))
+                    db_logger.info(f"Pipeline {name} has {count} writes")
+
+                    try:
+                        pipeline_data = json.loads(pipeline_json)
+                        db_logger.info(f"Successfully retrieved pipeline from mutable stream: {name}")
+
+                        return {
+                            "id": pipeline_id,
+                            "name": name,
+                            "pipeline": pipeline_data,
+                            "write_count": count
+                        }
+                    except json.JSONDecodeError as e:
+                        db_logger.error(f"Failed to parse pipeline JSON: {e}")
+                        db_logger.debug(f"Malformed JSON: {pipeline_json}")
+                        raise RuntimeError(f"Failed to parse pipeline data: {e}")
+                else:
+                    db_logger.warning(f"Pipeline with id {pipeline_id} not found")
+                    raise ValueError(f"Pipeline with id {pipeline_id} not found.")

        except Exception as e:
            if isinstance(e, ValueError):
@@ -428,9 +511,41 @@ def list_all(self):
        db_logger.info("Listing all pipelines")

        try:
-            pipelines = self.metadata_manager.list_all()
-            db_logger.info(f"Successfully listed {len(pipelines)} pipelines")
-            return pipelines
+            if self.use_sqlite:
+                # List from SQLite
+                pipelines = self.metadata_manager.list_all()
+                db_logger.info(f"Successfully listed {len(pipelines)} pipelines from SQLite")
+                return pipelines
+            else:
+                # List from mutable stream (legacy mode)
+                query_sql = f"SELECT id, name, pipeline FROM table({self.pipeline_stream_name})"
+                db_logger.debug(f"Executing query: {query_sql}")
+
+                result = self.client.execute(query_sql)
+                db_logger.info(f"Found {len(result) if result else 0} pipelines")
+
+                pipelines = []
+                for i, row in enumerate(result):
+                    try:
+                        id, name, pipeline_json = row
+                        pipeline_data = json.loads(pipeline_json)
+
+                        pipeline_info = {
+                            "id": id,
+                            "name": name,
+                            "question": pipeline_data.get("question", ""),
+                            "created_at": pipeline_data.get("created_at", "")
+                        }
+                        pipelines.append(pipeline_info)
+
+                        db_logger.debug(f"Pipeline {i}: {id} - {name}")
+
+                    except Exception as e:
+                        db_logger.error(f"Failed to parse pipeline {i}: {e}")
+                        continue
+
+                db_logger.info(f"Successfully processed {len(pipelines)} pipelines from mutable stream")
+                return pipelines

        except Exception as e:
            db_logger.error(f"Failed to list pipelines: {e}")
@@ -439,9 +554,9 @@ def delete(self, pipeline_id):
    def delete(self, pipeline_id):
        db_logger.info(f"Deleting pipeline with ID: {pipeline_id}")

-        # Get pipeline info first
+        # Get pipeline info first (this will work with both backends)
        try:
-            pipeline_info = self.metadata_manager.get(pipeline_id)
+            pipeline_info = self.get(pipeline_id)
            pipeline = pipeline_info["pipeline"]
            name = pipeline_info["name"]

@@ -478,11 +593,21 @@ def delete(self, pipeline_id):

        # TODO: delete Kafka topic

-        # Delete pipeline metadata from SQLite
+        # Delete pipeline metadata using the configured backend
        try:
-            db_logger.info("Deleting pipeline metadata from SQLite...")
-            self.metadata_manager.delete(pipeline_id)
-            db_logger.info(f"Pipeline {name} deleted successfully")
+            if self.use_sqlite:
+                # Delete from SQLite
+                db_logger.info("Deleting pipeline metadata from SQLite...")
+                self.metadata_manager.delete(pipeline_id)
+                db_logger.info(f"Pipeline {name} deleted successfully from SQLite")
+            else:
+                # Delete from mutable stream (legacy mode)
+                db_logger.info("Deleting pipeline metadata from mutable stream...")
+                delete_sql = f"DELETE FROM {self.pipeline_stream_name} WHERE id = '{pipeline_id}'"
+                db_logger.debug(f"Executing: {delete_sql}")
+
+                self.client.execute(delete_sql)
+                db_logger.info(f"Pipeline {name} deleted successfully from mutable stream")

        except Exception as e:
            db_logger.error(f"Failed to delete pipeline metadata: {e}")