1616
1717from __future__ import annotations
1818
19+ import argparse
1920import json
2021import pkgutil
2122import sys
2223import traceback
23- from collections .abc import Mapping
24+ from collections .abc import MutableMapping
2425from pathlib import Path
2526from typing import Any , cast
2627
2728import orjson
29+ import yaml
2830
2931from airbyte_cdk .entrypoint import AirbyteEntrypoint , launch
3032from airbyte_cdk .models import (
@@ -54,7 +56,7 @@ class SourceLocalYaml(YamlDeclarativeSource):
5456 def __init__ (
5557 self ,
5658 catalog : ConfiguredAirbyteCatalog | None ,
57- config : Mapping [str , Any ] | None ,
59+ config : MutableMapping [str , Any ] | None ,
5860 state : TState ,
5961 ** kwargs : Any ,
6062 ) -> None :
@@ -91,7 +93,8 @@ def handle_command(args: list[str]) -> None:
9193
9294def _get_local_yaml_source (args : list [str ]) -> SourceLocalYaml :
9395 try :
94- config , catalog , state = _parse_inputs_into_config_catalog_state (args )
96+ parsed_args = AirbyteEntrypoint .parse_args (args )
97+ config , catalog , state = _parse_inputs_into_config_catalog_state (parsed_args )
9598 return SourceLocalYaml (config = config , catalog = catalog , state = state )
9699 except Exception as error :
97100 print (
@@ -162,21 +165,40 @@ def create_declarative_source(
162165 connector builder.
163166 """
164167 try :
165- config : Mapping [str , Any ] | None
168+ config : MutableMapping [str , Any ] | None
166169 catalog : ConfiguredAirbyteCatalog | None
167170 state : list [AirbyteStateMessage ]
168- config , catalog , state = _parse_inputs_into_config_catalog_state (args )
169- if config is None or "__injected_declarative_manifest" not in config :
171+
172+ parsed_args = AirbyteEntrypoint .parse_args (args )
173+ config , catalog , state = _parse_inputs_into_config_catalog_state (parsed_args )
174+
175+ if config is None :
176+ raise ValueError (
177+ "Invalid config: `__injected_declarative_manifest` should be provided at the root "
178+ "of the config or using the --manifest-path argument."
179+ )
180+
181+ # If a manifest_path is provided in the args, inject it into the config
182+ if hasattr (parsed_args , "manifest_path" ) and parsed_args .manifest_path :
183+ injected_manifest = _parse_manifest_from_file (parsed_args .manifest_path )
184+ if injected_manifest :
185+ config ["__injected_declarative_manifest" ] = injected_manifest
186+
187+ if "__injected_declarative_manifest" not in config :
170188 raise ValueError (
171189 "Invalid config: `__injected_declarative_manifest` should be provided at the root "
172- f"of the config but config only has keys: { list (config .keys () if config else [])} "
190+ "of the config or using the --manifest-path argument. "
191+ f"Config only has keys: { list (config .keys () if config else [])} "
173192 )
174193 if not isinstance (config ["__injected_declarative_manifest" ], dict ):
175194 raise ValueError (
176195 "Invalid config: `__injected_declarative_manifest` should be a dictionary, "
177196 f"but got type: { type (config ['__injected_declarative_manifest' ])} "
178197 )
179198
199+ if hasattr (parsed_args , "components_path" ) and parsed_args .components_path :
200+ _register_components_from_file (parsed_args .components_path )
201+
180202 return ConcurrentDeclarativeSource (
181203 config = config ,
182204 catalog = catalog ,
@@ -205,13 +227,12 @@ def create_declarative_source(
205227
206228
207229def _parse_inputs_into_config_catalog_state (
208- args : list [ str ] ,
230+ parsed_args : argparse . Namespace ,
209231) -> tuple [
210- Mapping [str , Any ] | None ,
232+ MutableMapping [str , Any ] | None ,
211233 ConfiguredAirbyteCatalog | None ,
212234 list [AirbyteStateMessage ],
213235]:
214- parsed_args = AirbyteEntrypoint .parse_args (args )
215236 config = (
216237 ConcurrentDeclarativeSource .read_config (parsed_args .config )
217238 if hasattr (parsed_args , "config" )
@@ -231,6 +252,44 @@ def _parse_inputs_into_config_catalog_state(
231252 return config , catalog , state
232253
233254
255+ def _parse_manifest_from_file (filepath : str ) -> dict [str , Any ] | None :
256+ """Extract and parse a manifest file specified in the args."""
257+ try :
258+ with open (filepath , "r" , encoding = "utf-8" ) as manifest_file :
259+ manifest_content = yaml .safe_load (manifest_file )
260+ if manifest_content is None :
261+ raise ValueError (f"Manifest file at { filepath } is empty" )
262+ if not isinstance (manifest_content , dict ):
263+ raise ValueError (f"Manifest must be a dictionary, got { type (manifest_content )} " )
264+ return manifest_content
265+ except Exception as error :
266+ raise ValueError (f"Failed to load manifest file from { filepath } : { error } " )
267+
268+
269+ def _register_components_from_file (filepath : str ) -> None :
270+ """Load and register components from a Python file specified in the args."""
271+ import importlib .util
272+ import sys
273+
274+ components_path = Path (filepath )
275+
276+ module_name = "components"
277+ sdm_module_name = "source_declarative_manifest.components"
278+
279+ # Create module spec
280+ spec = importlib .util .spec_from_file_location (module_name , components_path )
281+ if spec is None or spec .loader is None :
282+ raise ImportError (f"Could not load module from { components_path } " )
283+
284+ # Create module and execute code, registering the module before executing its code
285+ # To avoid issues with dataclasses that look up the module
286+ module = importlib .util .module_from_spec (spec )
287+ sys .modules [module_name ] = module
288+ sys .modules [sdm_module_name ] = module
289+
290+ spec .loader .exec_module (module )
291+
292+
234293def run () -> None :
235294 args : list [str ] = sys .argv [1 :]
236295 handle_command (args )
0 commit comments