Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update publishing of status messages over MQTT #264

Merged
merged 2 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/isar/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,26 @@ class Settings(BaseSettings):
# Data classification
DATA_CLASSIFICATION: str = Field(default="internal")

# Type of missions the robot performs
MISSION_TYPE: str = Field(default="inspection")
# List of MQTT Topics

ISAR_STATE: str = Field(default="isar_state")
TOPIC_ISAR_STATE: str = Field(default="state")

ISAR_MISSION: str = Field(default="isar_mission")
TOPIC_ISAR_MISSION: str = Field(default="mission")

ISAR_STEP_STATUS: str = Field(default="isar_step_status")
TOPIC_ISAR_TASK: str = Field(default="task")

@validator("ISAR_STATE", "ISAR_MISSION", "ISAR_STEP_STATUS", pre=True, always=True)
TOPIC_ISAR_STEP: str = Field(default="step")

@validator(
"TOPIC_ISAR_STATE",
"TOPIC_ISAR_MISSION",
"TOPIC_ISAR_TASK",
"TOPIC_ISAR_STEP",
pre=True,
always=True,
)
def prefix_isar_topics(cls, v, values):
return f"{values['ROBOT_ID']}/{v}"
return f"isar/{values['ROBOT_ID']}/{v}"

class Config:
with pkg_resources.path("isar.config", "settings.env") as path:
Expand Down
2 changes: 2 additions & 0 deletions src/isar/models/mission/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .mission import Mission, Task
from .status import MissionStatus, TaskStatus
19 changes: 11 additions & 8 deletions src/isar/models/mission.py → src/isar/models/mission/mission.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@
from isar.config.settings import settings
from isar.models.mission_metadata.mission_metadata import MissionMetadata
from robot_interface.models.mission import (
STEPS,
InspectionStep,
MotionStep,
STEPS,
Step,
StepStatus,
)

from .status import MissionStatus, TaskStatus


@dataclass
class Task:
steps: List[STEPS]
status: StepStatus = field(default=StepStatus.NotStarted, init=False)
status: TaskStatus = field(default=TaskStatus.NotStarted, init=False)
id: UUID = field(default_factory=uuid4, init=False)
_iterator: Iterator = None

Expand All @@ -29,7 +31,7 @@ def is_finished(self) -> bool:
if step.status is StepStatus.Failed and isinstance(step, MotionStep):
# One motion step has failed meaning the task as a whole should be
# aborted
self.status = StepStatus.Failed
self.status = TaskStatus.Failed
return True

elif (step.status is StepStatus.Failed) and isinstance(
Expand All @@ -38,10 +40,10 @@ def is_finished(self) -> bool:
# It should be possible to perform several inspections per task. If
# one out of many inspections fail the task is considered as
# partially successful.
self.status = StepStatus.PartiallySuccessful
self.status = TaskStatus.PartiallySuccessful
continue

elif step.status is StepStatus.Completed:
elif step.status is StepStatus.Successful:
# The task is complete once all steps are completed
continue
else:
Expand All @@ -50,13 +52,13 @@ def is_finished(self) -> bool:

# Check if the task has been marked as partially successful by having one or
# more inspection steps fail
if self.status is not StepStatus.PartiallySuccessful:
if self.status is not TaskStatus.PartiallySuccessful:
# All steps have been completed
self.status = StepStatus.Completed
self.status = TaskStatus.Successful

# Set the task to failed if all inspection steps failed
elif self._all_inspection_steps_failed():
self.status = StepStatus.Failed
self.status = TaskStatus.Failed

return True

Expand All @@ -78,6 +80,7 @@ def __post_init__(self) -> None:
class Mission:
tasks: List[Task]
id: Union[UUID, int, str, None] = None
status: MissionStatus = MissionStatus.NotStarted
metadata: MissionMetadata = None
_iterator: Iterator = None

Expand Down
19 changes: 19 additions & 0 deletions src/isar/models/mission/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from enum import Enum


class MissionStatus(str, Enum):
NotStarted: str = "not_started"
Started: str = "started"
InProgress: str = "in_progress"
Failed: str = "failed"
Cancelled: str = "cancelled"
Completed: str = "completed"


class TaskStatus(str, Enum):
NotStarted: str = "not_started"
InProgress: str = "in_progress"
PartiallySuccessful: str = "partially_successful"
Failed: str = "failed"
Cancelled: str = "cancelled"
Successful: str = "successful"
65 changes: 54 additions & 11 deletions src/isar/state_machine/state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
from collections import deque
from copy import deepcopy
from datetime import datetime
from typing import Deque, List, Optional, Tuple

from injector import Injector, inject
Expand All @@ -17,6 +18,7 @@
)
from isar.models.communication.queues.queues import Queues
from isar.models.mission import Mission, Task
from isar.models.mission.status import MissionStatus, TaskStatus
from isar.services.service_connections.mqtt.mqtt_client import MqttClientInterface
from isar.services.utilities.json_service import EnhancedJSONEncoder
from isar.state_machine.states import Finalize, Idle, InitiateStep, Monitor, Off
Expand Down Expand Up @@ -116,9 +118,11 @@ def to_next_state(self, next_state):

def update_current_task(self):
if self.current_task.is_finished():
self.publish_task_status()
try:
self.current_task = self.current_mission.next_task()
self.current_task.status = StepStatus.InProgress
self.current_task.status = TaskStatus.InProgress
self.publish_task_status()
except StopIteration:
# Indicates that all tasks are finished
self.current_task = None
Expand All @@ -136,7 +140,7 @@ def update_state(self):

if self.mqtt_client:
self.mqtt_client.publish(
topic=settings.ISAR_STATE,
topic=settings.TOPIC_ISAR_STATE,
payload=payload,
retain=True,
)
Expand Down Expand Up @@ -213,8 +217,11 @@ def start_mission(self, mission: Mission):
"""Starts a scheduled mission."""
self.mission_in_progress = True
self.current_mission = mission
self.current_mission.status = MissionStatus.InProgress
self.publish_mission_status()
self.current_task = mission.next_task()
self.current_task.status = StepStatus.InProgress
self.current_task.status = TaskStatus.InProgress
self.publish_task_status()

self.queues.start_mission.output.put(deepcopy(StartMissionMessages.success()))
self.logger.info(f"Starting new mission: {mission.id}")
Expand Down Expand Up @@ -266,31 +273,67 @@ def stop_mission(self):
self.queues.stop_mission.output.put(deepcopy(message))
self.logger.info(message)
if not failure:
self.current_mission.status = MissionStatus.Cancelled
self.mission_in_progress = False
for task in self.current_mission.tasks:
for step in task.steps:
if step.status in [StepStatus.NotStarted, StepStatus.InProgress]:
step.status = StepStatus.Cancelled
if task.status in [TaskStatus.NotStarted, TaskStatus.InProgress]:
task.status = TaskStatus.Cancelled

def publish_mission_status(self) -> None:
payload: str = json.dumps(
{
"robot_id": settings.ROBOT_ID,
"mission_id": self.current_mission.id if self.current_mission else None,
"status": self.current_mission.status if self.current_mission else None,
"timestamp": datetime.utcnow(),
},
cls=EnhancedJSONEncoder,
)

def publish_step_status(self) -> None:
"""Publishes the current step status to the MQTT Broker"""
self.mqtt_client.publish(
topic=settings.TOPIC_ISAR_MISSION,
payload=payload,
retain=True,
)

def publish_task_status(self) -> None:
"""Publishes the current task status to the MQTT Broker"""
payload: str = json.dumps(
{
"step_id": self.current_step.id if self.current_step else None,
"step_status": self.current_step.status if self.current_step else None,
"robot_id": settings.ROBOT_ID,
"misison_id": self.current_mission.id if self.current_mission else None,
"task_id": self.current_task.id if self.current_task else None,
"status": self.current_task.status if self.current_task else None,
"timestamp": datetime.utcnow(),
},
cls=EnhancedJSONEncoder,
)

self.mqtt_client.publish(
topic=settings.ISAR_STEP_STATUS,
topic=settings.TOPIC_ISAR_TASK,
payload=payload,
retain=True,
)

def publish_mission(self) -> None:
def publish_step_status(self) -> None:
"""Publishes the current step status to the MQTT Broker"""
payload: str = json.dumps(
{"mission": self.current_mission}, cls=EnhancedJSONEncoder
{
"robot_id": settings.ROBOT_ID,
"misison_id": self.current_mission.id if self.current_mission else None,
"task_id": self.current_task.id if self.current_task else None,
"step_id": self.current_step.id if self.current_step else None,
"status": self.current_step.status if self.current_step else None,
"timestamp": datetime.utcnow(),
},
cls=EnhancedJSONEncoder,
)

self.mqtt_client.publish(
topic=settings.ISAR_MISSION,
topic=settings.TOPIC_ISAR_STEP,
payload=payload,
retain=True,
)
Expand Down
4 changes: 3 additions & 1 deletion src/isar/state_machine/states/finalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from injector import inject
from transitions import State

from robot_interface.models.mission.status import StepStatus

if TYPE_CHECKING:
from isar.state_machine.state_machine import StateMachine

Expand All @@ -17,7 +19,7 @@ def __init__(self, state_machine: "StateMachine"):

def start(self):
self.state_machine.update_state()

self.state_machine.publish_mission_status()
self.state_machine.log_step_overview(mission=self.state_machine.current_mission)
next_state = self.state_machine.reset_state_machine()
self.state_machine.to_next_state(next_state)
Expand Down
6 changes: 3 additions & 3 deletions src/isar/state_machine/states/initiate_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from transitions import State

from isar.config.settings import settings
from isar.models.mission.status import MissionStatus
from isar.services.utilities.threaded_request import (
ThreadedRequest,
ThreadedRequestNotFinishedError,
Expand Down Expand Up @@ -39,14 +40,12 @@ def start(self):

if self.state_machine.mqtt_client:
self.state_machine.publish_step_status()
self.state_machine.publish_mission()

self._run()

def stop(self):
if self.state_machine.mqtt_client:
self.state_machine.publish_step_status()
self.state_machine.publish_mission()

self.initiate_step_failure_counter = 0
if self.initiate_step_thread:
Expand Down Expand Up @@ -84,7 +83,6 @@ def _run(self):
except ThreadedRequestNotFinishedError:
time.sleep(self.state_machine.sleep_time)
continue

except RobotInfeasibleStepException:
self.state_machine.current_step.status = StepStatus.Failed
self.logger.warning(
Expand Down Expand Up @@ -116,6 +114,8 @@ def _run(self):
self.initiate_step_failure_counter
>= self.initiate_step_failure_counter_limit
):
self.state_machine.current_step.status = StepStatus.Failed
self.state_machine.current_mission.status = MissionStatus.Failed
self.logger.error(
f"Failed to initiate step after "
f"{self.initiate_step_failure_counter_limit} attempts. "
Expand Down
9 changes: 4 additions & 5 deletions src/isar/state_machine/states/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import time
from copy import deepcopy
from typing import Sequence, TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Sequence, Tuple

from injector import inject
from transitions import State
Expand Down Expand Up @@ -39,8 +39,7 @@ def start(self):

def stop(self):
if self.state_machine.mqtt_client:
self.state_machine.publish_step_status()
self.state_machine.publish_mission()
self.state_machine.publish_task_status()
GodVenn marked this conversation as resolved.
Show resolved Hide resolved

self.iteration_counter = 0
if self.step_status_thread:
Expand Down Expand Up @@ -117,15 +116,15 @@ def _step_finished(self, step: Step) -> bool:
if step.status == StepStatus.Failed:
self.logger.warning(f"Step: {str(step.id)[:8]} failed")
finished = True
elif step.status == StepStatus.Completed:
elif step.status == StepStatus.Successful:
self.logger.info(
f"{type(step).__name__} step: {str(step.id)[:8]} completed"
)
finished = True
return finished

def _process_finished_step(self, step: Step) -> State:
if step.status == StepStatus.Completed and isinstance(step, InspectionStep):
if step.status == StepStatus.Successful and isinstance(step, InspectionStep):
self._queue_inspections_for_upload(current_step=step)

return States.InitiateStep
4 changes: 2 additions & 2 deletions src/robot_interface/models/mission/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class StepStatus(str, Enum):

NotStarted: str = "not_started"
Completed: str = "completed"
PartiallySuccessful: str = "partially_successful"
Successful: str = "successful"
InProgress: str = "in_progress"
Failed: str = "failed"
Cancelled: str = "cancelled"
6 changes: 3 additions & 3 deletions tests/isar/state_machine/states/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
@pytest.mark.parametrize(
"mock_status, expected_output",
[
(StepStatus.Completed, True),
(StepStatus.Completed, True),
(StepStatus.Successful, True),
(StepStatus.Successful, True),
(StepStatus.Failed, True),
],
)
Expand All @@ -27,7 +27,7 @@ def test_step_finished(monitor: Monitor, mock_status, expected_output):
@pytest.mark.parametrize(
"mock_status, should_queue_upload",
[
(StepStatus.Completed, True),
(StepStatus.Successful, True),
(StepStatus.Failed, False),
],
)
Expand Down
1 change: 1 addition & 0 deletions tests/isar/state_machine/test_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def test_should_stop_mission(


def test_stop_mission(state_machine):
state_machine.start_mission(MockMissionDefinition.default_mission)
state_machine.stop_mission()
message = state_machine.queues.stop_mission.output.get()
assert not state_machine.mission_in_progress
Expand Down
2 changes: 1 addition & 1 deletion tests/mocks/robot_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class MockRobot(RobotInterface):
def __init__(
self,
initiate_step: bool = True,
step_status: StepStatus = StepStatus.Completed,
step_status: StepStatus = StepStatus.Successful,
stop: bool = True,
pose: Pose = Pose(
position=Position(x=0, y=0, z=0, frame=Frame("robot")),
Expand Down