Skip to content

Commit

Permalink
Merge pull request #213 from tsugumi-sys/feature/output-json-stdout-p…
Browse files Browse the repository at this point in the history
…olygonapi-aggregates-bar

output json stdout for polygonapi aggregates bars
  • Loading branch information
tsugumi-sys authored Jul 20, 2024
2 parents adba268 + 9b31c3f commit 3097997
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
4 changes: 2 additions & 2 deletions stocklake/polygonapi/aggregates_bars/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
@click.option("--symbols", default=None, help="symbols split by comma. ex `MSFT,AAPL`.")
@click.option(
"--store_type",
default=StoreType.LOCAL_ARTIFACT,
default=None,
help=f"The storege type, should be in `{StoreType.types()}`.",
)
def aggregates_bars(
skip_download: bool,
symbols: Optional[str],
store_type: StoreType,
store_type: StoreType | None,
):
if symbols is None:
raise StockLakeException("`symbols` must be given.")
Expand Down
16 changes: 11 additions & 5 deletions stocklake/polygonapi/aggregates_bars/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from typing import List, Optional

Expand Down Expand Up @@ -25,20 +26,21 @@ def __init__(
self,
symbols: List[str],
skip_download: bool = False,
store_type: StoreType = StoreType.LOCAL_ARTIFACT,
store_type: StoreType | None = None,
sqlalchemy_session: Optional[DATABASE_SESSION_TYPE] = None,
):
self.symbols = symbols
self.skip_download = skip_download

validate_store_type(store_type)
if store_type is not None:
validate_store_type(store_type)
self.store_type = store_type
self.data_loader = PolygonAggregatesBarsDataLoader(use_cache=self.skip_download)
self.preprocessor = PolygonAggregatesBarsPreprocessor()
if sqlalchemy_session is None:
sqlalchemy_session = local_session()
self.store = PolygonAggregatesBarsDataStore(sqlalchemy_session)
self.stdout = PipelineStdOut()
self.stdout = PipelineStdOut(enable_stdout=store_type is not None)

def run(self):
for symbol in self.symbols:
Expand All @@ -58,5 +60,9 @@ def _run(
self.stdout.skip_downloading()
raw_data = data_loader.download(self.symbols)
data = preprocessor.process(raw_data)
saved_location = store.save(self.store_type, data)
self.stdout.completed(saved_location)
if self.store_type is not None:
saved_location = store.save(self.store_type, data)
self.stdout.completed(saved_location)
else:
# MEMO: output a serialized json to the stdout for pipe.
print(json.dumps([d.model_dump() for d in data]), flush=True)

0 comments on commit 3097997

Please sign in to comment.