WebSocket source reconnect failure doesn't cause process termination #23992

@noamrcon

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

With the websocket source, if the server stays unreachable for longer than connect_timeout_secs,
the source drops the connection and stops trying to reconnect, yet the Vector process keeps running.
The debug logs report Source pump finished normally, which seems wrong for a source that has just failed.

This only happens when:

  • The initial connection succeeds - if the connection fails on startup, the process is terminated
  • There is more than one source (like internal_logs) - if the websocket source is the only source, the process is terminated

Expected behavior, one of:

  • Stop the process, or
  • Keep trying to reconnect indefinitely
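
To make the reconnect option above concrete, here is a minimal Python sketch of that behavior. It is not Vector's implementation (Vector is written in Rust); `reconnect_forever`, `connect`, `base_delay`, and `max_delay` are hypothetical names, and only `connect_timeout_secs` mirrors the config option. The point it illustrates is that a connect timeout is treated like any other failed attempt and retried with backoff, rather than ending the loop.

```python
import asyncio


async def reconnect_forever(connect, connect_timeout_secs=3.0,
                            base_delay=0.5, max_delay=30.0):
    """Call `connect` until it succeeds and return its result.

    `connect` is a hypothetical coroutine function that opens the
    connection. The timeout applies to each attempt separately, and a
    timed-out attempt is retried like a refused one.
    """
    delay = base_delay
    while True:
        try:
            return await asyncio.wait_for(connect(), timeout=connect_timeout_secs)
        except (OSError, asyncio.TimeoutError):
            # Connection refused, reset, or timed out: wait, then retry.
            await asyncio.sleep(delay)
            # Exponential backoff, capped so retries never stop entirely.
            delay = min(delay * 2, max_delay)
```

Any coroutine function that opens the connection can be passed as `connect`, for example a small wrapper around `websockets.connect("ws://localhost:8765")`.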

Configuration

sinks:
  out:
    inputs:
      - source
    type: console
    encoding:
      codec: text
sources:
  internal_logs:
    type: internal_logs
  source:
    connect_timeout_secs: 3
    type: websocket
    uri: ws://localhost:8765

Version

vector 0.50.0 (aarch64-apple-darwin 9053198 2025-09-23 14:18:50.944442940)

Debug Output

2025-10-12T11:33:25.734237Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2025-10-12T11:33:25.735179Z  INFO vector::app: Log level is enabled. level="debug"
2025-10-12T11:33:25.735219Z DEBUG vector::app: messaged="Building runtime." worker_threads=12
2025-10-12T11:33:25.735797Z  INFO vector::app: Loading configs. paths=["vector.yaml"]
2025-10-12T11:33:25.736561Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2025-10-12T11:33:25.737109Z  WARN vector::config::loading: Source "internal_logs" has no consumers
2025-10-12T11:33:25.737173Z DEBUG vector::topology::builder: Building new source. component=internal_logs
2025-10-12T11:33:25.737374Z DEBUG vector::topology::builder: Building new source. component=source
2025-10-12T11:33:25.737567Z DEBUG vector::topology::builder: Building new sink. component=out
2025-10-12T11:33:25.937697Z  INFO vector::topology::running: Running healthchecks.
2025-10-12T11:33:25.937724Z DEBUG vector::topology::running: Connecting changed/added component(s).
2025-10-12T11:33:25.937744Z DEBUG vector::topology::running: Configuring outputs for source. component=source
2025-10-12T11:33:25.937760Z DEBUG vector::topology::running: Configuring output for component. component=source output_id=None
2025-10-12T11:33:25.937766Z DEBUG vector::topology::running: Configuring outputs for source. component=internal_logs
2025-10-12T11:33:25.937771Z DEBUG vector::topology::running: Configuring output for component. component=internal_logs output_id=None
2025-10-12T11:33:25.937775Z DEBUG vector::topology::running: Connecting inputs for sink. component=out
2025-10-12T11:33:25.937788Z DEBUG vector::topology::running: Adding component input to fanout. component=out fanout_id=source
2025-10-12T11:33:25.937788Z  INFO vector::topology::builder: Healthcheck passed.
2025-10-12T11:33:25.937808Z DEBUG vector::topology::running: Spawning new source. key=source
2025-10-12T11:33:25.937840Z DEBUG vector::topology::running: Spawning new source. key=internal_logs
2025-10-12T11:33:25.937857Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump supervisor starting.
2025-10-12T11:33:25.937868Z  INFO vector: Vector has started. debug="false" version="0.50.0" arch="aarch64" revision="9053198 2025-09-23 14:18:50.944442940"
2025-10-12T11:33:25.937874Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump starting.
2025-10-12T11:33:25.937887Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::topology::builder: Sink starting.
2025-10-12T11:33:25.937877Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2025-10-12T11:33:25.937920Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source starting.
2025-10-12T11:33:25.937939Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump supervisor starting.
2025-10-12T11:33:25.937953Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump starting.
2025-10-12T11:33:25.938603Z DEBUG vector::utilization: utilization=1
2025-10-12T11:33:25.938607Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source starting.
2025-10-12T11:33:25.941856Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: tungstenite::handshake::client: Client handshake done.
2025-10-12T11:33:25.941865Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Connected.
2025-10-12T11:33:25.941872Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Connected.
message
2025-10-12T11:33:29.512843Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: tungstenite::protocol: Received close frame: Some(CloseFrame { code: Away, reason: "" })
2025-10-12T11:33:29.513071Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: tungstenite::protocol: Replying to close with Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Control(Close), mask: None }, payload: [3, 233] }
2025-10-12T11:33:29.513254Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Error receiving message from websocket. error=IO error: Connection closed by server with code '1001' and reason: '' error_code="websocket_receive_error" error_type="connection_failed" stage="processing" internal_log_rate_limit=true
2025-10-12T11:33:29.514595Z  WARN source{component_kind="source" component_id=source component_type=websocket}: vector::sources::websocket::source: Connection closed by server. code=1001 reason= internal_log_rate_limit=true
2025-10-12T11:33:29.514678Z  WARN source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Closed by the server.
2025-10-12T11:33:29.514731Z  INFO source{component_kind="source" component_id=source component_type=websocket}: vector::sources::websocket::source: Reconnecting to WebSocket... internal_log_rate_limit=true
2025-10-12T11:33:29.515416Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: WebSocket connection failed. error=Connect error: Connection refused (os error 61) error_code="websocket_connection_error" error_type="connection_failed" stage="sending" internal_log_rate_limit=true
2025-10-12T11:33:30.017676Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: Internal log [WebSocket connection failed.] is being suppressed to avoid flooding.
2025-10-12T11:33:30.738980Z DEBUG vector::utilization: utilization=0.10008874059589987
2025-10-12T11:33:32.517099Z ERROR source{component_kind="source" component_id=source component_type=websocket}: vector::internal_events::websocket: WebSocket connection error. error=IO error: Connection attempt timed out error_code="websocket_connection_error" error_type="writer_failed" stage="sending" internal_log_rate_limit=true
2025-10-12T11:33:32.517477Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source finished normally.
2025-10-12T11:33:32.517492Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump finished normally.
2025-10-12T11:33:32.517722Z DEBUG source{component_kind="source" component_id=source component_type=websocket}: vector::topology::builder: Source pump supervisor task finished normally.
2025-10-12T11:33:35.738917Z DEBUG vector::utilization: utilization=0.010008874059589985
2025-10-12T11:33:39.334003Z  INFO vector::signal: Signal received. signal="SIGINT"
2025-10-12T11:33:39.334542Z  INFO vector: Vector has stopped.
2025-10-12T11:33:39.334761Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source finished normally.
2025-10-12T11:33:39.334865Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::utilization: Couldn't send utilization start wait message. component_id=ComponentKey { id: "out" } error="Closed(..)"
2025-10-12T11:33:39.334999Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump finished normally.
2025-10-12T11:33:39.335041Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::utilization: Couldn't send utilization stop wait message. component_id=ComponentKey { id: "out" } error="Closed(..)"
2025-10-12T11:33:39.335134Z DEBUG source{component_kind="source" component_id=internal_logs component_type=internal_logs}: vector::topology::builder: Source pump supervisor task finished normally.
2025-10-12T11:33:39.335213Z DEBUG sink{component_kind="sink" component_id=out component_type=console}: vector::topology::builder: Sink finished normally.
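
The symptom in this output can be checked mechanically. The sketch below is a hypothetical helper, not part of Vector: it scans log lines for any source whose pump finished before a shutdown signal was logged. The regular expression assumes the line format shown above, which may differ between Vector versions.

```python
import re

# Matches the DEBUG line logged when a source's pump task ends and
# captures component_id and component_type from the span prefix.
PUMP_FINISHED = re.compile(
    r"component_id=(?P<id>\S+) component_type=(?P<type>\w+)\}: "
    r"vector::topology::builder: Source pump finished normally\."
)
# Logged when Vector receives a signal such as SIGINT.
SHUTDOWN_MARKER = "vector::signal: Signal received."


def sources_finished_before_shutdown(lines):
    """Return (component_id, component_type) for every source whose pump
    finished before any shutdown signal appeared in the log."""
    finished = []
    for line in lines:
        if SHUTDOWN_MARKER in line:
            break  # pumps finishing after this point are expected
        match = PUMP_FINISHED.search(line)
        if match:
            finished.append((match.group("id"), match.group("type")))
    return finished
```

Run over the debug output above, this should flag only the websocket source, since the internal_logs pump finishes after the SIGINT.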

Example Data

No response

Additional Context

I used a simple Python websocket server as the endpoint for Vector to connect to.
The flow is:

  1. Start the server
  2. Start Vector
  3. Stop the server

The server script:

import asyncio
import websockets

async def handler(websocket):
    message = "message"
    await websocket.send(message)
    await websocket.wait_closed()

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("WebSocket server running on ws://localhost:8765")
        await asyncio.Future()  # run forever

asyncio.run(main())

References

No response

Labels

  • source: websocket (Anything `websocket` source related)
  • type: bug (A code related bug.)
