Skip to content

[Feature][Zeta] STIP-24 Phase 1: Support lightweight EdgeSocket ingress for edge collector MVP#10878

Open
nzw921rx wants to merge 7 commits into
apache:devfrom
nzw921rx:feature/stip24_edge_socket
Open

[Feature][Zeta] STIP-24 Phase 1: Support lightweight EdgeSocket ingress for edge collector MVP#10878
nzw921rx wants to merge 7 commits into
apache:devfrom
nzw921rx:feature/stip24_edge_socket

Conversation

@nzw921rx
Copy link
Copy Markdown
Collaborator

@nzw921rx nzw921rx commented May 12, 2026

Purpose of this pull request

Related: #10666

Phase 2 of STIP-24: lightweight edge collector ingress in Zeta, implemented as the EdgeSocket source for remote collection.

This PR delivers the engine-side MVP data path: edge client connects → token auth → payload deserialize → existing transform/sink pipeline in Zeta.

Out of scope for this phase: edge-side collector runtime (standalone agent), edge sink, and edge-cluster control plane.

Changes:

  • Add EdgeSocket source connector for Zeta (factory, reader, deserialization, option model).
  • Add source options for auth (mode/token), packet mode, compression/encryption, reconnect/retry, and timeouts.
  • Add queue/runtime helpers for ingress payload handling and bounded offer under backpressure.
  • Register plugin mapping seatunnel.source.EdgeSocket.
  • Add E2E module connector-edge-socket-e2e (MySQL 8.4 sink + SQL transform; container bootstrap + JDBC driver injection).
  • Add connector documentation and changelog (docs/en, docs/zh).

Does this PR introduce any user-facing change?

Yes — no impact on existing jobs.

  • New EdgeSocket source for edge ingestion.
  • New source options (authentication, packet mode, connection/retry behavior).
  • New plugin mapping entry seatunnel.source.EdgeSocket.
  • Updated connector docs and changelog (English and Chinese).

How was this patch tested?

  • Unit tests: EdgeSocketFactoryTest (factory and config validation).
  • E2E (EdgeSocketMysql8_4IT): start Zeta job with EdgeSocket source; simulated edge collector sends records; SQL transform; write to MySQL sink and verify rows.

Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this. I went through the full diff and rechecked the latest head from the real runtime path, not just the connector classes. The direction makes sense, but I found one main-path blocker in the automatic-discovery contract.

What This PR Fixes

  • User pain: an external edge collector needs a lightweight way to push data into a running EdgeSocket source in Zeta.
  • Fix approach: the PR adds the connector-edge-socket source plus a new engine client/server API, getJobTaskGroupAddresses(jobId), so a collector can discover worker addresses by job.
  • One-line summary: the feature direction is good, but the current “discover ingress by jobId” path still cannot uniquely identify the worker that actually hosts the EdgeSocket source.

Runtime Chain Rechecked

Automatic discovery mode in docs
  -> docs/en/connectors/source/EdgeSocket.md:115-123
      -> omit endpoint
      -> resolve workerHost:port by jobId

Client / server path
  -> JobClient.getJobTaskGroupAddresses(...) [JobClient.java:130-133]
  -> CoordinatorService.queryJobTaskGroupAddresses(...) [CoordinatorService.java:458-463]
  -> JobMaster.queryTaskGroupAddresses() [JobMaster.java:836-846]
      -> returns all task-group worker addresses for the job

Current E2E helper
  -> AbstractEdgeSocketIT.resolveEdgeIngressHostByJobClient() [AbstractEdgeSocketIT.java:320-351]
      -> parse the address array
      -> return the first host it sees

Key Findings

  • The normal path absolutely hits this logic because the docs present jobId-based automatic discovery as a recommended mode.
  • But getJobTaskGroupAddresses(jobId) returns all task-group addresses for the job, not “the EdgeSocket source ingress worker”.
  • The current E2E helper exposes the same gap: it just takes the first host from the returned array, with no source-role filtering at all.
  • So once source / transform / sink task groups are placed on different workers, the collector can connect to a worker that does not host the EdgeSocket ingress listener.

Findings

Issue 1: jobId-based discovery returns the whole job’s task-group address set, not the EdgeSocket ingress target

  • Location: JobClient.java:130-133; CoordinatorService.java:458-463; JobMaster.java:836-846; AbstractEdgeSocketIT.java:320-351
  • Why it matters: the automatic-discovery main path currently treats “all task-group worker addresses for the job” as if that were “the ingress address for the EdgeSocket source”.
  • Risk: real main-path failure. A collector can connect to a transform/sink worker that is not listening for EdgeSocket ingress at all.
  • Suggested fix:
    • Option A: have the engine return only the EdgeSocket source ingress address set.
    • Option B: enrich the returned JSON with enough role metadata for the client to filter the EdgeSocket source task group deterministically.
  • Severity: High

Issue 2: there is still no regression test proving discovery picks the source ingress in a multi-task-group / multi-worker layout

  • Location: AbstractEdgeSocketIT.java:320-351; edge_socket_source_to_mysql84.conf
  • Why it matters: the current tests cover the happy path, but they do not pin the exact contract this PR relies on most.
  • Suggested fix: add a focused discovery regression case where source / transform / sink do not all collapse onto the same worker, and assert that discovery still resolves the actual EdgeSocket source ingress.
  • Severity: Medium

Issue 3: the latest Build check is still pending

  • Location: GitHub check Build
  • Severity: Low

Merge Decision

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: make the automatic-discovery contract uniquely identify the EdgeSocket source ingress target.
  1. Suggested follow-up
  • Issue 2: add a regression test for the multi-task-group discovery case.
  • Issue 3: wait for the latest Build to turn green.

Overall, I like the direction, but I do not think Phase 1 is closed yet. As long as the discovery API only returns the whole job’s task-group address set, the automatic-ingress path is still guessing.

@nzw921rx nzw921rx marked this pull request as ready for review May 12, 2026 18:57
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this. I re-reviewed the latest head from scratch against upstream/dev.

What This PR Fixes

  • User pain: an external collector needs a lightweight ingress path into a running Zeta job.
  • Fix approach: this PR adds the EdgeSocket source, plus docs/tests around how a collector is supposed to reach the worker-side socket.
  • One-line summary: the source-side direction is reasonable, but the latest head has already removed the jobId -> workerHost:port discovery implementation while the docs and test story still present that path as supported.

Simple example:

  • Before this PR family, there was no dedicated socket source for a collector to push records into.
  • In the current head, the source can bind 0.0.0.0:<port> on a worker, but if an operator follows the docs and omits endpoint, there is no longer any runtime API that can actually resolve the target worker address from jobId.

Runtime Chain Rechecked

Job submission
  -> EdgeSocketSourceFactory.optionRule() [EdgeSocketSourceFactory.java:42-56]
      -> accepts endpoint / port / auth_token / retry options

Real source runtime
  -> EdgeSocketSourceReader.openServerSocketWithRetry() [EdgeSocketSourceReader.java:199-218]
      -> bind 0.0.0.0:<port>
      -> log endpoint as metadata only

What the docs still advertise
  -> docs/en/connectors/source/EdgeSocket.md:109-123
      -> omit endpoint
      -> resolve workerHost:port by jobId

What the current E2E actually does
  -> AbstractEdgeSocketIT.ensureEdgeSocketForwarderByJobId() [114-116]
      -> directly use EDGE_INGRESS_HOST
      -> no jobId-based lookup anymore

Key Findings

  • The normal main path absolutely hits EdgeSocketSourceReader.openServerSocketWithRetry().
  • The current head no longer exposes any jobId discovery API for ingress resolution.
  • The docs still present automatic discovery as a supported access mode, and the helper method name still suggests that too, but the code path is gone.
  • The optional endpoint is currently only logged by the source; there is no client/API surface consuming it.

Findings

Issue 1: the latest head removed automatic discovery, but the user-facing contract still says it exists

  • Location: docs/en/connectors/source/EdgeSocket.md:109-123, docs/zh/connectors/source/EdgeSocket.md:104-123, AbstractEdgeSocketIT.java:114-116, EdgeSocketSourceReader.java:207-216
  • Why it matters: the real runtime now only binds a local worker port. The docs still tell users that omitting endpoint is fine because workerHost:port can be discovered from jobId, but that implementation path has been deleted on the current head.
  • Risk: main-path user failure. A collector/operator following the published docs still expects a discovery flow that no longer exists.
  • Suggested fix:
    • Option A: explicitly narrow this PR to direct/manual ingress only, and remove the automatic-discovery wording/examples/tests in one pass.
    • Option B: if automatic discovery is still part of the intended contract, add back a real discovery surface instead of leaving it as documentation only.
  • Severity: High

Issue 2: the new tests still do not pin the published access-mode contract, and one unit test uses a fixed sleep for async failure timing

  • Location: EdgeSocketSourceReaderTest.java:95-109, AbstractEdgeSocketIT.java:114-116
  • Why it matters: shouldFailWhenRetryBudgetExhausted() waits with Thread.sleep(300) before asserting an async failure path, which is timing-sensitive on slower CI. At the same time, the E2E helper no longer verifies the access mode that the docs describe most prominently.
  • Risk: medium test-stability / coverage risk. The suite can miss future regressions around the actual published ingress contract.
  • Suggested fix: replace the fixed sleep with an explicit condition wait, and align the tests with the real access mode that this PR intends to support.
  • Severity: Medium

Issue 3: the latest Build check is still not green

  • Location: GitHub check Build
  • Severity: Low

Merge Decision

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: make the access-mode contract consistent. Either this PR is direct-connect only, or it needs a real discovery surface again.
  1. Suggested follow-up
  • Issue 2: tighten the async test waiting strategy and align coverage with the real supported ingress mode.
  • Issue 3: rerun the latest Build and merge only from a green head.

Overall, I think the latest head is closer to a workable Phase 1 than the earlier version, because it stops pretending the old discovery API was reliable. But it still is not merge-ready until the user-facing contract is cleaned up end to end.

@nzw921rx nzw921rx marked this pull request as draft May 13, 2026 05:57
@nzw921rx nzw921rx marked this pull request as ready for review May 13, 2026 15:15
@nzw921rx
Copy link
Copy Markdown
Collaborator Author

@davidzollo Can you help with the review? Any suggestions would be greatly appreciated

Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for missing this in the last review — that's on me. I re-reviewed the latest head from scratch, and I want to flag one more blocker that I should have caught earlier. This is a carryover from the previous version, not a newly introduced issue.

Thanks for the latest update. The current head does close the earlier docs/runtime mismatch around jobId -> workerHost:port discovery: the docs now stick to the real supported path (endpoint for observability plus tag_filter pinning), and the code no longer pretends there is an automatic ingress discovery API.

What This PR Fixes

  • User pain: an external edge collector needs a lightweight socket ingress into a running Zeta job.
  • Fix approach: this PR adds the EdgeSocket source, collector protocol, queue/checkpoint handling, docs, and E2E coverage around the worker-side socket ingress path.
  • One-line summary: the source-side direction is reasonable, but the current ingress contract still breaks down if more than one collector targets the same source.

Runtime Chain Rechecked

worker-side ingress
  -> EdgeSocketSourceReader.openServerSocketWithRetry() [EdgeSocketSourceReader.java:197-248]
      -> bind 0.0.0.0:<port>
  -> receiveLoop() [250-335]
      -> accept one collector socket
      -> authenticateCollector(...)
      -> receiveFromCollector(reader, writer) [374-387]
          -> stay in that socket loop until EOF / close

collector ACK state
  -> handleCollectorRequest(...) [390-412]
  -> enqueueIncomingRecord(batchId, payload) [414-432]
      -> update latestReceivedBatchId + pendingBatchRecordCounts
  -> buildBatchCommitResponse(batchId) [457-467]
      -> compare against latestCheckpointedBatchId / latestReceivedBatchId
  -> snapshotState / notifyCheckpointComplete [162-188, 515-597]
      -> persist one global watermark and one global batch-state map for the reader

Findings

Issue 1: the current ingress contract is still effectively single-collector, but the runtime and protocol do not state or enforce that boundary

  • Location: EdgeSocketSourceReader.java:250-307, EdgeSocketSourceReader.java:457-513, docs/en/connectors/source/EdgeSocket.md:25-31, docs/en/connectors/source/EdgeSocket.md:176-233
  • Severity: High

Why this is a problem:

  1. receiveLoop() accepts one socket and then stays inside receiveFromCollector(...) until that collector disconnects. That means one long-lived collector monopolizes the reader thread and other collectors cannot be served concurrently on the same ingress.
  2. The ACK / checkpoint state is global to the whole reader (latestReceivedBatchId, latestCheckpointedBatchId, pendingBatchRecordCounts, drainedBatchIds). There is no collector/session identity in the protocol or state model.
  3. Because of that, even if multiple collectors are pointed at the same ingress, they would share one batch-id namespace and one checkpoint watermark. A collector polling __COMMIT__:N can therefore observe ACK/PENDING state influenced by another collector's batches.

Simple example:

  • Collector A sends batch 1 and keeps the socket open.
  • Collector B also wants to send batch 1 to the same EdgeSocket ingress.
  • Today B cannot be served concurrently because A already occupies the single receive loop.
  • Even if B were eventually connected later, the source only keeps one global batch watermark, so ACK:1 / PENDING would not be scoped to A vs B.

Why this matters on the normal path:

  • The docs and feature framing talk about "edge collectors" in plural, and the connector is presented as a general ingress source rather than a single dedicated point-to-point tunnel.
  • So this is not a corner case; it is the first scalability / correctness question operators will hit as soon as more than one collector targets the same source.

Best improvement suggestions:

  • Option A (smallest and safest for phase 1): explicitly scope v1 to exactly one active collector per EdgeSocket source. Enforce that in runtime (reject a second connection clearly), state it in docs, and keep the current global batch watermark model.
  • Option B (broader but more future-proof): add collector/session identity to the protocol and maintain per-collector ACK/checkpoint state, plus concurrent connection handling instead of a single blocking collector loop.
  • I would recommend Option A for this PR unless you are ready to widen the protocol and recovery model right now.

Compatibility / Side Effects

  • The docs/runtime mismatch around jobId discovery is fixed on the current head.
  • The remaining problem is not config compatibility; it is the runtime contract of the ingress itself.
  • Because the current ACK watermark is global, this is a correctness issue, not just a throughput optimization.

Tests / CI

  • The new unit tests and E2E tests cover authentication, queue backpressure, checkpoint restore, and packet framing for a single collector path.
  • They do not cover multi-collector concurrency or per-collector ACK isolation, which is exactly where the current blocker lives.
  • The current top-level Build is also still red on the reviewed head, with visible failures including unit-test (11, windows-latest) and several non-success follow-on jobs in the linked fork run.

Conclusion: can merge after fixes

Blocking items:

  1. Issue 1: clarify and enforce the collector contract. Right now the runtime behaves like a single-collector ingress, but the protocol/state model does not declare that boundary, and it is not safe for multiple independent collectors to share one ingress.
  2. Please get the latest Build back to green before merge.

Suggested non-blocking follow-up:

  • Once the collector contract is explicit, add one regression test that proves the chosen model: either "second collector is rejected clearly" (Option A) or "two collectors have isolated ACK state" (Option B).

I like the overall direction here, and the unsupported jobId discovery path is no longer being advertised. The current blocker is that the ingress protocol/state model still does not match a multi-collector interpretation of the feature.

@nzw921rx nzw921rx requested a review from DanielLeens May 14, 2026 06:26
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this. I re-reviewed the latest head from scratch against the real runtime path again.

What This PR Fixes

  • User pain: an edge collector needs a lightweight way to push data into a running Zeta job, but Phase 1 also needs a safe contract so concurrent collectors do not corrupt the reader's batch / checkpoint state.
  • Fix approach: the latest head explicitly scopes Phase 1 to a single collector, and enforces that in EdgeSocketSourceReader by draining backlog connections and suspending the listening socket after the first collector is accepted.
  • One-line summary: the single-collector runtime contract now looks much safer, but the current latest head still breaks a core unit test and is not merge-ready yet.

Runtime Chain Rechecked

collector connects
  -> EdgeSocketSourceReader.open() [136-146]
      -> startReceiverLoop()
          -> receiveLoop() [282-364]
              -> openServerSocketWithRetry() [229-280]
              -> accept collector
              -> hasActiveCollector = true [352]
              -> drainBacklogConnections(...) [685-719]
              -> suspendServerSocket() [722-731]
              -> authenticateCollector(...)
              -> receiveFromCollector(...)

checkpoint ACK path
  -> snapshotStateToBytes() [195-200]
  -> notifyCheckpointComplete() [209-219]
  -> collector re-sends __COMMIT__
  -> ACK:{batchId}

current failing test
  -> EdgeSocketSourceReaderTest.shouldAckBatchAfterCheckpointComplete() [56-93]
      -> reader.open()
      -> immediate new Socket("127.0.0.1", port) [61]
      -> no wait for listener readiness

Key Findings

  • The normal ingestion path absolutely hits this new logic. The single-collector restriction is no longer just documentation; it is enforced in the real reader lifecycle.
  • I do not see the earlier multi-collector state-corruption blocker reopened on the latest head.
  • The current blocker is in the updated test contract: the runtime lifecycle changed, but one core UT still assumes the old "open() means listener is immediately connectable" behavior.

Findings

Issue 1: the latest head already breaks a core EdgeSocket unit test on CI

  • Location: seatunnel-connectors-v2/connector-edge-socket/src/test/java/org/apache/seatunnel/connectors/seatunnel/edgesocket/source/EdgeSocketSourceReaderTest.java:61, :318
  • Problem:
    • shouldAckBatchAfterCheckpointComplete() opens the reader and immediately connects on line 61, without waiting for the listener to become ready.
    • But the current implementation now binds the ServerSocket asynchronously inside receiveLoop(), and other tests in the same class already had to switch to awaitServerReady(reader) for that reason.
    • On the current unit-test (11, ubuntu-latest) run, this exact mismatch already fails with java.net.ConnectException: Connection refused.
    • The same test class still also uses a fixed Thread.sleep(200) on line 318 for the second-collector rejection assertion, which leaves another timing-sensitive path behind.
  • Risk:
    This is a merge-blocking test stability issue. The current head is not just theoretically flaky; it is already red on CI.
  • Suggested fix:
    • At minimum, make shouldAckBatchAfterCheckpointComplete() wait on awaitServerReady(reader) before the first socket connect.
    • I would also strongly recommend replacing the fixed Thread.sleep(200) with a condition-based wait, so the single-collector test does not keep a timing race in CI.
  • Severity: High

Test Stability Assessment

  • Rating: High risk
  • Basis:
    • EdgeSocketSourceReaderTest.java:61 now fails on the latest CI because the test does not wait for listener readiness.
    • EdgeSocketSourceReaderTest.java:318 still depends on a fixed sleep for a lifecycle transition.

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: align EdgeSocketSourceReaderTest with the new listener lifecycle and get the latest Build green again.
  1. Suggested follow-up
  • No additional source-level blocker from my side beyond fixing the current test contract mismatch.

Overall, the runtime direction is much better now, and the explicit single-collector scope is the right Phase 1 boundary. The remaining problem is concentrated in the test layer, but it needs to be fixed before merge because the latest head is already red on CI.

Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head from scratch after 4b3059d5385c2cb26c1d6800796aedd48245b323, and I reopened the real runtime path instead of only reading the added tests and docs.

What This PR Fixes

  • User pain: an edge collector source is only useful if the real runtime contract is clear around single-collector ownership, checkpoint ACK semantics, and reconnect behavior after restarts.
  • Fix approach: this head splits the old mixed reader logic into EdgeSocketIngressServer for connection lifecycle and EdgeSocketSourceState for checkpoint / watermark state, then adds targeted UT/E2E plus fuller docs.
  • One-line summary: the earlier "is the single-collector / checkpoint contract really enforced, or only described?" blocker is now implemented in code, state, and tests.

Runtime Chain Rechecked

source creation
  -> EdgeSocketSource.createReader(...)
  -> EdgeSocketSourceReader.open()
     -> EdgeSocketIngressServer.start()
        -> receiveLoop()
           -> accept one collector
           -> drainBacklogConnections()
           -> suspendServerSocket()
           -> authenticateCollector()
           -> receiveFromCollector()
              -> __BATCH__  -> handleBatchRecord()
              -> __COMMIT__ -> handleCommitRequest()

state path
  -> EdgeSocketSourceReader.handleBatchRecord()
     -> recordQueue.offer(decoded)
     -> EdgeSocketSourceState.markRecordReceived(batchId)

checkpoint path
  -> EdgeSocketSourceReader.snapshotState(checkpointId)
     -> EdgeSocketSourceState.snapshotState(checkpointId, queueSnapshot)
  -> EdgeSocketSourceReader.notifyCheckpointComplete(checkpointId)
     -> EdgeSocketSourceState.notifyCheckpointComplete(checkpointId)
  -> collector polls __COMMIT__:batchId
     -> EdgeSocketSourceState.buildCommitResponse(batchId)
        -> PENDING or ACK:<watermark>

Findings

  1. The previous blocker around the single-collector contract is materially improved on the current head. The runtime path now enforces it explicitly instead of leaving it as an implicit assumption.
  2. The checkpoint ACK path is now coherent: the collector only gets ACK:<watermark> after completed-checkpoint progress is reflected in EdgeSocketSourceState.
  3. The new tests cover the key runtime boundaries I was worried about before: single-collector ownership, queue replay after restore, and packet encryption/compression handling.
  4. I do not see a reopened source-level blocker on the current revision.

Compatibility / Side Effects

  • This is additive new connector behavior; I do not see a regression against existing connectors or a public API break.
  • Queue / socket / state responsibilities are clearer than in the earlier head, which is good for maintainability.
  • I do not see a new obvious concurrency or resource-release bug in the current runtime chain.

Issue

  1. docs/en/connectors/source/EdgeSocket.md:250, docs/en/connectors/source/EdgeSocket.md:314, EdgeSocketIngressServer.java:244, EdgeSocketIngressServer.java:381
    • The docs currently say that when another collector is already active, the source "immediately replies REJECTED".
    • The actual runtime behavior is slightly different: once one collector is active, the server socket is suspended, so a second collector will usually fail at TCP connect time; REJECTED is mainly the backlog-race fallback path drained by drainBacklogConnections().
    • This is not a merge blocker from my side, but the collector-facing protocol docs should describe both outcomes so clients implement the right retry / occupancy handling.
    • Severity: medium.

Tests / CI

  • The visible Build is still red.
  • The current public summary only exposes the umbrella Build matrix failure, not a newly isolated EdgeSocket-only failing job, so I do not yet have evidence of a reopened source-level regression tied specifically to the logic above.
  • I did recheck the code path itself on the latest head, and that source-level recheck is why I am not carrying my earlier blocker forward on this revision.

Conclusion: can merge after CI is green

Blocking items:

  • No reopened source-level blocker from this review.
  • The current visible gate is still the red Build, so I would wait for green CI (or a maintainer-confirmed unrelated rerun result) before merging.

Suggested non-blocking follow-up:

  • Update the collector protocol docs so they reflect the real "TCP connect refusal most of the time, REJECTED in the backlog-race window" behavior.

Overall, this head is materially stronger than the one I blocked earlier. The key runtime contract is now expressed in code, state, docs, and tests, and I do not see a new source-path reason to keep request-changes on the current revision.

@nzw921rx nzw921rx requested a review from DanielLeens May 15, 2026 03:25
@github-actions github-actions Bot added the core SeaTunnel core module label May 15, 2026
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for missing these in the last review — that's on me. I re-reviewed the latest head from scratch, and I want to flag two more items I should have caught earlier. Both are carryovers from the previous version, not newly introduced issues.

Thanks for the latest update. The earlier single-collector/runtime-contract blocker looks materially better on the current head: the server-side lifecycle is now explicit in EdgeSocketIngressServer, and the collector-facing docs are much clearer about the Connection refused vs REJECTED behavior.

What This PR Fixes

  • User pain: an external edge collector needs a lightweight socket ingress into a running Zeta job.
  • Fix approach: this PR adds the EdgeSocket source, collector protocol, checkpoint polling semantics, and the corresponding docs / UT / E2E coverage.
  • One-line summary: the Phase 1 direction is much clearer now, but the latest head still has one source-level correctness blocker in the checkpoint ACK contract, plus one remaining config-contract mismatch.

Runtime Chain Rechecked

collector request path
  -> EdgeSocketIngressServer.dispatchRequest(...)
      -> __BATCH__  -> EdgeSocketSourceReader.handleBatchRecord(...)
      -> __COMMIT__ -> EdgeSocketSourceReader.handleCommitRequest(...)
          -> EdgeSocketSourceState.buildCommitResponse(batchId)

checkpoint / restore path
  -> EdgeSocketSourceReader.snapshotStateToBytes(checkpointId)
      -> EdgeSocketSourceState.snapshotState(...)
  -> EdgeSocketSourceReader.notifyCheckpointComplete(checkpointId)
      -> EdgeSocketSourceState.notifyCheckpointComplete(...)
  -> EdgeSocketSourceReader.restoreState(...)
      -> EdgeSocketSourceState.restoreState(...)

config / metadata path
  -> EdgeSocketSourceFactory.optionRule()
  -> SeaTunnelConfValidateCommand.validate(...)
  -> MetadataExportCommand discovery/export path

Key Findings

  • The normal __COMMIT__ path absolutely hits EdgeSocketSourceState.buildCommitResponse(...) on every collector-side checkpoint poll.
  • The current state model still keeps one global lastReceivedBatchId / lastCommittedBatchId namespace, but the protocol docs still do not define batchId as globally monotonic across reconnects / restores.
  • The current E2E helper still restarts batchId from 1 for each new socket session, which shows the published contract is still not explicit enough.
  • Two documented backpressure options are still implemented in runtime but missing from EdgeSocketSourceFactory.optionRule(), so validation / metadata export still cannot describe the real connector contract end to end.

Findings

Issue 1: the checkpoint ACK watermark is still global, but the protocol does not scope batchId to a globally monotonic namespace

  • Location: EdgeSocketSourceState.buildCommitResponse() / restoreState(), EdgeSocketSourceReader.handleCommitRequest(), AbstractEdgeSocketIT.sendRecordsThroughCollector() / sendPacketRecords() / sendEncryptedPacketRecords(), and the __COMMIT__ section in docs/en/connectors/source/EdgeSocket.md.
  • Why this is a problem:
    1. EdgeSocketSourceState persists one global lastCommittedBatchId and one global lastReceivedBatchId.
    2. buildCommitResponse(batchId) returns ACK:<watermark> as soon as batchId <= lastCommittedBatchId.
    3. restoreState(...) brings that committed watermark back after restart.
    4. But the docs only say that batchId is a positive long; they do not say it must remain globally monotonic across reconnects / restarts.
    5. The current E2E helpers still restart batchId from 1 for each new socket session, which is exactly the ambiguity here.
  • Simple example:
    • Session A previously committed up to watermark 100.
    • The worker restores that state.
    • A new collector session starts again with __BATCH__:1:... and later polls __COMMIT__:1.
    • Today buildCommitResponse(1) can immediately return ACK:100, and the collector docs tell the sender to evict every buffered batch whose batchId <= 100.
    • That means a brand-new session can be mistaken for already checkpoint-confirmed data.
  • Risk: this is a real correctness blocker on the main checkpoint-ACK path. In the reconnect / restore case, the collector can discard not-yet-confirmed data based on an old watermark.
  • Suggested fix:
    • Option A (smaller Phase 1 patch): explicitly require batchId to stay globally monotonic for the full logical source lifetime, including reconnect / restore, and then align the docs, examples, and E2E helpers with that contract.
    • Option B (more future-proof): add a session / epoch dimension to the protocol and keep ACK state scoped by that identity instead of one global batch namespace.
    • For this PR, I would accept Option A as the smallest safe boundary, but the current head needs one of these two contracts to be explicit and consistently implemented.
  • Severity: High

Issue 2: two documented runtime options are still missing from EdgeSocketSourceFactory.optionRule()

  • Location: EdgeSocketSourceFactory.optionRule(), EdgeSocketSourceOptions, docs/en/connectors/source/EdgeSocket.md, SeaTunnelConfValidateCommand, and MetadataExportCommand.
  • Problem:
    • queue_backpressure_watermark_ratio and queue_full_retry_after_ms are both documented and used by the runtime config / queue logic.
    • But they are still not listed in EdgeSocketSourceFactory.optionRule().
    • SeaTunnel config validation, unknown-key checks, and metadata export all depend on that OptionRule contract.
  • Risk: users can follow the docs and still hit validation / metadata mismatches, and SeaTunnel Web / metadata consumers still cannot see the full real connector contract.
  • Suggested fix: add both options to optionRule().optional(...) and keep one focused regression test so the factory contract cannot silently drift away from the runtime/doc contract again.
  • Severity: Medium

Tests / CI

  • The latest Build check is still queued as I write this review.
  • My blocking conclusion above is source-level and independent of that CI result.

Conclusion: can merge after fixes

  1. Blocking items
  • Issue 1: make the batchId / ACK:<watermark> contract safe across reconnect and restore. Right now the protocol/state model still allows a new session to be acknowledged by an old committed watermark.
  1. Suggested follow-up
  • Issue 2: complete the OptionRule contract so validation / metadata export match the runtime and docs.
  • Merge only from a green head once the current Build finishes.

Overall, I think this head is significantly better than the earlier revisions, and I do not see the previous single-collector blocker reopened. The remaining blocker is narrower now, but it is still a correctness issue on the checkpoint-ACK contract, so I do not think this is merge-ready yet.

Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head and retraced the EdgeSocket commit/restore path. On the current head I do not see a new source-level blocker, but the latest Build is still red, so this is still waiting on CI rather than source corrections from my side.

What this PR solves

  • User pain point
    Edge collectors need unambiguous feedback after reconnects and restores: whether a batch is already covered, still pending, unknown, or must be resent.
  • Fix approach
    The latest head restores the missing backpressure options in OptionRule, adds sessionFloorWatermark / RESEND handling in EdgeSocketSourceState, and updates the protocol docs.
  • One-line summary
    This revision makes the EdgeSocket commit/restore protocol much clearer, and I do not see a new source-level blocker on the latest head.

Runtime path I checked

Collector sends __BATCH__ / __COMMIT__
  -> EdgeSocketSourceState.markRecordReceived()/markRecordEmitted()
  -> checkpoint snapshotState(...)
  -> restoreState(...) restores watermark + pending/drained state
  -> buildCommitResponse(batchId)
      -> ACK / PENDING / RETRY / RESEND

Key findings

  1. The main collector protocol definitely hits buildCommitResponse(...); this is not an edge path.
  2. The missing config-contract gap is fixed: EdgeSocketSourceFactory.optionRule() now includes queue_backpressure_watermark_ratio and queue_full_retry_after_ms (EdgeSocketSourceFactory.java:43-58).
  3. The restored-session behavior is clearer now: restoreState() records a sessionFloorWatermark, and buildCommitResponse() can return RESEND for in-flight batches from the previous session that were not re-received yet (EdgeSocketSourceState.java:78-168).
  4. The new tests cover the option-rule contract and several restore/watermark cases (EdgeSocketFactoryTest.java:47-60, EdgeSocketSourceStateTest.java:145-204).
  5. I did not find a new source-level blocker on the latest head.

Test / CI notes

  • The tests I reviewed are in-memory state-machine checks and do not introduce an obvious flaky pattern.
  • The latest contributor Build is still red across multiple lanes, including unit-test (8, ubuntu-latest), kafka-connector-it (11, ubuntu-latest), and paimon-connector-it (11, ubuntu-latest), so CI still needs to be cleaned up before merge.

Conclusion: can merge after fixes

  1. Blocking items
  • Please get the latest Build green before merge.
  1. Suggested but non-blocking improvements
  • A focused unit test for the explicit RESEND branch would still be nice to add later, but I am not blocking on that in this revision.

Overall, the latest head closes the main config-contract and restore-semantics gaps I was worried about. From the source side, I do not have a new blocker on this revision; the remaining gate is the red CI matrix.

@nzw921rx nzw921rx requested a review from DanielLeens May 16, 2026 12:12
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head. Compared with the last Daniel-reviewed head 917ec0dbe0b6bf7c044b3eb6dbf05a97e4eeea34, the new commit only updates the in-source comments around EdgeSocketSourceState; I did not find a new source-level blocker in the current implementation.

What problem this PR solves

  • User pain point
    EdgeSocket ingress needs a recoverable batch protocol so the collector can distinguish ACK, PENDING, RETRY, and RESEND across checkpoint, failover, and reader restart.
  • Fix approach
    The broader PR series already fixed the ingress state-machine behavior. This latest head mainly adds explanatory comments to the core state holder so the restore/commit semantics are easier to maintain correctly.
  • One-line summary
    The latest incremental update is a readability improvement on top of the already-fixed EdgeSocket ingress state machine.

1. Code change review

1.1 Core logic analysis

The main runtime path is still:

__BATCH__
  -> markRecordReceived(batchId)
     -> pending count / lastReceivedBatchId

pollNext
  -> markRecordEmitted(batchId)
     -> drainedBatchIds

checkpoint
  -> snapshotState(checkpointId, queueSnapshot)

restart
  -> restoreState(bytes)
     -> sessionFloorWatermark + queued records re-offer

__COMMIT__
  -> buildCommitResponse(batchId)
     -> ACK / PENDING / RETRY / RESEND

On the latest head, the decision code in buildCommitResponse, snapshotState, restoreState, and checkpoint watermark handling is unchanged relative to the last current-head review. The new diff is comment-only and explains the same behavior more clearly.

1.2 Compatibility impact

Fully compatible relative to the previously reviewed current head. No protocol field, default, serialization layout, or restore behavior changed in this new delta.

1.3 Performance / side effects

No new CPU, memory, GC, network, concurrency, retry, idempotency, or resource-release risk from the latest head delta.

1.4 Error handling and logging

No new source-level blocking issue found.

2. Code quality evaluation

2.1 Code style

These comments are actually helpful because they explain the recovery-sensitive parts of the state machine instead of adding noise.

2.2 Test coverage and stability

The latest incremental commit does not add new tests, but it also does not change the behavior that was previously reviewed.

2.3 Documentation

No additional docs update is required for this comment-only delta.

3. Architecture

3.1 Solution quality

This latest delta is a maintainability improvement, not a new architectural change.

3.2 Maintainability

Better than before. This is exactly the kind of checkpoint/retry state logic that benefits from explicit source comments.

3.3 Extensibility

No direct functional extension, but it lowers the risk of future regressions in the ingress state machine.

3.4 Historical compatibility

No new compatibility risk introduced by the latest head delta.

4. Issue summary

No new source-level blocking issue found.

5. Merge conclusion

Conclusion: can merge

  1. Blocking items
    No source blocker from the latest head delta.

  2. Suggested follow-ups
    None.

From the source-review perspective, the latest head looks good to me. The remaining gate is just waiting for the Build check to finish.

@nzw921rx nzw921rx force-pushed the feature/stip24_edge_socket branch 2 times, most recently from a37bdbd to d2dc782 Compare May 16, 2026 17:03
@nzw921rx nzw921rx force-pushed the feature/stip24_edge_socket branch from d2dc782 to c8787a2 Compare May 16, 2026 17:07
@github-actions github-actions Bot removed the Zeta label May 16, 2026
@nzw921rx nzw921rx requested a review from DanielLeens May 16, 2026 17:20
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head from scratch.

Compared with the head Daniel reviewed previously (f9f86eca5e20aa336f46605a6ec0697e859eb29c), the latest delta is only comment cleanup and newline normalization:

DefaultEdgeSocketRecordQueue.java    -1
EdgeSocketIngressServer.java         -5
EdgeSocketResponseCode.java          -1
IncomingRecordHandler.java           -4
EdgeSocketSourceReader.java          -5
EdgeSocketLogUtils.java              -6
log4j2-test.properties              +1/-1
SeaTunnelEngine.yaml                +1/-1

Even so, I retraced the full runtime path end to end on the current head.

What this PR fixes

  • User pain point: Zeta did not have a lightweight, checkpoint-aware edge-ingress source for remote collectors.
  • Fix approach: this PR adds the EdgeSocket source runtime, including TCP ingress, single-collector enforcement, batch receive / commit polling, checkpoint-aware watermarking, docs, and E2E coverage.
  • One-line summary: this introduces a real edge-collector ingestion path into Zeta, with the protocol and checkpoint semantics defined in code instead of left implicit.

Full runtime chain I rechecked

Edge collector connection
  -> EdgeSocketSourceReader.open() [EdgeSocketSourceReader.java:80-83]
      -> EdgeSocketIngressServer.start() [EdgeSocketIngressServer.java:66-76]
          -> receiveLoop() [176-260]
              -> openServerSocketWithRetry() [103-154]
              -> accept collector
              -> authenticateCollector() [281-309]

Batch ingestion
  -> "__BATCH__:<batchId>:<payload>"
      -> EdgeSocketIngressServer.dispatchRequest() [335-369]
      -> EdgeSocketSourceReader.handleBatchRecord() [162-220]
          -> decode payload
          -> enqueue record
          -> sourceState.markRecordReceived(batchId)
          -> return RECEIVED / QUEUE_FULL / RETRY / DECRYPT_FAILED

Checkpoint coordination
  -> pollNext() [95-106]
      -> emitRecordSafely() [108-123]
      -> sourceState.markRecordEmitted(batchId)
  -> snapshotStateToBytes(checkpointId) [126-130]
      -> sourceState.snapshotState(...) [158-187]
  -> notifyCheckpointComplete(checkpointId) [148-152]
      -> sourceState.notifyCheckpointComplete(checkpointId) [255-264]

Collector commit polling
  -> "__COMMIT__:<batchId>"
      -> EdgeSocketIngressServer.dispatchRequest() [337-344]
      -> EdgeSocketSourceReader.handleCommitRequest() [222-226]
      -> EdgeSocketSourceState.buildCommitResponse() [127-144]
          -> ACK:<watermark> / PENDING / RETRY / RESEND

Findings

  1. The normal runtime path definitely hits the new code. This is not dead or purely auxiliary logic.
  2. The single-collector restriction is intentional and consistently enforced in both code and docs.
  3. The ACK / PENDING / RETRY / RESEND / QUEUE_FULL / DECRYPT_FAILED contract is aligned across runtime code, unit tests, E2E, and docs.
  4. I did not find a reopened source-level blocker on the current head.

Tests

  • EdgeSocketSourceReaderTest covers the important reader-side checkpoint and single-collector paths.
  • The tests use explicit condition waiting (Awaitility) instead of fragile fixed sleeps for correctness checks.
  • The E2E retry loops are deadline-bounded retry/backoff loops, not one-shot timing assumptions.
  • My stability verdict for the new UT/E2E coverage is: Stable.

Compatibility / side effects

  • Existing jobs are unaffected. This is an additive connector capability.
  • The main operational tradeoff is explicit and documented: one collector at a time, and parallelism = 1.
  • Backpressure is handled through bounded queue capacity plus QUEUE_FULL:<ms> responses, which is the right shape for this phase.

Conclusion: can merge after fixes

Blocking items:

  1. I do not see a new source-level blocker on the current head.
  2. The latest GitHub Build on f7c56f60d2f6f707229f61225043cc6fb6d1661e is cancelled, so please rerun/get a green Build on the current head before merge.

Non-blocking follow-up:

  1. None from my side on the current runtime path.

Overall, the feature design still holds up on the latest head, and the post-review delta does not reopen the runtime logic Daniel already validated. The remaining gate is CI state on the current head rather than a code-path blocker.

…rification and use EdgeSocketConfig for built-in verification uniformly
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head from scratch.

What problem this PR solves

  • User pain point
    Edge collectors need a checkpoint-aware ingress protocol so they can distinguish ACK, PENDING, RETRY, and RESEND correctly across restart and recovery.
  • Fix approach
    This PR adds the EdgeSocket source runtime, ingress server, queueing / checkpoint state, docs, and E2E coverage.
  • One sentence
    The latest delta is small and does not reopen the runtime semantics Daniel already checked in depth.

Compared with the last Daniel-reviewed head, the new commit only removes a redundant queue verification / matching test lines. I still retraced the real runtime path:

collector connection
  -> EdgeSocketIngressServer.start()
  -> authenticateCollector()

batch ingestion
  -> handleBatchRecord()
      -> queue.offer(...)
      -> sourceState.markRecordReceived(batchId)

checkpoint / recovery
  -> pollNext()
      -> sourceState.markRecordEmitted(batchId)
  -> snapshotState(...)
  -> notifyCheckpointComplete(...)
  -> buildCommitResponse()
      -> ACK / PENDING / RETRY / RESEND

Key findings:

  1. The normal runtime path still hits the same state machine Daniel previously validated.
  2. The latest delta does not change the queue semantics, commit protocol, or checkpoint state layout.
  3. I do not see a reopened source-level blocker on the current head.

1. Code change review

1.1 Core logic analysis

DefaultEdgeSocketRecordQueue.offer(...), snapshot(), and isBackpressure() still keep the same semantics on the latest head. The current docs also still correctly explain the passive source deployment constraint and the tag_filter pinning path.

1.2 Compatibility impact

Fully compatible relative to the previously reviewed current implementation. No protocol/default/serialization contract changed in this latest delta.

1.3 Performance / side effects

No new CPU, memory, GC, network, concurrency, retry, idempotency, or resource-release risk surfaced from the latest change.

1.4 Error handling and logging

No new source-level blocking issue found.

2. Code quality evaluation

2.1 Code style

The current head is slightly cleaner after removing the redundant verification.

2.2 Test coverage and stability

The existing unit/E2E coverage Daniel reviewed earlier still stands, and I do not see a new flaky-test pattern introduced by the latest delta.

Test stability rating: Stable.

2.3 Documentation

No additional documentation blocker from this round.

3. Architecture

3.1 Solution quality

The feature design still holds up on the latest head.

3.2 Maintainability

The latest cleanup slightly improves maintainability without changing behavior.

3.3 Extensibility

No new extensibility concern surfaced in this round.

3.4 Historical compatibility

No new compatibility risk introduced by the current delta.

4. Issue summary

No new source-level blocking issue found.

5. Merge conclusion

Conclusion: can merge

  1. Blocking items
  • No code blocker from my side on the latest head.
  1. Suggested but non-blocking follow-ups
  • The latest GitHub Build is still pending, so please wait for the updated Build result before the actual merge.

From the source-review side, the current head still looks good to me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants