From 9b31c3f8a33479cd3b1038d5713a185f1fcc2d65 Mon Sep 17 00:00:00 2001 From: tsugumi-sys Date: Sat, 20 Jul 2024 12:01:05 +0900 Subject: [PATCH] output json stdout for polygonapi aggregates bars --- stocklake/polygonapi/aggregates_bars/cli.py | 4 ++-- stocklake/polygonapi/aggregates_bars/pipeline.py | 16 +++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/stocklake/polygonapi/aggregates_bars/cli.py b/stocklake/polygonapi/aggregates_bars/cli.py index 905c2d6..eae0367 100644 --- a/stocklake/polygonapi/aggregates_bars/cli.py +++ b/stocklake/polygonapi/aggregates_bars/cli.py @@ -16,13 +16,13 @@ @click.option("--symbols", default=None, help="symbols split by comma. ex `MSFT,AAPL`.") @click.option( "--store_type", - default=StoreType.LOCAL_ARTIFACT, + default=None, help=f"The storege type, should be in `{StoreType.types()}`.", ) def aggregates_bars( skip_download: bool, symbols: Optional[str], - store_type: StoreType, + store_type: StoreType | None, ): if symbols is None: raise StockLakeException("`symbols` must be given.") diff --git a/stocklake/polygonapi/aggregates_bars/pipeline.py b/stocklake/polygonapi/aggregates_bars/pipeline.py index 768ffd0..c94e3cd 100644 --- a/stocklake/polygonapi/aggregates_bars/pipeline.py +++ b/stocklake/polygonapi/aggregates_bars/pipeline.py @@ -1,3 +1,4 @@ +import json import logging from typing import List, Optional @@ -25,20 +26,21 @@ def __init__( self, symbols: List[str], skip_download: bool = False, - store_type: StoreType = StoreType.LOCAL_ARTIFACT, + store_type: StoreType | None = None, sqlalchemy_session: Optional[DATABASE_SESSION_TYPE] = None, ): self.symbols = symbols self.skip_download = skip_download - validate_store_type(store_type) + if store_type is not None: + validate_store_type(store_type) self.store_type = store_type self.data_loader = PolygonAggregatesBarsDataLoader(use_cache=self.skip_download) self.preprocessor = PolygonAggregatesBarsPreprocessor() if sqlalchemy_session is None: sqlalchemy_session = local_session() self.store = PolygonAggregatesBarsDataStore(sqlalchemy_session) - self.stdout = PipelineStdOut() + self.stdout = PipelineStdOut(enable_stdout=store_type is not None) def run(self): for symbol in self.symbols: @@ -58,5 +60,9 @@ def _run( self.stdout.skip_downloading() raw_data = data_loader.download(self.symbols) data = preprocessor.process(raw_data) - saved_location = store.save(self.store_type, data) - self.stdout.completed(saved_location) + if self.store_type is not None: + saved_location = store.save(self.store_type, data) + self.stdout.completed(saved_location) + else: + # MEMO: output a serialized json to the stdout for pipe. + print(json.dumps([d.model_dump() for d in data]), flush=True)