From bf941e5de929bed892d4ae619a58237c02c26675 Mon Sep 17 00:00:00 2001 From: Kai Koehler Date: Tue, 4 Jun 2024 17:34:48 -0700 Subject: [PATCH] Add functional POSTing and CLI confirmation --- src/sasquatchbackpack/cli.py | 2 ++ src/sasquatchbackpack/sasquatch.py | 41 ++++++++++++------------- src/sasquatchbackpack/schemas/usgs.avsc | 2 +- src/sasquatchbackpack/sources.py | 4 +-- tests/cli_test.py | 4 +-- 5 files changed, 26 insertions(+), 27 deletions(-) diff --git a/src/sasquatchbackpack/cli.py b/src/sasquatchbackpack/cli.py index a8ec55d..f905b2a 100644 --- a/src/sasquatchbackpack/cli.py +++ b/src/sasquatchbackpack/cli.py @@ -188,6 +188,8 @@ def usgs_earthquake_data( click.echo("------") return + click.confirm("Do you want to post the above data?", abort=True) + click.echo("Sending data...") config = sources.USGSConfig( diff --git a/src/sasquatchbackpack/sasquatch.py b/src/sasquatchbackpack/sasquatch.py index 1b9fffe..2987f10 100644 --- a/src/sasquatchbackpack/sasquatch.py +++ b/src/sasquatchbackpack/sasquatch.py @@ -35,7 +35,7 @@ class DispatcherConfig: ) partitions_count = 1 replication_factor = 3 - namespace = os.getenv("BACKPACK_NAMESPACE", "lsst.example") + namespace = os.getenv("BACKPACK_NAMESPACE", "lsst.backpack") class BackpackDispatcher: @@ -99,7 +99,7 @@ def create_topic(self) -> str: ) return response.text - def post(self) -> dict: + def post(self) -> str: """Assemble schema and payload from the given source, then makes a POST request to kafka. @@ -112,22 +112,21 @@ def post(self) -> dict: payload = {"value_schema": self.schema, "records": records} - # Temporary lint bypassing during testing - # ruff: noqa: ERA001 - return payload # noqa: RET504 - - # url = f"{self.config.sasquatch_rest_proxy_url}/topics/" - # f"{self.namespace}.{self.source.topic_name}" - - # headers = { - # "Content-Type": "application/vnd.kafka.avro.v2+json", - # "Accept": "application/vnd.kafka.v2+json", - # } - - # response = requests.request("POST", - # url, - # json=payload, - # headers=headers, - # timeout=10, - # ) - # return response.text + url = ( + f"{self.config.sasquatch_rest_proxy_url}/topics/" + f"{self.config.namespace}.{self.source.topic_name}" + ) + + headers = { + "Content-Type": "application/vnd.kafka.avro.v2+json", + "Accept": "application/vnd.kafka.v2+json", + } + + response = requests.request( + "POST", + url, + json=payload, + headers=headers, + timeout=10, + ) + return response.text diff --git a/src/sasquatchbackpack/schemas/usgs.avsc b/src/sasquatchbackpack/schemas/usgs.avsc index 4c14773..a2c158c 100644 --- a/src/sasquatchbackpack/schemas/usgs.avsc +++ b/src/sasquatchbackpack/schemas/usgs.avsc @@ -1 +1 @@ -{"namespace": "$namespace", "type": "record", "name": "$topic_name", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "str", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]} +{"namespace": "$namespace", "name": "$topic_name", "type": "record", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "string", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]} \ No newline at end of file diff --git a/src/sasquatchbackpack/sources.py b/src/sasquatchbackpack/sources.py index 7a07583..e719326 100644 --- a/src/sasquatchbackpack/sources.py +++ b/src/sasquatchbackpack/sources.py @@ -73,7 +73,7 @@ class USGSSource(DataSource): def __init__( self, config: USGSConfig, - topic_name: str = "usgs-earthquake-data", + topic_name: str = "usgs_earthquake_data", ) -> None: super().__init__(topic_name) self.duration = config.duration @@ -109,7 +109,7 @@ def get_records(self) -> list: return [ { "value": { - "timestamp": result.time.strftime("%s"), + "timestamp": int(result.time.strftime("%s")), "id": result.id, "latitude": result.latitude, "longitude": result.longitude, diff --git a/tests/cli_test.py b/tests/cli_test.py index d16e082..0a43796 100644 --- a/tests/cli_test.py +++ b/tests/cli_test.py @@ -138,10 +138,8 @@ def test_usgs_earthquake_data( str(magnitude_bounds[0]), str(magnitude_bounds[1]), ], + input="N", ) - if "SUCCESS" in expected: - assert result.exit_code == 0 - for value in expected: assert value in result.output