Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .env.development
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Azure Relay Configuration
AZURE_RELAY_NAMESPACE=manbrs-gateway-dev.servicebus.windows.net
AZURE_RELAY_HYBRID_CONNECTION=name-of-your-choice-relay-test-hc
AZURE_RELAY_KEY_NAME=RootManageSharedAccessKey
AZURE_RELAY_SHARED_ACCESS_KEY=YOUR_SHARED_ACCESS_KEY_HERE

# MWL Server Configuration
MWL_AET=SCREENING_MWL
MWL_PORT=4243
MWL_DB_PATH=/var/lib/pacs/worklist.db

# PACS Server Configuration
PACS_AET=SCREENING_PACS
PACS_PORT=4244
PACS_STORAGE_PATH=/var/lib/pacs/storage
PACS_DB_PATH=/var/lib/pacs/pacs.db

# General Configuration
LOG_LEVEL=INFO
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ The PACS server provides C-STORE functionality for receiving medical images:

See [PACS documentation](docs/pacs/README.md) for detailed information.

### Relay Listener

The Relay Listener handles incoming messages from the cloud service via Azure Relay:
- Listens on configured Hybrid Connection
- Processes worklist actions (e.g., create worklist item)

See [Relay Listener documentation](docs/relay-listener/README.md) for details.

## Testing

This project uses:
Expand Down
22 changes: 22 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,28 @@ services:
timeout: 5s
retries: 3

listener:
build:
context: .
dockerfile: Dockerfile
container_name: listener
command: ["uv", "run", "python", "-m", "src.relay_listener"]
volumes:
- pacs-db:/var/lib/pacs
environment:
- AZURE_RELAY_NAMESPACE=${AZURE_RELAY_NAMESPACE}
- AZURE_RELAY_HYBRID_CONNECTION=${AZURE_RELAY_HYBRID_CONNECTION}
- AZURE_RELAY_KEY_NAME=${AZURE_RELAY_KEY_NAME}
- AZURE_RELAY_SHARED_ACCESS_KEY=${AZURE_RELAY_SHARED_ACCESS_KEY}
- DB_PATH=${DB_PATH:-/var/lib/pacs/worklist.db}
- LOG_LEVEL=INFO
restart: unless-stopped
healthcheck:
test: ["CMD", "sqlite3", "/var/lib/pacs/worklist.db", "SELECT 1"]
interval: 30s
timeout: 5s
retries: 3

volumes:
pacs-storage:
driver: local
Expand Down
94 changes: 94 additions & 0 deletions docs/relay-listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Azure Relay listener

Relay listener uses websocket communication to Manage Breast Screening service via Azure Relay.
The listener processes worklist actions sent from Manage/Django and creates worklist items in the Modality Worklist server.


## Architecture

```
┌─────────────────────┐ ┌──────────────────────┐
│ Django (Manage) │ │ Gateway (Behind FW) │
└─────────────────────┘ └──────────────────────┘
│ │
│ (1) Send Worklist Actions │
│ ────────────────────────────────> │
│ Connection: name-of-your-choice-relay-test-hc │
│ Django: SENDER │
│ Gateway: LISTENER (relay-listener) │
│ │
```


## Firewall Compatibility

Connection works through firewalls because:

- All communication uses **outbound HTTPS (port 443)**
- "Listening" means maintaining a persistent outbound WebSocket connection
- Azure Relay pushes messages down existing connections
- No inbound ports required on the gateway

## Setup Instructions

### 1. Create Azure Relay Resources

In Azure Portal:

1. Create an Azure Relay namespace (if not exists):
- Name: `manbrs-gateway-dev`
- Region: UK South

2. Create Hybrid Connection:
- `name-of-your-choice-relay-test-hc` (for worklist actions)

3. Get the Shared Access Policy:
- Policy Name: `RootManageSharedAccessKey` (default)
- Copy the Primary Key

### 2. Copy environment variables from .env.development to .env

#### Gateway (.env or .env.development)

```bash
AZURE_RELAY_NAMESPACE=manbrs-gateway-dev.servicebus.windows.net
AZURE_RELAY_HYBRID_CONNECTION=name-of-your-choice-relay-test-hc
AZURE_RELAY_KEY_NAME=RootManageSharedAccessKey
AZURE_RELAY_SHARED_ACCESS_KEY=your_actual_key_here
```

#### Django Manage (.env)

```bash
AZURE_RELAY_NAMESPACE=manbrs-gateway-dev.servicebus.windows.net
AZURE_RELAY_HYBRID_CONNECTION=name-of-your-choice-relay-test-hc
AZURE_RELAY_KEY_NAME=RootManageSharedAccessKey
AZURE_RELAY_SHARED_ACCESS_KEY=your_actual_key_here
```

### 3. Start the Gateway Services


```bash
docker compose up --build
```


## Message Flows

### Worklist Creation (Django → Gateway)

1. User clicks "Send to Modality" in clinic UI
2. Django creates `GatewayAction` with payload
3. Manage Breast Screening service sends via relay (as sender) to `name-of-your-choice-relay-test-hc`
4. Gateway `src.relay_listener.py` receives (as listener)
5. Gateway creates worklist item in Modality Worklist server storage via `CreateWorklistItem` service class.
6. Gateway sends success/failure response back.


## Testing

1. Start both gateway and Manage/Django services
2. Open clinic UI and send appointment to modality
3. Monitor logs to trace message flow
4. Verify worklist item created in Modality Emulator
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies = [
"pylibjpeg-libjpeg>=2.0.0",
"pylibjpeg-openjpeg>=2.2.0",
"pillow>=11.0.0",
"websockets==15.0.1",
]

[dependency-groups]
Expand All @@ -22,6 +23,7 @@ dev = [
"ruff>=0.14.1,<0.15",
"pyright>=1.1.390",
"ipdb>=0.13.13,<0.14",
"pytest-asyncio>=1.3.0,<1.4.0",
]

[tool.uv]
Expand Down
130 changes: 130 additions & 0 deletions src/relay_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""
RelayListener
Receives worklist actions from manage-screening.
Supports creation of Modality Worklist Items.
"""

import asyncio
import base64
import hashlib
import hmac
import json
import logging
import os
import time
import urllib.parse

from websockets.asyncio.client import connect

from services.mwl.create_worklist_item import CreateWorklistItem
from services.storage import MWLStorage

logger = logging.getLogger(__name__)

DB_PATH = os.getenv("MWL_DB_PATH", "/var/lib/pacs/worklist.db")
SAS_TOKEN_EXPIRY_SECONDS = 3600

ACTIONS = {
"worklist.create_item": CreateWorklistItem,
}


class RelayListener:
"""Socket Listener for Azure Relay."""

def __init__(self, storage: MWLStorage):
self.storage = storage
self.relay_uri = RelayURI()

async def listen(self):
"""Listen for messages from Azure Relay."""

logger.info(f"Connecting to Azure Relay: {self.relay_uri.hybrid_connection_name}...")

async with connect(self.relay_uri.connection_url(), compression=None) as websocket:
logger.info("Connected - waiting for worklist actions...")

async for message in websocket:
try:
data = json.loads(message)

if "accept" in data:
accept_url = data["accept"]["address"]
logger.info("Incoming connection...")

async with connect(accept_url, compression=None) as client_ws:
client_message = await asyncio.wait_for(client_ws.recv(), timeout=30)
payload = json.loads(client_message)
response = self.process_action(payload)

# Send acknowledgment
await client_ws.send(json.dumps(response))

except asyncio.TimeoutError:
logger.error("Timeout waiting for message")
except Exception as e:
logger.error(f"Error: {e}")

def process_action(self, payload: dict):
"""Process incoming action payload."""
action_name = payload.get("action_type", "no-op")

action_class = ACTIONS.get(action_name)
if not action_class:
raise ValueError(f"Unknown action: {action_name}")

return action_class(self.storage).call(payload)


class RelayURI:
def __init__(self):
self.relay_namespace = os.getenv("AZURE_RELAY_NAMESPACE", "relay-test.servicebus.windows.net")
self.hybrid_connection_name = os.getenv("AZURE_RELAY_HYBRID_CONNECTION", "relay-test-hc")
self.key_name = os.getenv("AZURE_RELAY_KEY_NAME", "RootManageSharedAccessKey")
self.shared_access_key = os.getenv("AZURE_RELAY_SHARED_ACCESS_KEY", "")

def create_sas_token(self, expiry_seconds: int = SAS_TOKEN_EXPIRY_SECONDS) -> str:
"""Create SAS token for Azure Relay authentication."""
uri = f"http://{self.relay_namespace}/{self.hybrid_connection_name}"
encoded_uri = urllib.parse.quote_plus(uri)
expiry = str(int(time.time() + expiry_seconds))
signature = base64.b64encode(
hmac.new(self.shared_access_key.encode(), f"{encoded_uri}\n{expiry}".encode(), hashlib.sha256).digest()
)
return (
f"SharedAccessSignature sr={encoded_uri}"
f"&sig={urllib.parse.quote_plus(signature)}"
f"&se={expiry}&skn={self.key_name}"
)

def connection_url(self) -> str:
token = self.create_sas_token()
return (
f"wss://{self.relay_namespace}/$hc/{self.hybrid_connection_name}"
f"?sb-hc-action=listen&sb-hc-token={urllib.parse.quote_plus(token)}"
)


async def main():
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO").upper(),
format=os.getenv("LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s"),
)

logger.info("Socket Listener Starting...")
storage = MWLStorage(db_path=DB_PATH)

while True:
try:
await RelayListener(storage).listen()
except KeyboardInterrupt:
logger.warning("\nShutting down...")
break
except Exception as e:
logger.warning(f"Connection error: {e}")
logger.warning("Retrying in 5 seconds...")
await asyncio.sleep(5)


if __name__ == "__main__":
asyncio.run(main())
43 changes: 43 additions & 0 deletions src/services/mwl/create_worklist_item.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging

from services.storage import MWLStorage, WorklistItem

logger = logging.getLogger(__name__)


class CreateWorklistItem:
def __init__(self, storage: MWLStorage):
self.storage = storage

def call(self, payload: dict):
try:
action_id = payload.get("action_id")
if not action_id:
raise ValueError("Missing action_id in payload")

params = payload.get("parameters", {})

item = params.get("worklist_item", {})
participant = item.get("participant", {})
scheduled = item.get("scheduled", {})
procedure = item.get("procedure", {})

self.storage.store_worklist_item(
WorklistItem(
accession_number=item.get("accession_number"),
patient_id=participant.get("nhs_number"),
patient_name=participant.get("name"),
patient_birth_date=participant.get("birth_date"),
patient_sex=participant.get("sex", ""),
scheduled_date=scheduled.get("date"),
scheduled_time=scheduled.get("time"),
modality=procedure.get("modality"),
study_description=procedure.get("study_description", ""),
source_message_id=action_id,
)
)
logger.info(f"Created worklist item: {item.get('accession_number')}")
return {"status": "created", "action_id": action_id}
except Exception as e:
logger.error(f"Failed to create worklist item: {e}")
return {"status": "error", "action_id": action_id, "error": str(e)}
Loading