Skip to content

Commit

Permalink
Add functional POSTing and CLI confirmation
Browse files Browse the repository at this point in the history
  • Loading branch information
Fireye04 committed Jun 5, 2024
1 parent 936a429 commit bf941e5
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 27 deletions.
2 changes: 2 additions & 0 deletions src/sasquatchbackpack/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ def usgs_earthquake_data(
click.echo("------")
return

click.confirm("Do you want to post the above data?", abort=True)

click.echo("Sending data...")

config = sources.USGSConfig(
Expand Down
41 changes: 20 additions & 21 deletions src/sasquatchbackpack/sasquatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class DispatcherConfig:
)
partitions_count = 1
replication_factor = 3
namespace = os.getenv("BACKPACK_NAMESPACE", "lsst.example")
namespace = os.getenv("BACKPACK_NAMESPACE", "lsst.backpack")


class BackpackDispatcher:
Expand Down Expand Up @@ -99,7 +99,7 @@ def create_topic(self) -> str:
)
return response.text

def post(self) -> dict:
def post(self) -> str:
"""Assemble schema and payload from the given source, then
makes a POST request to kafka.
Expand All @@ -112,22 +112,21 @@ def post(self) -> dict:

payload = {"value_schema": self.schema, "records": records}

# Temporary lint bypassing during testing
# ruff: noqa: ERA001
return payload # noqa: RET504

# url = f"{self.config.sasquatch_rest_proxy_url}/topics/"
# f"{self.namespace}.{self.source.topic_name}"

# headers = {
# "Content-Type": "application/vnd.kafka.avro.v2+json",
# "Accept": "application/vnd.kafka.v2+json",
# }

# response = requests.request("POST",
# url,
# json=payload,
# headers=headers,
# timeout=10,
# )
# return response.text
url = (
f"{self.config.sasquatch_rest_proxy_url}/topics/"
f"{self.config.namespace}.{self.source.topic_name}"
)

headers = {
"Content-Type": "application/vnd.kafka.avro.v2+json",
"Accept": "application/vnd.kafka.v2+json",
}

response = requests.request(
"POST",
url,
json=payload,
headers=headers,
timeout=10,
)
return response.text
2 changes: 1 addition & 1 deletion src/sasquatchbackpack/schemas/usgs.avsc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"namespace": "$namespace", "type": "record", "name": "$topic_name", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "str", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]}
{"namespace": "$namespace", "name": "$topic_name", "type": "record", "description": "Collection of earthquakes near the summit", "fields": [{"name": "timestamp", "type": "long"}, {"name": "id", "type": "string", "description": "unique earthquake id"}, {"name": "latitude", "type": "float", "units": "Degrees"}, {"name": "longitude", "type": "float", "units": "Degrees"}, {"name": "depth", "type": "float", "units": "Km"}, {"name": "magnitude", "type": "float", "units": "Richter Magnitudes"}]}
4 changes: 2 additions & 2 deletions src/sasquatchbackpack/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class USGSSource(DataSource):
def __init__(
self,
config: USGSConfig,
topic_name: str = "usgs-earthquake-data",
topic_name: str = "usgs_earthquake_data",
) -> None:
super().__init__(topic_name)
self.duration = config.duration
Expand Down Expand Up @@ -109,7 +109,7 @@ def get_records(self) -> list:
return [
{
"value": {
"timestamp": result.time.strftime("%s"),
"timestamp": int(result.time.strftime("%s")),
"id": result.id,
"latitude": result.latitude,
"longitude": result.longitude,
Expand Down
4 changes: 1 addition & 3 deletions tests/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,8 @@ def test_usgs_earthquake_data(
str(magnitude_bounds[0]),
str(magnitude_bounds[1]),
],
input="N",
)

if "SUCCESS" in expected:
assert result.exit_code == 0

for value in expected:
assert value in result.output

0 comments on commit bf941e5

Please sign in to comment.