Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions calypso_anemometer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ def cli(ctx, quiet: t.Optional[bool], verbose: t.Optional[bool], debug: t.Option
)
subscribe_option = click.option("--subscribe", is_flag=True, required=False, help="Continuously receive readings")
target_option = click.option("--target", type=str, required=False, help="Submit telemetry data to target")
log_to_option = click.option(
"--log-to",
type=click.Path(writable=True, dir_okay=False),
required=False,
help="Path to a file to log telemetry data (e.g., data.csv).",
)
log_file_lines_option = click.option(
"--log-file-lines",
type=int,
required=False,
default=1000,
help="Maximum number of lines to keep in the log file. Default: 1000.",
)



@click.command()
Expand Down Expand Up @@ -177,6 +191,8 @@ async def handler(calypso: CalypsoDeviceApi):
@target_option
@rate_option
@compass_option
@log_to_option
@log_file_lines_option
@click.pass_context
@make_sync
async def read(
Expand All @@ -189,6 +205,8 @@ async def read(
target: t.Optional[str] = None,
rate: t.Optional[CalypsoDeviceDataRate] = None,
compass: t.Optional[CalypsoDeviceCompassStatus] = None,
log_to: t.Optional[str] = None,
log_file_lines: t.Optional[int] = None,
):
quiet = ctx.parent.params.get("quiet")
settings = Settings(
Expand All @@ -197,7 +215,7 @@ async def read(
ble_discovery_timeout=ble_discovery_timeout,
ble_connect_timeout=ble_connect_timeout,
)
handler = await handler_factory(subscribe=subscribe, target=target, rate=rate, compass=compass, quiet=quiet)
handler = await handler_factory(subscribe=subscribe, target=target, rate=rate, compass=compass, quiet=quiet, log_to=log_to, log_file_lines=log_file_lines)
await run_engine(workhorse=CalypsoDeviceApi, settings=settings, handler=handler)


Expand All @@ -206,6 +224,8 @@ async def read(
@target_option
@rate_option
@compass_option
@log_to_option
@log_file_lines_option
@click.pass_context
@make_sync
async def fake(
Expand All @@ -214,11 +234,13 @@ async def fake(
target: t.Optional[str] = None,
rate: t.Optional[CalypsoDeviceDataRate] = None,
compass: t.Optional[CalypsoDeviceCompassStatus] = None,
log_to: t.Optional[str] = None,
log_file_lines: t.Optional[int] = None,
):
from calypso_anemometer.fake import CalypsoDeviceApiFake

quiet = ctx.parent.params.get("quiet")
handler = await handler_factory(subscribe=subscribe, target=target, rate=rate, compass=compass, quiet=quiet)
handler = await handler_factory(subscribe=subscribe, target=target, rate=rate, compass=compass, quiet=quiet, log_to=log_to, log_file_lines=log_file_lines)
await run_engine(workhorse=CalypsoDeviceApiFake, handler=handler)


Expand Down
39 changes: 39 additions & 0 deletions calypso_anemometer/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from calypso_anemometer.telemetry.adapter import TelemetryAdapter
from calypso_anemometer.util import wait_forever

from datetime import datetime, timezone
import os

logger = logging.getLogger(__name__)


Expand All @@ -34,6 +37,8 @@ async def handler_factory(
rate: t.Optional[CalypsoDeviceDataRate] = None,
compass: t.Optional[CalypsoDeviceCompassStatus] = None,
quiet: bool = False,
log_to: t.Optional[str] = None,
log_file_lines: t.Optional[int] = None,
) -> t.Callable:
"""
Create an asynchronous handler function for processing readings.
Expand All @@ -54,15 +59,49 @@ async def handler_factory(
telemetry = None
if target is not None:
telemetry = TelemetryAdapter(uri=target)

# Optionally log to a file
log_file = None
if log_to is not None:
log_file = open(log_to, mode="a", encoding="utf-8")
# Write header if file is empty
if log_file.tell() == 0:
log_file.write("wind_speed,wind_direction,temperature\n")

# Function to write a row to a CSV file, truncating if necessary.
def write_row_to_csv(filepath: str, row: str, log_file_lines: int):
lines = []
if os.path.exists(filepath):
with open(filepath, "r") as f:
lines = f.readlines()

lines.append(row + "\n")

# Truncate if too many lines
if log_file_lines is not None and len(lines) > log_file_lines:
lines = lines[-log_file_lines:]

with open(filepath, "w") as f:
f.writelines(lines)

# When a reading is received, optionally display on STDOUT or hand over to telemetry adapter.
def process_reading(reading: CalypsoReading):
nonlocal message_counter
message_counter += 1
timestamp = datetime.now(timezone.utc).isoformat()


if not quiet:
print(f"[{timestamp}] ", end="")
reading.dump()
if telemetry is not None:
telemetry.submit(reading)

# Optionally log to a file.
if log_file is not None:
row = f"{timestamp},{reading.wind_speed},{reading.wind_direction},{reading.temperature}\n"
write_row_to_csv(log_to, row, log_file_lines)

if message_counter % message_counter_log_each == 0:
logger.info(f"Processed readings: {message_counter}")

Expand Down