Labels
- source: websocket (anything `websocket` source related)
- type: bug (a code related bug)
Description
A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Problem
When using the websocket source, if the connection to the server stays down for longer than connect_timeout_secs, the connection is dropped, the source doesn't reconnect, and the process keeps running.
The debug logs show Source pump finished normally, which seems off for a source that has just lost its connection.
This only happens when:
- The connection is okay at first. If the connection fails on startup, the process is terminated.
- There is more than one source (like internal_logs). If the websocket source is the only source, the process is terminated.
Expected behavior (either of the following):
- Stop the process
- Try to reconnect again and again (forever)
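For illustration, the second option could look roughly like the sketch below. This is not Vector's implementation (Vector is written in Rust); it is a minimal Python sketch that assumes a hypothetical `connect` coroutine and hypothetical `timeout_secs` / `retry_delay_secs` parameters. The point is that a timed-out attempt is handled the same way as a refused connection, so the loop never exits on its own.

```python
import asyncio


async def connect_forever(connect, timeout_secs=3.0, retry_delay_secs=1.0):
    """Call the hypothetical `connect()` coroutine until it succeeds.

    A timed-out attempt is treated like any other connection failure,
    so the loop only ends once a connection is established.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            # wait_for cancels the attempt and raises a timeout error
            # if it takes longer than timeout_secs.
            return await asyncio.wait_for(connect(), timeout=timeout_secs)
        except (OSError, asyncio.TimeoutError) as err:
            # ConnectionRefusedError is an OSError subclass.
            print(f"attempt {attempt} failed: {err!r}; retrying")
            await asyncio.sleep(retry_delay_secs)
```

A caller would pass its own connect coroutine, for example `await connect_forever(my_connect, timeout_secs=3)`, where `my_connect` is whatever opens the websocket connection.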
Configuration
sinks:
  out:
    inputs:
      - source
    type: console
    encoding:
      codec: text
sources:
  internal_logs:
    type: internal_logs
  source:
    connect_timeout_secs: 3
    type: websocket
    uri: ws://localhost:8765
Version
vector 0.50.0 (aarch64-apple-darwin 9053198 2025-09-23 14:18:50.944442940)
Debug Output
2025-10-12T11:33:25.734237Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2025-10-12T11:33:25.735179Z INFO vector::app: Log level is enabled. level="debug"
2025-10-12T11:33:25.735219Z DEBUG vector::app: messaged="Building runtime." worker_threads=12
2025-10-12T11:33:25.735797Z INFO vector::app: Loading configs. paths=["vector.yaml"]
2025-10-12T11:33:25.736561Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2025-10-12T11:33:25.737109Z WARN vector::config::loading: Source "internal_logs" has no consumers
2025-10-12T11:33:25.737173Z DEBUG vector::topology::builder: Building new source. component=internal_logs
2025-10-12T11:33:25.737374Z DEBUG vector::topology::builder: Building new source. component=source
2025-10-12T11:33:25.737567Z DEBUG vector::topology::builder: Building new sink. component=out
2025-10-12T11:33:25.937697Z INFO vector::topology::running: Running healthchecks.
2025-10-12T11:33:25.937724Z DEBUG vector::topology::running: Connecting changed/added component(s).
2025-10-12T11:33:25.937744Z DEBUG vector::topology::running: Configuring outputs for source. component=source
2025-10-12T11:33:25.937760Z DEBUG vector::topology::running: Configuring output for component. component=source output_id=None
2025-10-12T11:33:25.937766Z DEBUG vector::topology::running: Configuring outputs for source. component=internal_logs
2025-10-12T11:33:25.937771Z DEBUG vector::topology::running: Configuring output for component. component=internal_logs output_id=None
2025-10-12T11:33:25.937775Z DEBUG vector::topology::running: Connecting inputs for sink. component=out
2025-10-12T11:33:25.937788Z DEBUG vector::topology::running: Adding component input to fanout. component=out fanout_id=source
2025-10-12T11:33:25.937788Z INFO vector::topology::builder: Healthcheck passed.
2025-10-12T11:33:25.937808Z DEBUG vector::topology::running: Spawning new source. key=source
2025-10-12T11:33:25.937840Z DEBUG vector::topology::running: Spawning new source. key=internal_logs
2025-10-12T11:33:25.937857Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump supervisor starting.
2025-10-12T11:33:25.937868Z INFO vector: Vector has started. debug="false" version="0.50.0" arch="aarch64" revision="9053198 2025-09-23 14:18:50.944442940"
2025-10-12T11:33:25.937874Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump starting.
2025-10-12T11:33:25.937887Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::topology::builder: Sink starting.
2025-10-12T11:33:25.937877Z INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2025-10-12T11:33:25.937920Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source starting.
2025-10-12T11:33:25.937939Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump supervisor starting.
2025-10-12T11:33:25.937953Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump starting.
2025-10-12T11:33:25.938603Z DEBUG vector::utilization: utilization=1
2025-10-12T11:33:25.938607Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source starting.
2025-10-12T11:33:25.941856Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: tungstenite::handshake::client: Client handshake done.
2025-10-12T11:33:25.941865Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Connected.
2025-10-12T11:33:25.941872Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Connected.
message
2025-10-12T11:33:29.512843Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: tungstenite::protocol: Received close frame: Some(CloseFrame { code: Away, reason: "" })
2025-10-12T11:33:29.513071Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: tungstenite::protocol: Replying to close with Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Control(Close), mask: None }, payload: [3, 233] }
2025-10-12T11:33:29.513254Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Error receiving message from websocket. error=IO error: Connection closed by server with code '1001' and reason: '' error_code="websocket_receive_error" error_type="connection_failed" stage="processing" internal_log_rate_limit=true
2025-10-12T11:33:29.514595Z WARN source{component_kind="source" component_id=source component_type=websocket}: vector::sources::websocket::source: Connection closed by server. code=1001 reason= internal_log_rate_limit=true
2025-10-12T11:33:29.514678Z WARN source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Closed by the server.
2025-10-12T11:33:29.514731Z INFO source{component_kind="source" component_id=source component_type=websocket}: vector::sources::websocket::source: Reconnecting to WebSocket... internal_log_rate_limit=true
2025-10-12T11:33:29.515416Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: WebSocket connection failed. error=Connect error: Connection refused (os error 61) error_code="websocket_connection_error" error_type="connection_failed" stage="sending" internal_log_rate_limit=true
2025-10-12T11:33:30.017676Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Internal log [WebSocket connection failed.] is being suppressed to avoid flooding.
2025-10-12T11:33:30.738980Z DEBUG vector::utilization: utilization=0.10008874059589987
2025-10-12T11:33:32.517099Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: WebSocket connection error. error=IO error: Connection attempt timed out error_code="websocket_connection_error" error_type="writer_failed" stage="sending" internal_log_rate_limit=true
2025-10-12T11:33:32.517477Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source finished normally.
2025-10-12T11:33:32.517492Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump finished normally.
2025-10-12T11:33:32.517722Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump supervisor task finished normally.
2025-10-12T11:33:35.738917Z DEBUG vector::utilization: utilization=0.010008874059589985
2025-10-12T11:33:39.334003Z INFO vector::signal: Signal received. signal="SIGINT"
2025-10-12T11:33:39.334542Z INFO vector: Vector has stopped.
2025-10-12T11:33:39.334761Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source finished normally.
2025-10-12T11:33:39.334865Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::utilization: Couldn't send utilization start wait message. component_id=ComponentKey { id: "out" } error="Closed(..)"
2025-10-12T11:33:39.334999Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump finished normally.
2025-10-12T11:33:39.335041Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::utilization: Couldn't send utilization stop wait message. component_id=ComponentKey { id: "out" } error="Closed(..)"
2025-10-12T11:33:39.335134Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump supervisor task finished normally.
2025-10-12T11:33:39.335213Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::topology::builder: Sink finished normally.
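To spot this state in captured debug output, a small helper like the one below could flag any source whose pump finished before a shutdown signal was logged. It is only a sketch: the function name is hypothetical, and it relies on the exact message texts seen in the output above ("Source pump finished normally" and "Signal received."), which may differ between Vector versions.

```python
import re

# Matches lines such as:
# ... source{... component_id=source ...}: ...: Source pump finished normally.
FINISHED = re.compile(r"component_id=(\S+) .*Source pump finished normally")
SHUTDOWN = "Signal received."


def sources_finished_before_shutdown(log_lines):
    """Return component ids whose source pump finished before any shutdown signal."""
    finished = []
    for line in log_lines:
        if SHUTDOWN in line:
            # Anything that finishes after this point is a normal shutdown.
            break
        match = FINISHED.search(line)
        if match:
            finished.append(match.group(1))
    return finished
```

Run against the output above, this would report the websocket source (component_id=source) and not internal_logs, whose pump only finishes after SIGINT.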
Example Data
No response
Additional Context
I used a simple Python websocket server for Vector to connect to. The flow is:
- Start the server
- Start Vector
- Stop the server
import asyncio
import websockets


async def handler(websocket):
    # Send a single message, then keep the connection open until it closes.
    message = "message"
    await websocket.send(message)
    await websocket.wait_closed()


async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("WebSocket server running on ws://localhost:8765")
        await asyncio.Future()  # run forever


asyncio.run(main())
References
No response