Skip to content

Commit

Permalink
Update publishing of status messages over MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
vetlek committed May 18, 2022
1 parent 24af9e9 commit 2db3130
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 22 deletions.
22 changes: 15 additions & 7 deletions src/isar/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,26 @@ class Settings(BaseSettings):
# Data classification
DATA_CLASSIFICATION: str = Field(default="internal")

# Type of missions the robot performs
MISSION_TYPE: str = Field(default="inspection")
# List of MQTT Topics

ISAR_STATE: str = Field(default="isar_state")
TOPIC_ISAR_STATE: str = Field(default="state")

ISAR_MISSION: str = Field(default="isar_mission")
TOPIC_ISAR_MISSION: str = Field(default="mission")

ISAR_STEP_STATUS: str = Field(default="isar_step_status")
TOPIC_ISAR_TASK: str = Field(default="task")

@validator("ISAR_STATE", "ISAR_MISSION", "ISAR_STEP_STATUS", pre=True, always=True)
TOPIC_ISAR_STEP: str = Field(default="step")

@validator(
"TOPIC_ISAR_STATE",
"TOPIC_ISAR_MISSION",
"TOPIC_ISAR_TASK",
"TOPIC_ISAR_STEP",
pre=True,
always=True,
)
def prefix_isar_topics(cls, v, values):
return f"{values['ROBOT_ID']}/{v}"
return f"isar/{values['ROBOT_ID']}/{v}"

class Config:
with pkg_resources.path("isar.config", "settings.env") as path:
Expand Down
3 changes: 2 additions & 1 deletion src/isar/models/mission.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from isar.config.settings import settings
from isar.models.mission_metadata.mission_metadata import MissionMetadata
from robot_interface.models.mission import (
STEPS,
InspectionStep,
MotionStep,
STEPS,
Step,
StepStatus,
)
Expand Down Expand Up @@ -78,6 +78,7 @@ def __post_init__(self) -> None:
class Mission:
tasks: List[Task]
id: Union[UUID, int, str, None] = None
status: StepStatus = StepStatus.NotStarted
metadata: MissionMetadata = None
_iterator: Iterator = None

Expand Down
57 changes: 49 additions & 8 deletions src/isar/state_machine/state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
from collections import deque
from copy import deepcopy
from datetime import datetime
from typing import Deque, List, Optional, Tuple

from injector import Injector, inject
Expand Down Expand Up @@ -116,9 +117,11 @@ def to_next_state(self, next_state):

def update_current_task(self):
if self.current_task.is_finished():
self.publish_task_status()
try:
self.current_task = self.current_mission.next_task()
self.current_task.status = StepStatus.InProgress
self.publish_task_status()
except StopIteration:
# Indicates that all tasks are finished
self.current_task = None
Expand All @@ -136,7 +139,7 @@ def update_state(self):

if self.mqtt_client:
self.mqtt_client.publish(
topic=settings.ISAR_STATE,
topic=settings.TOPIC_ISAR_STATE,
payload=payload,
retain=True,
)
Expand Down Expand Up @@ -213,8 +216,11 @@ def start_mission(self, mission: Mission):
"""Starts a scheduled mission."""
self.mission_in_progress = True
self.current_mission = mission
self.current_mission.status = StepStatus.InProgress
self.publish_mission_status()
self.current_task = mission.next_task()
self.current_task.status = StepStatus.InProgress
self.publish_task_status()

self.queues.start_mission.output.put(deepcopy(StartMissionMessages.success()))
self.logger.info(f"Starting new mission: {mission.id}")
Expand Down Expand Up @@ -267,30 +273,65 @@ def stop_mission(self):
self.logger.info(message)
if not failure:
self.mission_in_progress = False
for task in self.current_mission.tasks:
for step in task.steps:
if step.status == StepStatus.NotStarted:
step.status = StepStatus.Cancelled
if task.status == StepStatus.NotStarted:
task.status = StepStatus.Cancelled

def publish_mission_status(self) -> None:
payload: str = json.dumps(
{
"robot_id": settings.ROBOT_ID,
"mission_id": self.current_mission.id if self.current_mission else None,
"status": self.current_mission.status if self.current_mission else None,
"timestamp": datetime.utcnow(),
},
cls=EnhancedJSONEncoder,
)

def publish_step_status(self) -> None:
self.mqtt_client.publish(
topic=settings.TOPIC_ISAR_MISSION,
payload=payload,
retain=True,
)

def publish_task_status(self) -> None:
"""Publishes the current step status to the MQTT Broker"""
payload: str = json.dumps(
{
"step_id": self.current_step.id if self.current_step else None,
"step_status": self.current_step.status if self.current_step else None,
"robot_id": settings.ROBOT_ID,
"misison_id": self.current_mission.id if self.current_mission else None,
"task_id": self.current_task.id if self.current_task else None,
"status": self.current_task.status if self.current_task else None,
"timestamp": datetime.utcnow(),
},
cls=EnhancedJSONEncoder,
)

self.mqtt_client.publish(
topic=settings.ISAR_STEP_STATUS,
topic=settings.TOPIC_ISAR_TASK,
payload=payload,
retain=True,
)

def publish_mission(self) -> None:
def publish_step_status(self) -> None:
"""Publishes the current step status to the MQTT Broker"""
payload: str = json.dumps(
{"mission": self.current_mission}, cls=EnhancedJSONEncoder
{
"robot_id": settings.ROBOT_ID,
"misison_id": self.current_mission.id if self.current_mission else None,
"task_id": self.current_task.id if self.current_task else None,
"step_id": self.current_step.id if self.current_step else None,
"status": self.current_step.status if self.current_step else None,
"timestamp": datetime.utcnow(),
},
cls=EnhancedJSONEncoder,
)

self.mqtt_client.publish(
topic=settings.ISAR_MISSION,
topic=settings.TOPIC_ISAR_STEP,
payload=payload,
retain=True,
)
Expand Down
13 changes: 12 additions & 1 deletion src/isar/state_machine/states/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from injector import inject
from transitions import State

from robot_interface.models.mission.status import StepStatus

if TYPE_CHECKING:
from isar.state_machine.state_machine import StateMachine

Expand All @@ -17,7 +19,8 @@ def __init__(self, state_machine: "StateMachine"):

def start(self):
self.state_machine.update_state()

self.state_machine.current_mission.status = StepStatus.Completed
self.state_machine.publish_mission_status()
self.state_machine.log_step_overview(mission=self.state_machine.current_mission)
next_state = self.state_machine.reset_state_machine()
self.state_machine.to_next_state(next_state)
Expand All @@ -36,3 +39,11 @@ def _log_state_transitions(self):
)

self.logger.info(f"State transitions:\n {state_transitions}")

def _update_tasks(self):
for task in self.state_machine.current_mission.tasks:
for step in task.steps:
if step.status == StepStatus.NotStarted:
step.status = StepStatus.Cancelled
if task.status == StepStatus.NotStarted:
task.status = StepStatus.Cancelled
2 changes: 0 additions & 2 deletions src/isar/state_machine/states/initiate_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ def start(self):

if self.state_machine.mqtt_client:
self.state_machine.publish_step_status()
self.state_machine.publish_mission()

self._run()

def stop(self):
if self.state_machine.mqtt_client:
self.state_machine.publish_step_status()
self.state_machine.publish_mission()

self.initiate_step_failure_counter = 0
if self.initiate_step_thread:
Expand Down
5 changes: 2 additions & 3 deletions src/isar/state_machine/states/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import time
from copy import deepcopy
from typing import Sequence, TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Sequence, Tuple

from injector import inject
from transitions import State
Expand Down Expand Up @@ -39,8 +39,7 @@ def start(self):

def stop(self):
if self.state_machine.mqtt_client:
self.state_machine.publish_step_status()
self.state_machine.publish_mission()
self.state_machine.publish_task_status()

self.iteration_counter = 0
if self.step_status_thread:
Expand Down
1 change: 1 addition & 0 deletions src/robot_interface/models/mission/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ class StepStatus(str, Enum):
PartiallySuccessful: str = "partially_successful"
InProgress: str = "in_progress"
Failed: str = "failed"
Cancelled: str = "cancelled"
1 change: 1 addition & 0 deletions tests/isar/state_machine/test_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def test_should_stop_mission(


def test_stop_mission(state_machine):
state_machine.start_mission(MockMissionDefinition.default_mission)
state_machine.stop_mission()
message = state_machine.queues.stop_mission.output.get()
assert not state_machine.mission_in_progress
Expand Down

0 comments on commit 2db3130

Please sign in to comment.