Better SQLite Support #10
```diff
@@ -40,6 +40,11 @@
 timeplus_password = os.getenv("TIMEPLUS_PASSWORD") or "timeplus@t+"
 timeplus_port = int(os.getenv("TIMEPLUS_PORT", "8463"))
 
+# Configuration for metadata storage
+# Options: 'sqlite' (default) or 'mutable_stream'
+METADATA_STORAGE = os.getenv("METADATA_STORAGE", "sqlite").lower()
+SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH", "pipelines.db")
+
 def wait_for_timeplus_connection(max_retries=30, retry_delay=2):
     """
     Wait for Timeplus server to be available by pinging it with 'SELECT 1'
```
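Backend selection is driven entirely by these environment variables. A minimal sketch of how the lookup resolves; the variable names are taken from the diff above, while the values set here are purely illustrative:

```python
import os

# Illustrative values; in a real deployment these would be set in the shell
# or container environment before the server process starts.
os.environ["METADATA_STORAGE"] = "Mutable_Stream"
os.environ["SQLITE_DB_PATH"] = "/tmp/pipelines.db"

# Mirrors the lookup in the diff; .lower() makes the value case-insensitive.
METADATA_STORAGE = os.getenv("METADATA_STORAGE", "sqlite").lower()
SQLITE_DB_PATH = os.getenv("SQLITE_DB_PATH", "pipelines.db")

assert METADATA_STORAGE == "mutable_stream"
# Note: any value other than "sqlite" (including typos) selects the legacy
# mutable-stream backend, since the code only checks METADATA_STORAGE == "sqlite".
```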
```diff
@@ -302,7 +307,7 @@ def generate_pipeline(self):
 class PipelineManager:
     def __init__(self):
-        db_logger.info("Initializing PipelineManager with SQLite metadata storage")
+        db_logger.info(f"Initializing PipelineManager with metadata storage: {METADATA_STORAGE}")
         db_logger.info(f"Connecting to Timeplus: {timeplus_user}@{timeplus_host}:{timeplus_port}")
 
         # Initialize Timeplus client for stream operations
```
```diff
@@ -318,12 +323,41 @@ def __init__(self):
             db_logger.error(f"Failed to connect to Timeplus: {e}")
             raise
 
-        # Initialize SQLite manager for metadata
+        # Initialize metadata storage based on configuration
+        self.use_sqlite = METADATA_STORAGE == "sqlite"
+
+        if self.use_sqlite:
+            # Initialize SQLite manager for metadata
+            try:
+                self.metadata_manager = SQLitePipelineManager(SQLITE_DB_PATH)
+                db_logger.info(f"SQLite metadata manager initialized with database: {SQLITE_DB_PATH}")
+            except Exception as e:
+                db_logger.error(f"Failed to initialize SQLite manager: {e}")
+                raise
+        else:
+            # Initialize mutable stream for metadata (legacy mode)
+            self.pipeline_stream_name = "synthetic_data_pipelines"
```
Contributor commented on the line above:
Why do both settings use SQLite for metadata? It seems the mutable-stream-based metadata path is not here.
```diff
+            self._init_pipeline_metadata()
+
+    def _init_pipeline_metadata(self):
+        """Initialize mutable stream for metadata storage (legacy mode)"""
+        db_logger.info(f"Initializing pipeline metadata stream: {self.pipeline_stream_name}")
+
         try:
-            self.metadata_manager = SQLitePipelineManager("pipelines.db")
-            db_logger.info("SQLite metadata manager initialized")
+            create_sql = f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
+                id string,
+                name string,
+                pipeline string
+            )
+            PRIMARY KEY (id)
+            """
+
+            db_logger.debug(f"Executing DDL: {create_sql}")
+            self.client.execute(create_sql)
+            db_logger.info("Pipeline metadata stream initialized successfully")
 
         except Exception as e:
-            db_logger.error(f"Failed to initialize SQLite manager: {e}")
+            db_logger.error(f"Failed to initialize pipeline metadata stream: {e}")
             raise
 
     def create(self, pipeline, name):
```
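For readers unfamiliar with Timeplus mutable streams, the legacy branch keeps one row per pipeline, keyed by `id`, with the pipeline definition serialized as a JSON blob. A standalone sketch of the same layout, assuming only a driver client exposing the `execute()` interface used throughout this file:

```python
import json

STREAM = "synthetic_data_pipelines"  # same stream name as in the diff

def init_metadata_stream(client):
    # Identical DDL to _init_pipeline_metadata: one row per pipeline, keyed by id.
    client.execute(f"""CREATE MUTABLE STREAM IF NOT EXISTS {STREAM} (
        id string,
        name string,
        pipeline string
    )
    PRIMARY KEY (id)
    """)

def save_pipeline(client, pipeline_id, name, pipeline):
    # Same VALUES-style insert as the mutable-stream branch of create();
    # the pipeline dict travels as a JSON blob in the `pipeline` column.
    client.execute(
        f"INSERT INTO {STREAM} (id, name, pipeline) VALUES",
        [[pipeline_id, name, json.dumps(pipeline, indent=2)]],
    )
```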
```diff
@@ -371,26 +405,39 @@ def create(self, pipeline, name):
             db_logger.error(f"Failed DDL might be: {pipeline.get('write_to_kafka_mv', {}).get('ddl', 'N/A')}")
             raise RuntimeError(f"Failed to create pipeline: {e}")
 
-        # Save metadata to SQLite
+        # Save metadata using the configured backend
         try:
-            db_logger.info("Saving pipeline metadata to SQLite...")
-            saved_id = self.metadata_manager.create(pipeline, name)
-            db_logger.info("Pipeline metadata saved successfully")
+            db_logger.info(f"Saving pipeline metadata using {METADATA_STORAGE}...")
+
+            if self.use_sqlite:
+                # Save to SQLite
+                saved_id = self.metadata_manager.create(pipeline, name)
+                db_logger.info("Pipeline metadata saved successfully to SQLite")
+            else:
+                # Save to mutable stream (legacy mode)
+                pipeline_json = json.dumps(pipeline, indent=2)
+                insert_sql = f"INSERT INTO {self.pipeline_stream_name} (id, name, pipeline) VALUES"
+                values = [[pipeline_id, name, pipeline_json]]
+
+                db_logger.debug(f"Executing insert: {insert_sql}")
+                db_logger.debug(f"Values: id={pipeline_id}, name={name}, pipeline_length={len(pipeline_json)}")
+
+                self.client.execute(insert_sql, values)
+                saved_id = pipeline_id
+                db_logger.info("Pipeline metadata saved successfully to mutable stream")
+
         except Exception as e:
             db_logger.error(f"Failed to save pipeline metadata: {e}")
             raise RuntimeError(f"Failed to save pipeline metadata: {e}")
 
         db_logger.info(f"Pipeline creation completed successfully with ID: {saved_id}")
         return saved_id
 
-    def _get_pipeline_write_count(self, pipeline_json):
-        query_sql = f"SELECT COUNT(*) FROM table({pipeline_json['write_to_kafka_mv']['name']}) WHERE _tp_time > earliest_ts()"
-        db_logger.debug(f"Executing query: {query_sql}")
-
-        try:
+    def _get_pipeline_write_count(self, pipeline_data):
+        """Get write count from Timeplus materialized view"""
+        try:
+            mv_name = pipeline_data['write_to_kafka_mv']['name']
+            query_sql = f"SELECT COUNT(*) FROM table({mv_name}) WHERE _tp_time > earliest_ts()"
+            db_logger.debug(f"Executing write count query: {query_sql}")
+
             result = self.client.execute(query_sql)
             db_logger.debug(f"Query result: {len(result) if result else 0} rows")
```
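The count query leans on two Timeplus idioms: wrapping a stream in `table(...)` turns the query into a bounded historical scan rather than an unbounded streaming one, and filtering on `_tp_time > earliest_ts()` extends that scan across the stream's full retained history. Pulled out as a standalone probe (the `client` and materialized-view name are whatever the caller supplies):

```python
def count_writes(client, mv_name):
    # table(...) bounds the query; _tp_time > earliest_ts() scans all retained history.
    rows = client.execute(
        f"SELECT COUNT(*) FROM table({mv_name}) WHERE _tp_time > earliest_ts()"
    )
    return rows[0][0] if rows else 0
```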
```diff
@@ -406,17 +453,53 @@ def get(self, pipeline_id):
         db_logger.info(f"Retrieving pipeline with ID: {pipeline_id}")
 
         try:
-            # Get pipeline from SQLite
-            pipeline_info = self.metadata_manager.get(pipeline_id)
-
-            # Get live write count from Timeplus
-            live_count = self._get_pipeline_write_count(pipeline_info['pipeline'])
-
-            # Update the write count
-            pipeline_info['write_count'] = live_count
-
-            db_logger.info(f"Successfully retrieved pipeline: {pipeline_info['name']} (writes: {live_count})")
-            return pipeline_info
+            if self.use_sqlite:
```
Contributor commented on the line above:
This logic should be wrapped in the metadata manager.
```diff
+                # Get from SQLite
+                pipeline_info = self.metadata_manager.get(pipeline_id)
+
+                # Get live write count from Timeplus
+                live_count = self._get_pipeline_write_count(pipeline_info['pipeline'])
+
+                # Update the write count
+                pipeline_info['write_count'] = live_count
+
+                db_logger.info(f"Successfully retrieved pipeline from SQLite: {pipeline_info['name']} (writes: {live_count})")
+                return pipeline_info
+            else:
+                # Get from mutable stream (legacy mode)
+                query_sql = f"SELECT name, pipeline FROM table({self.pipeline_stream_name}) WHERE id = '{pipeline_id}'"
+                db_logger.debug(f"Executing query: {query_sql}")
+
+                result = self.client.execute(query_sql)
+                db_logger.debug(f"Query result: {len(result) if result else 0} rows")
+
+                if result:
+                    name = result[0][0]
+                    pipeline_json = result[0][1]
+
+                    db_logger.debug(f"Found pipeline: name={name}, json_length={len(pipeline_json)}")
+
+                    # Get Pipeline Stats
+                    count = self._get_pipeline_write_count(json.loads(pipeline_json))
+                    db_logger.info(f"Pipeline {name} has {count} writes")
+
+                    try:
+                        pipeline_data = json.loads(pipeline_json)
+                        db_logger.info(f"Successfully retrieved pipeline from mutable stream: {name}")
+
+                        return {
+                            "id": pipeline_id,
+                            "name": name,
+                            "pipeline": pipeline_data,
+                            "write_count": count
+                        }
+                    except json.JSONDecodeError as e:
+                        db_logger.error(f"Failed to parse pipeline JSON: {e}")
+                        db_logger.debug(f"Malformed JSON: {pipeline_json}")
+                        raise RuntimeError(f"Failed to parse pipeline data: {e}")
+                else:
+                    db_logger.warning(f"Pipeline with id {pipeline_id} not found")
+                    raise ValueError(f"Pipeline with id {pipeline_id} not found.")
 
         except Exception as e:
             if isinstance(e, ValueError):
```
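A rough sketch of the reviewer's suggestion above: give the legacy backend the same interface as `SQLitePipelineManager`, so `PipelineManager` picks a backend once in `__init__` and never branches on `use_sqlite` again. The class name `MutableStreamPipelineManager` is hypothetical; the query is the same one used in the legacy branch of `get()`:

```python
import json

class MutableStreamPipelineManager:
    """Hypothetical counterpart to SQLitePipelineManager, exposing the same
    create/get/list_all/delete interface over the mutable-stream backend."""

    def __init__(self, client, stream_name="synthetic_data_pipelines"):
        self.client = client
        self.stream_name = stream_name

    def get(self, pipeline_id):
        # Same query as the legacy branch of PipelineManager.get() above.
        query_sql = f"SELECT name, pipeline FROM table({self.stream_name}) WHERE id = '{pipeline_id}'"
        result = self.client.execute(query_sql)
        if not result:
            raise ValueError(f"Pipeline with id {pipeline_id} not found.")
        name, pipeline_json = result[0]
        return {"id": pipeline_id, "name": name, "pipeline": json.loads(pipeline_json)}

# PipelineManager.__init__ would then reduce to a single assignment:
#     self.metadata_manager = (SQLitePipelineManager(SQLITE_DB_PATH) if self.use_sqlite
#                              else MutableStreamPipelineManager(self.client))
# and get()/list_all()/delete() would call self.metadata_manager uniformly,
# with only the live write count layered on top in PipelineManager.get().
```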
```diff
@@ -428,9 +511,41 @@ def list_all(self):
         db_logger.info("Listing all pipelines")
 
         try:
-            pipelines = self.metadata_manager.list_all()
-            db_logger.info(f"Successfully listed {len(pipelines)} pipelines")
-            return pipelines
+            if self.use_sqlite:
+                # List from SQLite
+                pipelines = self.metadata_manager.list_all()
+                db_logger.info(f"Successfully listed {len(pipelines)} pipelines from SQLite")
+                return pipelines
+            else:
+                # List from mutable stream (legacy mode)
+                query_sql = f"SELECT id, name, pipeline FROM table({self.pipeline_stream_name})"
+                db_logger.debug(f"Executing query: {query_sql}")
+
+                result = self.client.execute(query_sql)
+                db_logger.info(f"Found {len(result) if result else 0} pipelines")
+
+                pipelines = []
+                for i, row in enumerate(result):
+                    try:
+                        id, name, pipeline_json = row
+                        pipeline_data = json.loads(pipeline_json)
+
+                        pipeline_info = {
+                            "id": id,
+                            "name": name,
+                            "question": pipeline_data.get("question", ""),
+                            "created_at": pipeline_data.get("created_at", "")
+                        }
+                        pipelines.append(pipeline_info)
+
+                        db_logger.debug(f"Pipeline {i}: {id} - {name}")
+
+                    except Exception as e:
+                        db_logger.error(f"Failed to parse pipeline {i}: {e}")
+                        continue
+
+                db_logger.info(f"Successfully processed {len(pipelines)} pipelines from mutable stream")
+                return pipelines
 
         except Exception as e:
             db_logger.error(f"Failed to list pipelines: {e}")
```
```diff
@@ -439,9 +554,9 @@ def list_all(self):
     def delete(self, pipeline_id):
         db_logger.info(f"Deleting pipeline with ID: {pipeline_id}")
 
-        # Get pipeline info first
+        # Get pipeline info first (this will work with both backends)
        try:
-            pipeline_info = self.metadata_manager.get(pipeline_id)
+            pipeline_info = self.get(pipeline_id)
             pipeline = pipeline_info["pipeline"]
             name = pipeline_info["name"]
```
```diff
@@ -478,11 +593,21 @@ def delete(self, pipeline_id):
 
         # TODO: delete Kafka topic
 
-        # Delete pipeline metadata from SQLite
+        # Delete pipeline metadata using the configured backend
         try:
-            db_logger.info("Deleting pipeline metadata from SQLite...")
-            self.metadata_manager.delete(pipeline_id)
-            db_logger.info(f"Pipeline {name} deleted successfully")
+            if self.use_sqlite:
+                # Delete from SQLite
+                db_logger.info("Deleting pipeline metadata from SQLite...")
+                self.metadata_manager.delete(pipeline_id)
+                db_logger.info(f"Pipeline {name} deleted successfully from SQLite")
+            else:
+                # Delete from mutable stream (legacy mode)
+                db_logger.info("Deleting pipeline metadata from mutable stream...")
+                delete_sql = f"DELETE FROM {self.pipeline_stream_name} WHERE id = '{pipeline_id}'"
+                db_logger.debug(f"Executing: {delete_sql}")
+
+                self.client.execute(delete_sql)
+                db_logger.info(f"Pipeline {name} deleted successfully from mutable stream")
 
         except Exception as e:
             db_logger.error(f"Failed to delete pipeline metadata: {e}")
```
Contributor commented:
This code should be moved into the method _init_pipeline_metadata.
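Assuming the comment refers to the backend-selection block added to `__init__` (the most plausible reading), the move might look roughly like the sketch below; everything except the method boundaries is lifted from the diff, and the surrounding client setup is elided:

```python
class PipelineManager:
    def __init__(self):
        # ... Timeplus client setup as in the diff ...
        self._init_pipeline_metadata()

    def _init_pipeline_metadata(self):
        """Initialize whichever metadata backend METADATA_STORAGE selects."""
        self.use_sqlite = METADATA_STORAGE == "sqlite"

        if self.use_sqlite:
            self.metadata_manager = SQLitePipelineManager(SQLITE_DB_PATH)
        else:
            self.pipeline_stream_name = "synthetic_data_pipelines"
            self.client.execute(f"""CREATE MUTABLE STREAM IF NOT EXISTS {self.pipeline_stream_name} (
                id string,
                name string,
                pipeline string
            )
            PRIMARY KEY (id)
            """)
```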