This is a minimal, production‑minded lab to exercise an MQTT v3.1 mobile client under network impairments. It includes:
- EMQX 5.8 cluster (3 nodes) behind an HAProxy MQTT gateway.
- Toxiproxy between the Java client and the gateway (for impairments).
- A Go backend that consumes driver locations and publishes offers/rides.
- A Java client (Paho MqttAsyncClient) that publishes driver locations and consumes offers/rides.
- Rich, structured logs and basic profiling endpoints (pprof for Go, JFR/thread dump for Java).
All app images use Debian slim bases. No Prometheus, no Grafana, no Streamlit UI.
- Quickstart
- Architecture
- Configuration
- Toxiproxy Usage (Impairments)
- Network Impairments (tc NetEm)
- Latency Charts (Per Topic)
- Profiling
- MQTT Gateway & EMQX
- MQTT Keepalive & Reconnect
- Dual Connections (Pub/Sub)
- Troubleshooting Container (netshoot)
- Operational Notes
- Message Payload Format
- Tests
cd simple-mqtt-network-lab
# Build and run
docker compose up --build

Services:
- HAProxy MQTT gateway: `mqtt-gateway:1883` (host: `localhost:1883`)
- EMQX dashboard: http://localhost:18083 (admin/public)
- Toxiproxy API: http://localhost:8474
- Proxy preconfigured via `toxiproxy/config.json` (listen `0.0.0.0:18830` → upstream `mqtt-gateway:1883`).
- Go backend profiling: http://localhost:6060/debug/pprof
- Java client profiling: http://localhost:6061 (see endpoints below)
- Network troubleshooting helper: the `network-troubleshooting` container shares Toxiproxy's netns (tools: tcpdump, mtr, dig, curl, tc, etc.).
Logs are printed to stdout for each service. Stop with Ctrl+C; Compose triggers graceful shutdown for the apps.
java-client ──tcp──> toxiproxy:18830 ──tcp──> mqtt-gateway:1883 ──> emqx[1..3]
| publish /driver/location (configurable)
| subscribe /driver/offer, /driver/ride
backend ───────────────tcp──────────────> mqtt-gateway:1883 ──> emqx[1..3]
| subscribe /driver/location
| publish /driver/offer (every Y ms), /driver/ride (every Z ms)
Topics:
- `/driver/location` (Java → Backend)
- `/driver/offer` (Backend → Java)
- `/driver/ride` (Backend → Java)
Edit the YAML files under configs/ (structured, human‑readable). Defaults are sensible and focus on reconnect robustness and observability.
- `configs/backend.yaml` controls the Go backend (publish rates, keepalive, retry, QoS, payload sizes, socket/buffer/inflight, debug).
- `configs/client.yaml` controls the Java client (publish rate, keepalive, retry, QoS, payload sizes, socket/buffer/inflight, debug).
Both apps load configuration only at startup, so changes take effect on restart (no hot reload; simple by design). Example snippets (full files provided):
# backend.yaml
mqtt:
host: mqtt-gateway
port: 1883
client_id: backend-1
keepalive_secs: 30
protocol_version: 3 # MQTT 3.1
clean_session: true # default
retry:
enabled: true
connect_timeout_ms: 5000
max_reconnect_interval_ms: 10000
ping_timeout_ms: 5000
write_timeout_ms: 5000
## App-level ping/pong removed. Built-in MQTT keepalive (PINGREQ/PINGRESP) is used.
publish:
offer_every_ms: 1000
ride_every_ms: 2000
qos:
location: 0
offer: 0
ride: 0
payload_bytes:
offer: 4096
ride: 4096
socket:
tcp_keepalive_secs: 60
tcp_nodelay: true
read_buffer: 262144
write_buffer: 262144
buffer_inflight:
max_inflight: 64
buffer_enabled: true
buffer_size: 1000
drop_oldest: true
persist: false
log:
debug: false

# client.yaml
mqtt:
host: toxiproxy
port: 18830 # Toxiproxy → HAProxy:1883
client_id: java-1
keepalive_secs: 30
protocol_version: 3 # MQTT 3.1
clean_session: true # default
# Optional: use two separate MQTT connections
# - publisher: publishes /driver/location with client_id "java-1-pub"
# - subscriber: consumes /driver/offer and /driver/ride with client_id "java-1-sub"
separate_pubsub_connections: false
retry:
enabled: true
automatic_reconnect: true
connect_timeout_ms: 5000
max_reconnect_delay_ms: 10000
qos:
location: 0
offer: 0
ride: 0
payload_bytes:
location: 4096
socket:
tcp_keepalive: true
tcp_nodelay: true
receive_buffer: 262144
send_buffer: 262144
buffer_inflight:
max_inflight: 64
buffer_enabled: true
buffer_size: 1000
drop_oldest: true
persist: false
log:
debug: false
publish:
location_every_ms: 1000

To compare packet loss and TCP congestion behavior with one vs. two MQTT connections, the Java client can split publishing and subscribing over separate connections.
- Enable it in `configs/client.yaml` under `mqtt.separate_pubsub_connections: true`.
- Client IDs derive automatically: `<client_id>-pub` for publishes to `/driver/location`, `<client_id>-sub` for subscriptions to `/driver/offer` and `/driver/ride`.
- Logs include `connected_pub=` and `connected_sub=` in the periodic `[stats]` line to show each connection's state. Per-message logs and the `[publish]` lines remain unchanged for reporting.
Suggested experiment:
- Single connection: set `separate_pubsub_connections: false`, run a packet-loss scenario (e.g., `bash scripts/netem.sh loss 5`) and capture with `bash scripts/latency-report.sh --pre 5 --post 20 -- bash scripts/netem.sh loss 5`.
- Dual connections: switch the flag to `true`, rebuild with `docker compose up --build`, then repeat the same impairment and capture.
- Compare delivered ratios and latency distributions for `/driver/location` vs `/driver/offer` and `/driver/ride` across the two runs.
The Java client connects to Toxiproxy (`toxiproxy:18830` inside the compose network, exposed as `localhost:18830` on the host), which forwards to the gateway. The proxy is created at boot from `toxiproxy/config.json`.
Helper script (recommended):
- Control the proxy with `bash scripts/mqtt-proxy.sh`:
  - Down (hard drop): `bash scripts/mqtt-proxy.sh down`
  - Up (restore): `bash scripts/mqtt-proxy.sh up`
  - Timeout 5s: `bash scripts/mqtt-proxy.sh timeout 5000`
  - Half‑open (client view, block server→client): `bash scripts/mqtt-proxy.sh halfdown`
  - Half‑open (server view, block client→server): `bash scripts/mqtt-proxy.sh halfup`
  - Blackhole both ways (no FIN/RST): `bash scripts/mqtt-proxy.sh blackhole` (or `blackhole 600000` for 10 min)
  - Latency and jitter: `bash scripts/mqtt-proxy.sh latency 120 40 [down|up|both]` (default jitter=0, both directions)
  - Clear latency: `bash scripts/mqtt-proxy.sh unlatency`
  - Bandwidth limit: `bash scripts/mqtt-proxy.sh bandwidth 256kbps [down|up|both]` (use `bps|kbps|mbps` or bytes/s)
  - Clear bandwidth: `bash scripts/mqtt-proxy.sh unbandwidth`
  - Approx packet loss: `bash scripts/mqtt-proxy.sh packetloss 20 [down|up|both]` (uses the slicer toxic; not real per‑packet drop)
  - Clear packet loss: `bash scripts/mqtt-proxy.sh unpacketloss`
  - Status: `bash scripts/mqtt-proxy.sh status`
  - Env: set `TOXIPROXY_URL` if not `http://localhost:8474` (default).
Direct API examples:
- Inspect proxies:
curl -s http://localhost:8474/proxies | jq .
- Create the MQTT proxy (compose already does this via `toxiproxy/config.json`):
curl -s -X POST http://localhost:8474/proxies \
  -H 'Content-Type: application/json' \
  -d '{"name":"mqtt","listen":"0.0.0.0:18830","upstream":"mqtt-gateway:1883"}'
- Simulate a full drop (reset connections instantly):
curl -s -X POST http://localhost:8474/proxies/mqtt/toxics \
  -H 'Content-Type: application/json' \
  -d '{"name":"drop","type":"reset_peer","stream":"downstream","toxicity":1.0}'
- Remove the drop toxic:
curl -s -X DELETE http://localhost:8474/proxies/mqtt/toxics/drop
- Pause traffic for 5s (timeout toxic):
curl -s -X POST http://localhost:8474/proxies/mqtt/toxics \
  -H 'Content-Type: application/json' \
  -d '{"name":"timeout5s","type":"timeout","stream":"downstream","attributes":{"timeout":5000}}'
- Remove the timeout toxic:
curl -s -X DELETE http://localhost:8474/proxies/mqtt/toxics/timeout5s

Note: Toxiproxy 2.5 treats `enabled` as read-only via the REST API; use toxics (above) or delete/recreate the proxy instead. For OS‑level packet impairments, see the dedicated section: Network Impairments (tc NetEm).
- `halfdown` adds a downstream `limit_data` toxic with `bytes=0`, effectively blackholing server→client. The client keeps sending (e.g., PINGREQ, publishes) but never receives responses (PINGRESP, PUBACK). No FIN/RST is sent; the server still sees client traffic until keepalive/app timeouts.
- `halfup` adds an upstream `limit_data` toxic with `bytes=0`, blackholing client→server. The socket stays open but the server won't see client packets.
- Use `up` to remove the `halfdown`, `halfup`, `timeout`, and `down` toxics.
- `blackhole [ms]` adds a downstream `timeout_down` and an upstream `timeout_up` toxic. With a large timeout (default ~1 year), both directions are blocked without FIN/RST, so both ends think the connection is alive until their keepalive or app timeouts fire.
- Use `up` to remove `timeout_down`/`timeout_up`.
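If you prefer driving toxics from Go instead of curl, here is a minimal sketch using Toxiproxy's Go client. The import path and the `Attributes` map follow the upstream Shopify client and are an assumption on my part; the lab's scripts only use the REST API shown above.

```go
package main

import (
	"log"

	// Assumed import path for the official Toxiproxy Go client.
	toxiproxy "github.com/Shopify/toxiproxy/v2/client"
)

func main() {
	client := toxiproxy.NewClient("http://localhost:8474")

	// "mqtt" is the proxy defined in toxiproxy/config.json.
	proxy, err := client.Proxy("mqtt")
	if err != nil {
		log.Fatal(err)
	}

	// Equivalent of `mqtt-proxy.sh halfdown`: block server→client bytes without FIN/RST.
	if _, err := proxy.AddToxic("halfdown", "limit_data", "downstream", 1.0, toxiproxy.Attributes{
		"bytes": 0,
	}); err != nil {
		log.Fatal(err)
	}

	// ... run the experiment, then restore the link.
	if err := proxy.RemoveToxic("halfdown"); err != nil {
		log.Fatal(err)
	}
}
```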
For true per‑packet loss/jitter/latency at the OS level, use the NetEm helper. It applies tc netem in the Toxiproxy network namespace so all proxied MQTT traffic is affected.
- Helper script: `bash scripts/netem.sh`
- Defaults: `TARGET=toxiproxy`, `IFACE=eth0`; the script execs into the `network-troubleshooting` container if present, otherwise it starts a short‑lived helper.
Examples:
- Show current qdisc: `bash scripts/netem.sh status`
- 120 ms delay with 40 ms jitter: `bash scripts/netem.sh delay 120 40`
- 5% packet loss: `bash scripts/netem.sh loss 5`
- Combine delay+loss: `bash scripts/netem.sh shape 120 20 2 10`
- Egress bandwidth limit: `bash scripts/netem.sh rate 512kbps` (accepts `kbps|mbps` or `kbit|mbit`)
- Combine all (with bandwidth): `bash scripts/netem.sh shape 120 20 2 10 1mbps`
- Clear NetEm: `bash scripts/netem.sh clear`
Notes:
- This is real packet impairment below TCP, unlike Toxiproxy's slicer toxic, which only fragments streams.
- Requires the NET_ADMIN capability; the `network-troubleshooting` container has it by default.
- Bandwidth limiting uses a TBF child qdisc under the `netem` root and shapes egress on the target interface. It typically affects both directions of proxied flows, since traffic in each direction egresses that interface.
- You can override TBF tuning via env: `TBF_BURST` (default `32kbit`) and `TBF_LATENCY` (default `400ms`).
Both apps embed ts=<unix_ms>|seq=<n>| at the start of payloads. Receivers log per‑message latency as latency_ms = recv_ts_ms − pub_ts_ms with sequence. Use the helper to capture a time window and generate CSVs + a PNG‑based HTML report.
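For clarity, here is a minimal sketch of the receiver-side computation on that prefix. The apps do this internally in Go and Java; this standalone example is illustrative only.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parsePrefix extracts pub_ts_ms and seq from a payload like "ts=1712345678901|seq=42|xxxx...".
func parsePrefix(payload string) (pubTsMs, seq int64, err error) {
	parts := strings.SplitN(payload, "|", 3)
	if len(parts) < 2 {
		return 0, 0, fmt.Errorf("malformed payload prefix")
	}
	pubTsMs, err = strconv.ParseInt(strings.TrimPrefix(parts[0], "ts="), 10, 64)
	if err != nil {
		return 0, 0, err
	}
	seq, err = strconv.ParseInt(strings.TrimPrefix(parts[1], "seq="), 10, 64)
	return pubTsMs, seq, err
}

func main() {
	payload := "ts=1712345678901|seq=42|xxxx" // example payload prefix
	pubTs, seq, err := parsePrefix(payload)
	if err != nil {
		panic(err)
	}
	// latency_ms = recv_ts_ms - pub_ts_ms
	latencyMs := time.Now().UnixMilli() - pubTs
	fmt.Printf("seq=%d latency_ms=%d\n", seq, latencyMs)
}
```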
Requirements:
- Python 3
- gnuplot (for charts)
  - macOS: `brew install gnuplot`
  - Ubuntu/Debian: `sudo apt-get update && sudo apt-get install -y gnuplot`
  - Fedora: `sudo dnf install -y gnuplot`
  - CentOS/RHEL: `sudo yum install -y gnuplot`
  - Arch: `sudo pacman -S gnuplot`
  - Alpine: `sudo apk add gnuplot`
Helper script (reporting):
bash scripts/latency-report.sh [--pre N] [--post N] [--] [command ...]
Examples:
- Capture 5s before and 10s after a netem change: `bash scripts/latency-report.sh --pre 5 --post 10 -- bash scripts/netem.sh shape 120 20 2 10 1mbps`
- Capture around a toxiproxy latency change: `bash scripts/latency-report.sh --pre 5 --post 10 -- bash scripts/mqtt-proxy.sh latency 120 40 both`
Outputs (under captures/latency-<ts>/):
- `latency_offer.csv`, `latency_ride.csv`, `latency_location.csv` with columns `seq,latency_ms,pub_ts_ms,recv_ts_ms`.
- Missing publishes (not received within the window): `latency_offer_missing.csv`, `latency_ride_missing.csv`, `latency_location_missing.csv` (`seq,pub_ts_ms`).
- Per-second delivery rate CSV per topic: `rate_<topic>.csv` with columns `second_unix,published,received,delivered_ratio`.
- Summary files: `summary.json` and `summary.txt` with totals, delivered ratio, and latency stats (min/mean/p50/p95/p99/max) per topic.
- HTML report: `index.html` summarizes stats and embeds generated PNG charts; requires `gnuplot` to render charts.
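If you want to post-process the CSVs outside the generated report, here is a minimal sketch that reads one latency CSV and prints count/mean/p95. It assumes the header and column order listed above; the file path is a placeholder to adjust for your capture directory.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"sort"
	"strconv"
)

func main() {
	// Placeholder path: point this at a CSV under captures/latency-<ts>/.
	f, err := os.Open("latency_offer.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		panic(err)
	}

	var lat []float64
	for i, row := range rows {
		if i == 0 {
			continue // header: seq,latency_ms,pub_ts_ms,recv_ts_ms
		}
		if v, err := strconv.ParseFloat(row[1], 64); err == nil {
			lat = append(lat, v)
		}
	}
	if len(lat) == 0 {
		fmt.Println("no latency samples")
		return
	}

	sort.Float64s(lat)
	var sum float64
	for _, v := range lat {
		sum += v
	}
	// Simple nearest-rank approximation of p95.
	p95 := lat[int(float64(len(lat)-1)*0.95)]
	fmt.Printf("count=%d mean=%.1fms p95=%.1fms\n", len(lat), sum/float64(len(lat)), p95)
}
```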
Notes:
- The script uses `docker logs --since/--until` to bound the time window. Adjust `--pre`/`--post` to include exactly the period you care about.
- Open `index.html` to see latency line charts, missing-message markers (red), publish vs. receive rates, and delivered ratios. Click any image to expand it (full‑screen lightbox). If charts are missing, install `gnuplot` and rerun the report.
What the charts mean:
- Latency vs Seq: per‑message latency for received messages; x‑axis is published sequence.
- Latency + Missing: same latency line plus red markers at y=0 for publishes with no receive inside the window.
- Published vs Received per Second: published counts grouped by publish second; received counts by receive second (time on x‑axis).
- Delivered Ratio per Pub‑Second: for each publish second, delivered/published ∈ [0,1].
Screenshot
-
Go backend
- Endpoint: http://localhost:6060/debug/pprof
- CPU profile (30s): `go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30`
- Heap: `curl -s http://localhost:6060/debug/pprof/heap > heap.pb.gz`
-
Java client
- Base URL: http://localhost:6061
- Health: `GET /healthz`
- Thread dump: `GET /profiling/threads` (text/plain)
- Start JFR (60s): `POST /profiling/jfr/start?name=run1&durationSec=60`
- Stop JFR: `POST /profiling/jfr/stop?name=run1` (returns the recording path inside the container)
Note: JFR requires a JDK (we run on OpenJDK 17 slim). Retrieve the recorded JFR with docker cp if needed.
- HAProxy runs with TCP logging enabled and acts as a front door to the EMQX cluster via round‑robin balancing.
- EMQX 5.8 cluster (3 nodes) is formed with static seeds. Dashboard is exposed on http://localhost:18083 (admin/public).
- Purpose: Detect half‑open TCP connections without application pings.
- Mechanism: Client must send any MQTT control packet within the negotiated keepalive interval; if idle, it sends PINGREQ. The broker replies with PINGRESP.
- Broker side timeout: Most brokers (including EMQX) consider the connection dead after roughly 1.5 × keepalive with no incoming control packets.
- Client side timeout:
  - Java: handled internally by Paho; when a PINGRESP is not received, `connectionLost()` fires and auto‑reconnect kicks in if enabled.
  - Go: configured via `retry.ping_timeout_ms` (default 5000 ms). If a PINGRESP is not received within this window, the client treats the connection as lost.
Both apps use MQTT keepalive only (no app‑level ping/pong). Enable debug logs to see PINGREQ/PINGRESP traces.
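For reference, a minimal sketch of how these keepalive settings map onto paho.mqtt.golang client options. This is illustrative: the Go backend wires the same YAML values, but its actual code may differ.

```go
package main

import (
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://mqtt-gateway:1883").
		SetClientID("backend-1").
		SetProtocolVersion(3).          // MQTT 3.1 (protocol_version: 3)
		SetCleanSession(true).          // clean_session: true
		SetKeepAlive(30 * time.Second). // keepalive_secs: client sends PINGREQ when idle
		SetPingTimeout(5 * time.Second) // retry.ping_timeout_ms: no PINGRESP => connection treated as lost

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	defer client.Disconnect(250)
}
```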
- Java client (Paho MqttAsyncClient):
  - Auto‑reconnect enabled via `retry.automatic_reconnect: true`.
  - Exponential backoff doubles per attempt and caps at `maxReconnectDelay` (Paho default ≈ 128 s if not set).
  - Connect timeout controls how long each handshake may take (`retry.connect_timeout_ms`).
  - Note: Paho Java's `setMaxReconnectDelay(..)` expects seconds. Our YAML key `retry.max_reconnect_delay_ms` is milliseconds; the example below uses seconds for readability.
- Go backend (paho.mqtt.golang):
  - Auto‑reconnect enabled via `retry.enabled: true`.
  - Exponential backoff with a cap at `retry.max_reconnect_interval_ms`.
  - `retry.connect_timeout_ms` limits each connect attempt's handshake; `retry.ping_timeout_ms` bounds how long to wait for a PINGRESP.
Config:
keepalive = 120 s → connection considered lost after ~180 s (1.5 × KA)
connect timeout = 30 s
maxReconnectDelay = default ≈ 128 s (2 min)
Backoff pattern: 1 → 2 → 4 → 8 → 16 → 32 → 64 → 128 → 128 …
⏱ Event Timeline (network down → up to 5 min)
t = 0 s | Normal operation
t = 180 s | No packets for 1.5×KeepAlive → connectionLost() triggered
| Auto-reconnect loop starts
──────────────────────────────────────────────────────────────────────
Attempt #1 | delay=0 s connect timeout=30 s (180–210 s)
Attempt #2 | delay=1 s connect timeout=30 s (211–241 s)
Attempt #3 | delay=2 s connect timeout=30 s (243–273 s)
Attempt #4 | delay=4 s connect timeout=30 s (277–307 s)
Attempt #5 | delay=8 s connect timeout=30 s (315–345 s)
Attempt #6 | delay=16 s connect timeout=30 s (361–391 s)
Attempt #7 | delay=32 s connect timeout=30 s (423–453 s)
Attempt #8 | delay=64 s connect timeout=30 s (517–547 s)
Attempt #9+ | delay≈128 s (capped) (beyond ~9 min if still offline)
──────────────────────────────────────────────────────────────────────
Notes:
- With clean sessions enabled (default), subscriptions are re‑issued on reconnect (both apps already do this in their connect handlers).
- With persistent sessions (`clean_session: false`), the broker retains subscriptions and queued QoS 1/2 messages; both apps still safely resubscribe.
- Under full blackhole conditions (no FIN/RST), detection depends on keepalive; expect ~1×KA before a PINGREQ is sent and up to ~1.5×KA for the broker‑side disconnect. The client side may trigger earlier if `ping_timeout_ms` elapses (Go) or Paho Java detects missing PINGRESPs.
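A sketch of the corresponding reconnect options in paho.mqtt.golang, including the resubscribe-on-connect pattern described above. Handler bodies and log wording are placeholders, not the backend's exact code.

```go
package main

import (
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	onLocation := func(_ mqtt.Client, m mqtt.Message) {
		log.Printf("got %s (%d bytes)", m.Topic(), len(m.Payload()))
	}

	opts := mqtt.NewClientOptions().
		AddBroker("tcp://mqtt-gateway:1883").
		SetClientID("backend-1").
		SetAutoReconnect(true).                    // retry.enabled
		SetConnectTimeout(5 * time.Second).        // retry.connect_timeout_ms
		SetMaxReconnectInterval(10 * time.Second). // retry.max_reconnect_interval_ms (backoff cap)
		SetConnectionLostHandler(func(_ mqtt.Client, err error) {
			log.Printf("connection dead (lost connectivity): %v", err)
		}).
		SetOnConnectHandler(func(c mqtt.Client) {
			log.Println("connection alive (recovered)")
			// Re-issue subscriptions on every (re)connect; required with clean_session: true.
			c.Subscribe("/driver/location", 0, onLocation)
		})

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}
	select {} // keep running; reconnects are handled by the client
}
```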
A persistent nicolaka/netshoot container named network-troubleshooting shares the network namespace with Toxiproxy for deep inspection.
- Start automatically with compose: `docker compose up -d network-troubleshooting` (included in the default stack)
- Shell: `docker exec -it network-troubleshooting bash`
- Common tools available: tcpdump, tshark, dig, nslookup, curl, mtr, arping, tc, ss, iproute2.
- Capture MQTT proxy traffic to pcap: `docker exec -it network-troubleshooting tcpdump -i eth0 -n port 18830 -w /tmp/mqtt.pcap`
Helper script for capture: bash scripts/capture.sh
- Save 60s of MQTT proxy traffic to /tmp/mqtt-proxy.pcap: `bash scripts/capture.sh port 60 mqtt-proxy.pcap`
- Save a custom filter for 30s: `bash scripts/capture.sh filter "host mqtt-gateway" 30 gw.pcap`
- Live sniff (Ctrl+C to stop): `bash scripts/capture.sh live` (defaults to both directions)
- Live to Wireshark via named pipe: `bash scripts/capture.sh live-wireshark` (defaults to both directions and auto-decodes MQTT)
- List saved pcaps: `bash scripts/capture.sh list`
- Copy to host: `bash scripts/capture.sh copy mqtt-proxy.pcap ./captures`
Preset filters (aliases):
- List presets: `bash scripts/capture.sh presets`
- Capture with a preset (30s default): `bash scripts/capture.sh preset cp 60 cp.pcap`
- Live with a preset: `bash scripts/capture.sh live-preset both`
- Live Wireshark with a preset: `bash scripts/capture.sh live-wireshark-preset pg`
Preset names:
- `cp` (client↔proxy): `port 18830`
- `pg` (proxy↔gateway): `host mqtt-gateway and port 1883`
- `both` (cp or pg): `port 18830 or (host mqtt-gateway and port 1883)`
Design choices: Toxiproxy toxics simulate stream conditions (latency, bandwidth, half‑open, fragmentation). For realistic packet loss/reordering/corruption, prefer NetEm.
You can stream packets from the netshoot helper into Wireshark running on your host.
Option A — direct pipe (Linux):
docker exec network-troubleshooting \
tcpdump -i eth0 -U -s 0 -w - \
'port 18830 or (host mqtt-gateway and port 1883)' | \
wireshark -k -i -

Option B — named pipe (Linux/macOS):
Terminal 1 (producer):
mkfifo /tmp/mqtt.pipe
docker exec network-troubleshooting \
tcpdump -i eth0 -U -s 0 -w - \
'port 18830 or (host mqtt-gateway and port 1883)' > /tmp/mqtt.pipe

Terminal 2 (Wireshark):
- Linux:
wireshark -k -i /tmp/mqtt.pipe
- macOS:
open -a Wireshark --args -k -i /tmp/mqtt.pipe
Tips:
- Use display filter `mqtt` or decode ports as MQTT: Analyze → Decode As… → select TCP port 18830/1883 → MQTT.
- `-U` (unbuffered) and `-s 0` ensure low-latency, full-packet capture.
- On Docker Desktop (macOS/Windows), capturing on host interfaces won't see container traffic; the netshoot approach avoids that by sharing Toxiproxy's network namespace.
Shortcut: use the helper to set up the FIFO and launch Wireshark (defaults to both directions and sets MQTT decode)
bash scripts/capture.sh live-wireshark

Notes:
- The helper creates a FIFO at `/tmp/mqtt.pipe` (override with `FIFO=/path`), starts tcpdump inside `network-troubleshooting`, and launches Wireshark on the host.
- It also hints Wireshark to decode TCP ports 18830 and 1883 as MQTT via `-d tcp.port==18830,mqtt` and `-d tcp.port==1883,mqtt`.
- Close Wireshark to stop capture; the helper cleans up the background tcpdump and removes the FIFO.
- Robustness: Both apps use auto‑reconnect, keepalive, inflight/buffers, and structured logs. They log connect/reconnect/disconnect with reasons, subscribe acks, publish results, buffer and inflight counts (every 1s), errors, and shutdown.
- Graceful shutdown: SIGINT/SIGTERM stops publishers, flushes pending publishes, and disconnects cleanly.
- Socket tuning: TCP keepalive and buffer sizes are configurable; Java uses a custom SocketFactory; Go uses a custom Dialer and adjusts TCP options (see the sketch after this list).
- Debug keepalive visibility: When debug is enabled in YAML (`log.debug: true`), both apps surface Paho client debug logs. This includes MQTT keepalive traces (PINGREQ/PINGRESP) where supported by the client libraries.
- Liveness logging: Both apps log explicit transitions: `connection dead (lost connectivity)` and `connection alive (recovered)`.
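As referenced above, here is a sketch of the Go-side socket tuning via a custom `net.Dialer` passed to paho.mqtt.golang with `SetDialer`. The setsockopt wiring is an assumption (Linux-only constants) rather than the backend's exact implementation.

```go
package main

import (
	"net"
	"syscall"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	dialer := &net.Dialer{
		Timeout:   5 * time.Second,
		KeepAlive: 60 * time.Second, // socket.tcp_keepalive_secs
		Control: func(network, address string, c syscall.RawConn) error {
			return c.Control(func(fd uintptr) {
				// socket.tcp_nodelay and read/write buffers (Linux-only constants).
				_ = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_NODELAY, 1)
				_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_RCVBUF, 262144)
				_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_SNDBUF, 262144)
			})
		},
	}

	opts := mqtt.NewClientOptions().
		AddBroker("tcp://mqtt-gateway:1883").
		SetDialer(dialer) // paho.mqtt.golang uses this dialer for the MQTT TCP connection

	_ = mqtt.NewClient(opts) // connect/subscribe/publish as usual
}
```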
- Human‑readable prefixes include a timestamp and sequence number for ordering. Payloads are padded with `x` to reach the configured sizes when applicable.
  - Java `/driver/location` example prefix: `ts=<unix_ms>|seq=<n>|xxxx...`
  - Go publishes `/driver/offer` and `/driver/ride` with: `ts=<unix_ms>|seq=<n>|xxxx...`
- Tweak
`configs/*.yaml` and rebuild: `docker compose up --build`.
- Change QoS, payload sizes, or the publish timers to stress buffering/inflight behavior.
- Use Toxiproxy toxics to simulate handovers/drops and observe logs for backoff, reconnects, and queue dynamics.
- Go unit tests: `cd go-backend && go test -cover ./...`
- Go integration test: set `TEST_MQTT_BROKER=tcp://localhost:1883`, then run `go test -run Integration ./...`
  - Use Docker to bring a broker up: `docker compose up -d mqtt-gateway emqx1` (or the full stack)
- Java unit tests: `cd java-client && ./gradlew test`
- Java integration test: set `TEST_MQTT_BROKER=tcp://localhost:1883`, then run `./gradlew test`
  - The integration test is automatically skipped if `TEST_MQTT_BROKER` is not set
Notes:
- Go toolchain is set to `go1.25` and the Dockerfile uses `golang:1.25-bookworm`.
- YAML parsing uses battle‑tested libraries: `gopkg.in/yaml.v3` (Go) and SnakeYAML (Java).
- Application‑level ping/pong topics were removed; the MQTT keepalive interval defaults to 15s and is configurable via YAML.
