[Feature][Zeta] STIP-24 Phase 1: Support lightweight EdgeSocket ingress for edge collector MVP #10878
nzw921rx wants to merge 7 commits into
DanielLeens
left a comment
Thanks for working on this. I went through the full diff and rechecked the latest head from the real runtime path, not just the connector classes. The direction makes sense, but I found one main-path blocker in the automatic-discovery contract.
What This PR Fixes
- User pain: an external edge collector needs a lightweight way to push data into a running EdgeSocket source in Zeta.
- Fix approach: the PR adds the `connector-edge-socket` source plus a new engine client/server API, `getJobTaskGroupAddresses(jobId)`, so a collector can discover worker addresses by job.
- One-line summary: the feature direction is good, but the current “discover ingress by jobId” path still cannot uniquely identify the worker that actually hosts the EdgeSocket source.
Runtime Chain Rechecked
Automatic discovery mode in docs
-> docs/en/connectors/source/EdgeSocket.md:115-123
-> omit endpoint
-> resolve workerHost:port by jobId
Client / server path
-> JobClient.getJobTaskGroupAddresses(...) [JobClient.java:130-133]
-> CoordinatorService.queryJobTaskGroupAddresses(...) [CoordinatorService.java:458-463]
-> JobMaster.queryTaskGroupAddresses() [JobMaster.java:836-846]
-> returns all task-group worker addresses for the job
Current E2E helper
-> AbstractEdgeSocketIT.resolveEdgeIngressHostByJobClient() [AbstractEdgeSocketIT.java:320-351]
-> parse the address array
-> return the first host it sees
Key Findings
- The normal path absolutely hits this logic because the docs present jobId-based automatic discovery as a recommended mode.
- But `getJobTaskGroupAddresses(jobId)` returns all task-group addresses for the job, not “the EdgeSocket source ingress worker”.
- The current E2E helper exposes the same gap: it just takes the first host from the returned array, with no source-role filtering at all.
- So once source / transform / sink task groups are placed on different workers, the collector can connect to a worker that does not host the EdgeSocket ingress listener.
Findings
Issue 1: jobId-based discovery returns the whole job’s task-group address set, not the EdgeSocket ingress target
- Location: `JobClient.java:130-133`; `CoordinatorService.java:458-463`; `JobMaster.java:836-846`; `AbstractEdgeSocketIT.java:320-351`
- Why it matters: the automatic-discovery main path currently treats “all task-group worker addresses for the job” as if that were “the ingress address for the EdgeSocket source”.
- Risk: real main-path failure. A collector can connect to a transform/sink worker that is not listening for EdgeSocket ingress at all.
- Suggested fix:
- Option A: have the engine return only the EdgeSocket source ingress address set.
- Option B: enrich the returned JSON with enough role metadata for the client to filter the EdgeSocket source task group deterministically.
- Severity: High
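To make Option B concrete, here is a minimal sketch of client-side filtering, assuming the engine attached role metadata to every returned address. The `TaskGroupAddress` class, its `role` and `pluginName` fields, and `IngressDiscovery` are hypothetical names for illustration only; the current API returns no such metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical shape of one discovery entry; the real API carries no role metadata. */
final class TaskGroupAddress {
    final String host;
    final int port;
    final String role;       // assumed values: "source", "transform", "sink"
    final String pluginName; // assumed: connector name of the task group

    TaskGroupAddress(String host, int port, String role, String pluginName) {
        this.host = host;
        this.port = port;
        this.role = role;
        this.pluginName = pluginName;
    }
}

final class IngressDiscovery {
    /** Keeps only the addresses of task groups that host an EdgeSocket source. */
    static List<String> edgeSocketIngress(List<TaskGroupAddress> all) {
        List<String> result = new ArrayList<>();
        for (TaskGroupAddress address : all) {
            if ("source".equals(address.role) && "EdgeSocket".equals(address.pluginName)) {
                result.add(address.host + ":" + address.port);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<TaskGroupAddress> all = Arrays.asList(
                new TaskGroupAddress("worker-2", 5801, "sink", "Jdbc"),
                new TaskGroupAddress("worker-1", 9000, "source", "EdgeSocket"));
        // Taking the first entry would pick the sink worker; filtering does not.
        System.out.println(edgeSocketIngress(all)); // prints [worker-1:9000]
    }
}
```

With a filter like this, the order of the returned array no longer decides which worker the collector dials.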
Issue 2: there is still no regression test proving discovery picks the source ingress in a multi-task-group / multi-worker layout
- Location: `AbstractEdgeSocketIT.java:320-351`; `edge_socket_source_to_mysql84.conf`
- Why it matters: the current tests cover the happy path, but they do not pin the exact contract this PR relies on most.
- Suggested fix: add a focused discovery regression case where source / transform / sink do not all collapse onto the same worker, and assert that discovery still resolves the actual EdgeSocket source ingress.
- Severity: Medium
Issue 3: the latest Build check is still pending
- Location: GitHub check `Build`
- Severity: Low
Merge Decision
Conclusion: can merge after fixes
- Blocking items
- Issue 1: make the automatic-discovery contract uniquely identify the EdgeSocket source ingress target.
- Suggested follow-up
- Issue 2: add a regression test for the multi-task-group discovery case.
- Issue 3: wait for the latest `Build` to turn green.
Overall, I like the direction, but I do not think Phase 1 is closed yet. As long as the discovery API only returns the whole job’s task-group address set, the automatic-ingress path is still guessing.
DanielLeens
left a comment
Thanks for working on this. I re-reviewed the latest head from scratch against upstream/dev.
What This PR Fixes
- User pain: an external collector needs a lightweight ingress path into a running Zeta job.
- Fix approach: this PR adds the `EdgeSocket` source, plus docs/tests around how a collector is supposed to reach the worker-side socket.
- One-line summary: the source-side direction is reasonable, but the latest head has already removed the `jobId -> workerHost:port` discovery implementation while the docs and test story still present that path as supported.
Simple example:
- Before this PR family, there was no dedicated socket source for a collector to push records into.
- In the current head, the source can bind `0.0.0.0:<port>` on a worker, but if an operator follows the docs and omits `endpoint`, there is no longer any runtime API that can actually resolve the target worker address from `jobId`.
Runtime Chain Rechecked
Job submission
-> EdgeSocketSourceFactory.optionRule() [EdgeSocketSourceFactory.java:42-56]
-> accepts endpoint / port / auth_token / retry options
Real source runtime
-> EdgeSocketSourceReader.openServerSocketWithRetry() [EdgeSocketSourceReader.java:199-218]
-> bind 0.0.0.0:<port>
-> log endpoint as metadata only
What the docs still advertise
-> docs/en/connectors/source/EdgeSocket.md:109-123
-> omit endpoint
-> resolve workerHost:port by jobId
What the current E2E actually does
-> AbstractEdgeSocketIT.ensureEdgeSocketForwarderByJobId() [114-116]
-> directly use EDGE_INGRESS_HOST
-> no jobId-based lookup anymore
Key Findings
- The normal main path absolutely hits `EdgeSocketSourceReader.openServerSocketWithRetry()`.
- The current head no longer exposes any `jobId` discovery API for ingress resolution.
- The docs still present automatic discovery as a supported access mode, and the helper method name still suggests that too, but the code path is gone.
- The optional `endpoint` is currently only logged by the source; there is no client/API surface consuming it.
Findings
Issue 1: the latest head removed automatic discovery, but the user-facing contract still says it exists
- Location: `docs/en/connectors/source/EdgeSocket.md:109-123`, `docs/zh/connectors/source/EdgeSocket.md:104-123`, `AbstractEdgeSocketIT.java:114-116`, `EdgeSocketSourceReader.java:207-216`
- Why it matters: the real runtime now only binds a local worker port. The docs still tell users that omitting `endpoint` is fine because `workerHost:port` can be discovered from `jobId`, but that implementation path has been deleted on the current head.
- Risk: main-path user failure. A collector/operator following the published docs still expects a discovery flow that no longer exists.
- Suggested fix:
- Option A: explicitly narrow this PR to direct/manual ingress only, and remove the automatic-discovery wording/examples/tests in one pass.
- Option B: if automatic discovery is still part of the intended contract, add back a real discovery surface instead of leaving it as documentation only.
- Severity: High
Issue 2: the new tests still do not pin the published access-mode contract, and one unit test uses a fixed sleep for async failure timing
- Location: `EdgeSocketSourceReaderTest.java:95-109`, `AbstractEdgeSocketIT.java:114-116`
- Why it matters: `shouldFailWhenRetryBudgetExhausted()` waits with `Thread.sleep(300)` before asserting an async failure path, which is timing-sensitive on slower CI. At the same time, the E2E helper no longer verifies the access mode that the docs describe most prominently.
- Risk: medium test-stability / coverage risk. The suite can miss future regressions around the actual published ingress contract.
- Suggested fix: replace the fixed sleep with an explicit condition wait, and align the tests with the real access mode that this PR intends to support.
- Severity: Medium
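A condition-based wait for that test could look roughly like the sketch below. `ConditionWait.awaitTrue` is a hypothetical helper, not something in this PR; a library such as Awaitility would serve the same purpose.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

final class ConditionWait {
    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interrupt.
     */
    static boolean awaitTrue(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without seeing the condition
            }
            try {
                Thread.sleep(10); // short poll interval instead of one fixed long sleep
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicBoolean failed = new AtomicBoolean(false);
        // Stand-in for the async path that flips a failure flag at an unknown time.
        new Thread(() -> failed.set(true)).start();
        System.out.println(awaitTrue(failed::get, 5_000));
    }
}
```

The test then returns as soon as the failure is visible and only pays the full timeout when something is actually wrong.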
Issue 3: the latest Build check is still not green
- Location: GitHub check `Build`
- Severity: Low
Merge Decision
Conclusion: can merge after fixes
- Blocking items
- Issue 1: make the access-mode contract consistent. Either this PR is direct-connect only, or it needs a real discovery surface again.
- Suggested follow-up
- Issue 2: tighten the async test waiting strategy and align coverage with the real supported ingress mode.
- Issue 3: rerun the latest `Build` and merge only from a green head.
Overall, I think the latest head is closer to a workable Phase 1 than the earlier version, because it stops pretending the old discovery API was reliable. But it still is not merge-ready until the user-facing contract is cleaned up end to end.
@davidzollo Can you help with the review? Any suggestions would be greatly appreciated.
DanielLeens
left a comment
Sorry for missing this in the last review — that's on me. I re-reviewed the latest head from scratch, and I want to flag one more blocker that I should have caught earlier. This is a carryover from the previous version, not a newly introduced issue.
Thanks for the latest update. The current head does close the earlier docs/runtime mismatch around jobId -> workerHost:port discovery: the docs now stick to the real supported path (endpoint for observability plus tag_filter pinning), and the code no longer pretends there is an automatic ingress discovery API.
What This PR Fixes
- User pain: an external edge collector needs a lightweight socket ingress into a running Zeta job.
- Fix approach: this PR adds the `EdgeSocket` source, collector protocol, queue/checkpoint handling, docs, and E2E coverage around the worker-side socket ingress path.
- One-line summary: the source-side direction is reasonable, but the current ingress contract still breaks down if more than one collector targets the same source.
Runtime Chain Rechecked
worker-side ingress
-> EdgeSocketSourceReader.openServerSocketWithRetry() [EdgeSocketSourceReader.java:197-248]
-> bind 0.0.0.0:<port>
-> receiveLoop() [250-335]
-> accept one collector socket
-> authenticateCollector(...)
-> receiveFromCollector(reader, writer) [374-387]
-> stay in that socket loop until EOF / close
collector ACK state
-> handleCollectorRequest(...) [390-412]
-> enqueueIncomingRecord(batchId, payload) [414-432]
-> update latestReceivedBatchId + pendingBatchRecordCounts
-> buildBatchCommitResponse(batchId) [457-467]
-> compare against latestCheckpointedBatchId / latestReceivedBatchId
-> snapshotState / notifyCheckpointComplete [162-188, 515-597]
-> persist one global watermark and one global batch-state map for the reader
Findings
Issue 1: the current ingress contract is still effectively single-collector, but the runtime and protocol do not state or enforce that boundary
- Location: `EdgeSocketSourceReader.java:250-307`, `EdgeSocketSourceReader.java:457-513`, `docs/en/connectors/source/EdgeSocket.md:25-31`, `docs/en/connectors/source/EdgeSocket.md:176-233`
- Severity: High
Why this is a problem:
- `receiveLoop()` accepts one socket and then stays inside `receiveFromCollector(...)` until that collector disconnects. That means one long-lived collector monopolizes the reader thread and other collectors cannot be served concurrently on the same ingress.
- The ACK / checkpoint state is global to the whole reader (`latestReceivedBatchId`, `latestCheckpointedBatchId`, `pendingBatchRecordCounts`, `drainedBatchIds`). There is no collector/session identity in the protocol or state model.
- Because of that, even if multiple collectors are pointed at the same ingress, they would share one batch-id namespace and one checkpoint watermark. A collector polling `__COMMIT__:N` can therefore observe ACK/PENDING state influenced by another collector's batches.
Simple example:
- Collector A sends batch `1` and keeps the socket open.
- Collector B also wants to send batch `1` to the same EdgeSocket ingress.
- Today B cannot be served concurrently because A already occupies the single receive loop.
- Even if B were eventually connected later, the source only keeps one global batch watermark, so `ACK:1` / `PENDING` would not be scoped to A vs B.
Why this matters on the normal path:
- The docs and feature framing talk about "edge collectors" in plural, and the connector is presented as a general ingress source rather than a single dedicated point-to-point tunnel.
- So this is not a corner case; it is the first scalability / correctness question operators will hit as soon as more than one collector targets the same source.
Best improvement suggestions:
- Option A (smallest and safest for phase 1): explicitly scope v1 to exactly one active collector per EdgeSocket source. Enforce that in runtime (reject a second connection clearly), state it in docs, and keep the current global batch watermark model.
- Option B (broader but more future-proof): add collector/session identity to the protocol and maintain per-collector ACK/checkpoint state, plus concurrent connection handling instead of a single blocking collector loop.
- I would recommend Option A for this PR unless you are ready to widen the protocol and recovery model right now.
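For Option A, the runtime guard itself can be very small. A sketch, assuming a hypothetical `SingleCollectorGate` that the accept loop consults before serving a socket (the class and method names are mine, not from the PR):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard that admits at most one active collector at a time. */
final class SingleCollectorGate {
    private final AtomicBoolean active = new AtomicBoolean(false);

    /** Returns true for the first collector, false while another one is active. */
    boolean tryAcquire() {
        return active.compareAndSet(false, true);
    }

    /** Called when the active collector disconnects, so the next one can connect. */
    void release() {
        active.set(false);
    }

    public static void main(String[] args) {
        SingleCollectorGate gate = new SingleCollectorGate();
        System.out.println(gate.tryAcquire()); // true: collector A is admitted
        System.out.println(gate.tryAcquire()); // false: collector B must be rejected
        gate.release();
        System.out.println(gate.tryAcquire()); // true: A left, next collector admitted
    }
}
```

The compare-and-set keeps the decision atomic even if two sockets are accepted back to back; how the rejection is reported to the second collector is a separate protocol decision.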
Compatibility / Side Effects
- The docs/runtime mismatch around `jobId` discovery is fixed on the current head.
- The remaining problem is not config compatibility; it is the runtime contract of the ingress itself.
- Because the current ACK watermark is global, this is a correctness issue, not just a throughput optimization.
Tests / CI
- The new unit tests and E2E tests cover authentication, queue backpressure, checkpoint restore, and packet framing for a single collector path.
- They do not cover multi-collector concurrency or per-collector ACK isolation, which is exactly where the current blocker lives.
- The current top-level `Build` is also still red on the reviewed head, with visible failures including `unit-test (11, windows-latest)` and several non-success follow-on jobs in the linked fork run.
Conclusion: can merge after fixes
Blocking items:
- Issue 1: clarify and enforce the collector contract. Right now the runtime behaves like a single-collector ingress, but the protocol/state model does not declare that boundary, and it is not safe for multiple independent collectors to share one ingress.
- Please get the latest `Build` back to green before merge.
Suggested non-blocking follow-up:
- Once the collector contract is explicit, add one regression test that proves the chosen model: either "second collector is rejected clearly" (Option A) or "two collectors have isolated ACK state" (Option B).
I like the overall direction here, and the unsupported jobId discovery path is no longer being advertised. The current blocker is that the ingress protocol/state model still does not match a multi-collector interpretation of the feature.
DanielLeens
left a comment
Thanks for working on this. I re-reviewed the latest head from scratch against the real runtime path again.
What This PR Fixes
- User pain: an edge collector needs a lightweight way to push data into a running Zeta job, but Phase 1 also needs a safe contract so concurrent collectors do not corrupt the reader's batch / checkpoint state.
- Fix approach: the latest head explicitly scopes Phase 1 to a single collector, and enforces that in `EdgeSocketSourceReader` by draining backlog connections and suspending the listening socket after the first collector is accepted.
- One-line summary: the single-collector runtime contract now looks much safer, but the current latest head still breaks a core unit test and is not merge-ready yet.
Runtime Chain Rechecked
collector connects
-> EdgeSocketSourceReader.open() [136-146]
-> startReceiverLoop()
-> receiveLoop() [282-364]
-> openServerSocketWithRetry() [229-280]
-> accept collector
-> hasActiveCollector = true [352]
-> drainBacklogConnections(...) [685-719]
-> suspendServerSocket() [722-731]
-> authenticateCollector(...)
-> receiveFromCollector(...)
checkpoint ACK path
-> snapshotStateToBytes() [195-200]
-> notifyCheckpointComplete() [209-219]
-> collector re-sends __COMMIT__
-> ACK:{batchId}
current failing test
-> EdgeSocketSourceReaderTest.shouldAckBatchAfterCheckpointComplete() [56-93]
-> reader.open()
-> immediate new Socket("127.0.0.1", port) [61]
-> no wait for listener readiness
Key Findings
- The normal ingestion path absolutely hits this new logic. The single-collector restriction is no longer just documentation; it is enforced in the real reader lifecycle.
- I do not see the earlier multi-collector state-corruption blocker reopened on the latest head.
- The current blocker is in the updated test contract: the runtime lifecycle changed, but one core UT still assumes the old "open() means listener is immediately connectable" behavior.
Findings
Issue 1: the latest head already breaks a core EdgeSocket unit test on CI
- Location: `seatunnel-connectors-v2/connector-edge-socket/src/test/java/org/apache/seatunnel/connectors/seatunnel/edgesocket/source/EdgeSocketSourceReaderTest.java:61`, `:318`
- Problem:
  - `shouldAckBatchAfterCheckpointComplete()` opens the reader and immediately connects on line 61, without waiting for the listener to become ready.
  - But the current implementation now binds the `ServerSocket` asynchronously inside `receiveLoop()`, and other tests in the same class already had to switch to `awaitServerReady(reader)` for that reason.
  - On the current `unit-test (11, ubuntu-latest)` run, this exact mismatch already fails with `java.net.ConnectException: Connection refused`.
  - The same test class still also uses a fixed `Thread.sleep(200)` on line 318 for the second-collector rejection assertion, which leaves another timing-sensitive path behind.
- Risk: This is a merge-blocking test stability issue. The current head is not just theoretically flaky; it is already red on CI.
- Suggested fix:
  - At minimum, make `shouldAckBatchAfterCheckpointComplete()` wait on `awaitServerReady(reader)` before the first socket connect.
  - I would also strongly recommend replacing the fixed `Thread.sleep(200)` with a condition-based wait, so the single-collector test does not keep a timing race in CI.
- Severity: High
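The race is easy to reproduce in isolation. The sketch below is a self-contained stand-in, not the PR's code: `AsyncListener` and `awaitBound` are hypothetical names that mimic a reader whose `open()` returns before the listener is bound, and the latch plays the role that `awaitServerReady(reader)` plays in the real test class (whose implementation is not shown in this thread).

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a reader whose listener binds on a background thread. */
final class AsyncListener {
    private final CountDownLatch bound = new CountDownLatch(1);
    private volatile ServerSocket server;

    /** Returns immediately; the bind happens later on the receive thread. */
    void open() {
        Thread receiveThread = new Thread(() -> {
            try {
                server = new ServerSocket(0); // port 0 = any free port
                bound.countDown();            // signal readiness only after the bind
                server.accept().close();      // serve a single connection, then exit
            } catch (IOException e) {
                // bind failed or the socket was closed during shutdown
            }
        });
        receiveThread.setDaemon(true);
        receiveThread.start();
    }

    /** Waits for the bind; returns the bound port, or -1 on timeout or interrupt. */
    int awaitBound(long timeoutMs) {
        try {
            return bound.await(timeoutMs, TimeUnit.MILLISECONDS) ? server.getLocalPort() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    /** True if a TCP connect to the local port succeeds. */
    static boolean canConnect(int port) {
        try (Socket socket = new Socket("127.0.0.1", port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    void close() {
        ServerSocket s = server;
        if (s != null) {
            try {
                s.close();
            } catch (IOException ignored) {
                // nothing useful to do on close failure in a sketch
            }
        }
    }
}
```

A test that calls `open()` and connects right away can lose the race against the bind; one that first waits on `awaitBound(...)` cannot.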
Test Stability Assessment
- Rating: High risk
- Basis:
  - `EdgeSocketSourceReaderTest.java:61` now fails on the latest CI because the test does not wait for listener readiness.
  - `EdgeSocketSourceReaderTest.java:318` still depends on a fixed sleep for a lifecycle transition.
Conclusion: can merge after fixes
- Blocking items
- Issue 1: align `EdgeSocketSourceReaderTest` with the new listener lifecycle and get the latest Build green again.
- Suggested follow-up
- No additional source-level blocker from my side beyond fixing the current test contract mismatch.
Overall, the runtime direction is much better now, and the explicit single-collector scope is the right Phase 1 boundary. The remaining problem is concentrated in the test layer, but it needs to be fixed before merge because the latest head is already red on CI.
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head from scratch after 4b3059d5385c2cb26c1d6800796aedd48245b323, and I reopened the real runtime path instead of only reading the added tests and docs.
What This PR Fixes
- User pain: an edge collector source is only useful if the real runtime contract is clear around single-collector ownership, checkpoint ACK semantics, and reconnect behavior after restarts.
- Fix approach: this head splits the old mixed reader logic into `EdgeSocketIngressServer` for connection lifecycle and `EdgeSocketSourceState` for checkpoint / watermark state, then adds targeted UT/E2E plus fuller docs.
- One-line summary: the earlier "is the single-collector / checkpoint contract really enforced, or only described?" blocker is now resolved: the contract is implemented in code, state, and tests.
Runtime Chain Rechecked
source creation
-> EdgeSocketSource.createReader(...)
-> EdgeSocketSourceReader.open()
-> EdgeSocketIngressServer.start()
-> receiveLoop()
-> accept one collector
-> drainBacklogConnections()
-> suspendServerSocket()
-> authenticateCollector()
-> receiveFromCollector()
-> __BATCH__ -> handleBatchRecord()
-> __COMMIT__ -> handleCommitRequest()
state path
-> EdgeSocketSourceReader.handleBatchRecord()
-> recordQueue.offer(decoded)
-> EdgeSocketSourceState.markRecordReceived(batchId)
checkpoint path
-> EdgeSocketSourceReader.snapshotState(checkpointId)
-> EdgeSocketSourceState.snapshotState(checkpointId, queueSnapshot)
-> EdgeSocketSourceReader.notifyCheckpointComplete(checkpointId)
-> EdgeSocketSourceState.notifyCheckpointComplete(checkpointId)
-> collector polls __COMMIT__:batchId
-> EdgeSocketSourceState.buildCommitResponse(batchId)
-> PENDING or ACK:<watermark>
Findings
- The previous blocker around the single-collector contract is materially improved on the current head. The runtime path now enforces it explicitly instead of leaving it as an implicit assumption.
- The checkpoint ACK path is now coherent: the collector only gets `ACK:<watermark>` after completed-checkpoint progress is reflected in `EdgeSocketSourceState`.
- The new tests cover the key runtime boundaries I was worried about before: single-collector ownership, queue replay after restore, and packet encryption/compression handling.
- I do not see a reopened source-level blocker on the current revision.
Compatibility / Side Effects
- This is additive new connector behavior; I do not see a regression against existing connectors or a public API break.
- Queue / socket / state responsibilities are clearer than in the earlier head, which is good for maintainability.
- I do not see a new obvious concurrency or resource-release bug in the current runtime chain.
Issue
- Location: `docs/en/connectors/source/EdgeSocket.md:250`, `docs/en/connectors/source/EdgeSocket.md:314`, `EdgeSocketIngressServer.java:244`, `EdgeSocketIngressServer.java:381`
- The docs currently say that when another collector is already active, the source "immediately replies `REJECTED`".
- The actual runtime behavior is slightly different: once one collector is active, the server socket is suspended, so a second collector will usually fail at TCP connect time; `REJECTED` is mainly the backlog-race fallback path drained by `drainBacklogConnections()`.
- This is not a merge blocker from my side, but the collector-facing protocol docs should describe both outcomes so clients implement the right retry / occupancy handling.
- Severity: medium.
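On the collector side, handling both outcomes could be as simple as the sketch below. It is only an illustration: `CollectorConnect`, the `Outcome` enum, and the assumption that the rejection arrives as a first reply line equal to `REJECTED` are mine; the exact framing is whatever the protocol docs end up specifying.

```java
import java.io.IOException;
import java.net.ConnectException;

/** Hypothetical collector-side classification of a connect attempt. */
final class CollectorConnect {
    enum Outcome { CONNECTED, REFUSED, REJECTED, OTHER_ERROR }

    /**
     * @param connectError exception thrown by the TCP connect, or null if it succeeded
     * @param firstReply   first reply line read from the source, or null if none was read
     */
    static Outcome classify(IOException connectError, String firstReply) {
        if (connectError instanceof ConnectException) {
            return Outcome.REFUSED; // listener suspended, or source not running at all
        }
        if (connectError != null) {
            return Outcome.OTHER_ERROR;
        }
        if ("REJECTED".equals(firstReply)) {
            return Outcome.REJECTED; // connected in the backlog-race window, then turned away
        }
        return Outcome.CONNECTED;
    }

    /** Both occupancy signals mean "back off and try again later". */
    static boolean shouldBackOffAndRetry(Outcome outcome) {
        return outcome == Outcome.REFUSED || outcome == Outcome.REJECTED;
    }

    public static void main(String[] args) {
        System.out.println(classify(new ConnectException("Connection refused"), null)); // REFUSED
        System.out.println(classify(null, "REJECTED"));                                  // REJECTED
    }
}
```

Note that a refused connect cannot be told apart from a source that is simply down, which is one more reason for the docs to spell out both paths.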
Tests / CI
- The visible `Build` is still red.
- The current public summary only exposes the umbrella `Build` matrix failure, not a newly isolated EdgeSocket-only failing job, so I do not yet have evidence of a reopened source-level regression tied specifically to the logic above.
- I did recheck the code path itself on the latest head, and that source-level recheck is why I am not carrying my earlier blocker forward on this revision.
Conclusion: can merge after CI is green
Blocking items:
- No reopened source-level blocker from this review.
- The current visible gate is still the red `Build`, so I would wait for green CI (or a maintainer-confirmed unrelated rerun result) before merging.
Suggested non-blocking follow-up:
- Update the collector protocol docs so they reflect the real "TCP connect refusal most of the time, `REJECTED` in the backlog-race window" behavior.
Overall, this head is materially stronger than the one I blocked earlier. The key runtime contract is now expressed in code, state, docs, and tests, and I do not see a new source-path reason to keep request-changes on the current revision.
DanielLeens
left a comment
Sorry for missing these in the last review — that's on me. I re-reviewed the latest head from scratch, and I want to flag two more items I should have caught earlier. Both are carryovers from the previous version, not newly introduced issues.
Thanks for the latest update. The earlier single-collector/runtime-contract blocker looks materially better on the current head: the server-side lifecycle is now explicit in EdgeSocketIngressServer, and the collector-facing docs are much clearer about the Connection refused vs REJECTED behavior.
What This PR Fixes
- User pain: an external edge collector needs a lightweight socket ingress into a running Zeta job.
- Fix approach: this PR adds the `EdgeSocket` source, collector protocol, checkpoint polling semantics, and the corresponding docs / UT / E2E coverage.
- One-line summary: the Phase 1 direction is much clearer now, but the latest head still has one source-level correctness blocker in the checkpoint ACK contract, plus one remaining config-contract mismatch.
Runtime Chain Rechecked
collector request path
-> EdgeSocketIngressServer.dispatchRequest(...)
-> __BATCH__ -> EdgeSocketSourceReader.handleBatchRecord(...)
-> __COMMIT__ -> EdgeSocketSourceReader.handleCommitRequest(...)
-> EdgeSocketSourceState.buildCommitResponse(batchId)
checkpoint / restore path
-> EdgeSocketSourceReader.snapshotStateToBytes(checkpointId)
-> EdgeSocketSourceState.snapshotState(...)
-> EdgeSocketSourceReader.notifyCheckpointComplete(checkpointId)
-> EdgeSocketSourceState.notifyCheckpointComplete(...)
-> EdgeSocketSourceReader.restoreState(...)
-> EdgeSocketSourceState.restoreState(...)
config / metadata path
-> EdgeSocketSourceFactory.optionRule()
-> SeaTunnelConfValidateCommand.validate(...)
-> MetadataExportCommand discovery/export path
Key Findings
- The normal `__COMMIT__` path absolutely hits `EdgeSocketSourceState.buildCommitResponse(...)` on every collector-side checkpoint poll.
- The current state model still keeps one global `lastReceivedBatchId` / `lastCommittedBatchId` namespace, but the protocol docs still do not define `batchId` as globally monotonic across reconnects / restores.
- The current E2E helper still restarts `batchId` from `1` for each new socket session, which shows the published contract is still not explicit enough.
- Two documented backpressure options are still implemented in runtime but missing from `EdgeSocketSourceFactory.optionRule()`, so validation / metadata export still cannot describe the real connector contract end to end.
Findings
Issue 1: the checkpoint ACK watermark is still global, but the protocol does not scope batchId to a globally monotonic namespace
- Location: `EdgeSocketSourceState.buildCommitResponse()` / `restoreState()`, `EdgeSocketSourceReader.handleCommitRequest()`, `AbstractEdgeSocketIT.sendRecordsThroughCollector()` / `sendPacketRecords()` / `sendEncryptedPacketRecords()`, and the `__COMMIT__` section in `docs/en/connectors/source/EdgeSocket.md`.
- Why this is a problem:
  - `EdgeSocketSourceState` persists one global `lastCommittedBatchId` and one global `lastReceivedBatchId`.
  - `buildCommitResponse(batchId)` returns `ACK:<watermark>` as soon as `batchId <= lastCommittedBatchId`.
  - `restoreState(...)` brings that committed watermark back after restart.
  - But the docs only say that `batchId` is a positive `long`; they do not say it must remain globally monotonic across reconnects / restarts.
  - The current E2E helpers still restart `batchId` from `1` for each new socket session, which is exactly the ambiguity here.
- Simple example:
  - Session A previously committed up to watermark `100`.
  - The worker restores that state.
  - A new collector session starts again with `__BATCH__:1:...` and later polls `__COMMIT__:1`.
  - Today `buildCommitResponse(1)` can immediately return `ACK:100`, and the collector docs tell the sender to evict every buffered batch whose `batchId <= 100`.
  - That means a brand-new session can be mistaken for already checkpoint-confirmed data.
- Risk: this is a real correctness blocker on the main checkpoint-ACK path. In the reconnect / restore case, the collector can discard not-yet-confirmed data based on an old watermark.
- Suggested fix:
  - Option A (smaller Phase 1 patch): explicitly require `batchId` to stay globally monotonic for the full logical source lifetime, including reconnect / restore, and then align the docs, examples, and E2E helpers with that contract.
  - Option B (more future-proof): add a session / epoch dimension to the protocol and keep ACK state scoped by that identity instead of one global batch namespace.
  - For this PR, I would accept Option A as the smallest safe boundary, but the current head needs one of these two contracts to be explicit and consistently implemented.
- Severity: High
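The example can be replayed against a deliberately simplified model of the state. `GlobalWatermarkModel` below is my own reduction, assuming only the two global fields and the `batchId <= lastCommittedBatchId` rule described above; it is not the PR's `EdgeSocketSourceState`.

```java
/** Simplified, hypothetical model of a reader that keeps one global watermark. */
final class GlobalWatermarkModel {
    private long lastCommittedBatchId;
    private long lastReceivedBatchId;

    /** Restores the committed watermark persisted by a previous session. */
    void restore(long committedWatermark) {
        lastCommittedBatchId = committedWatermark;
        lastReceivedBatchId = committedWatermark;
    }

    void markReceived(long batchId) {
        lastReceivedBatchId = Math.max(lastReceivedBatchId, batchId);
    }

    /** ACK as soon as the polled id is at or below the global committed watermark. */
    String buildCommitResponse(long batchId) {
        return batchId <= lastCommittedBatchId ? "ACK:" + lastCommittedBatchId : "PENDING";
    }

    public static void main(String[] args) {
        GlobalWatermarkModel state = new GlobalWatermarkModel();
        state.restore(100);      // session A had committed up to 100
        state.markReceived(1);   // new session restarts its batchId at 1
        // The fresh batch 1 is answered from the old watermark, before any new checkpoint.
        System.out.println(state.buildCommitResponse(1)); // prints ACK:100
    }
}
```

Under Option A the new session would have to start at `101` or higher, and the same poll would correctly return `PENDING` until a checkpoint covers it.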
Issue 2: two documented runtime options are still missing from EdgeSocketSourceFactory.optionRule()
- Location: `EdgeSocketSourceFactory.optionRule()`, `EdgeSocketSourceOptions`, `docs/en/connectors/source/EdgeSocket.md`, `SeaTunnelConfValidateCommand`, and `MetadataExportCommand`.
- Problem:
  - `queue_backpressure_watermark_ratio` and `queue_full_retry_after_ms` are both documented and used by the runtime config / queue logic.
  - But they are still not listed in `EdgeSocketSourceFactory.optionRule()`.
  - SeaTunnel config validation, unknown-key checks, and metadata export all depend on that `OptionRule` contract.
- Risk: users can follow the docs and still hit validation / metadata mismatches, and SeaTunnel Web / metadata consumers still cannot see the full real connector contract.
- Suggested fix: add both options to `optionRule().optional(...)` and keep one focused regression test so the factory contract cannot silently drift away from the runtime/doc contract again.
- Severity: Medium
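The regression test I have in mind boils down to a set difference. The sketch below uses plain string sets and a hypothetical `OptionContractCheck` helper; in the real test the declared keys would come from the factory's `OptionRule`, and how to extract them is not shown here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical drift check between declared option keys and keys the runtime reads. */
final class OptionContractCheck {
    /** Returns the keys used at runtime that the option rule does not declare. */
    static Set<String> missingFromRule(Set<String> declaredInRule, Set<String> usedAtRuntime) {
        Set<String> missing = new TreeSet<>(usedAtRuntime);
        missing.removeAll(declaredInRule);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> declared = new HashSet<>(Arrays.asList("port", "auth_token"));
        Set<String> runtime = new HashSet<>(Arrays.asList(
                "port", "auth_token",
                "queue_backpressure_watermark_ratio", "queue_full_retry_after_ms"));
        // A passing regression test would assert that this set is empty.
        System.out.println(missingFromRule(declared, runtime));
    }
}
```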
Tests / CI
- The latest `Build` check is still queued as I write this review.
- My blocking conclusion above is source-level and independent of that CI result.
Conclusion: can merge after fixes
- Blocking items
- Issue 1: make the `batchId` / `ACK:<watermark>` contract safe across reconnect and restore. Right now the protocol/state model still allows a new session to be acknowledged by an old committed watermark.
- Suggested follow-up
- Issue 2: complete the `OptionRule` contract so validation / metadata export match the runtime and docs.
- Merge only from a green head once the current `Build` finishes.
Overall, I think this head is significantly better than the earlier revisions, and I do not see the previous single-collector blocker reopened. The remaining blocker is narrower now, but it is still a correctness issue on the checkpoint-ACK contract, so I do not think this is merge-ready yet.
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head and retraced the EdgeSocket commit/restore path. On the current head I do not see a new source-level blocker, but the latest Build is still red, so this is still waiting on CI rather than source corrections from my side.
What this PR solves
- User pain point
  Edge collectors need unambiguous feedback after reconnects and restores: whether a batch is already covered, still pending, unknown, or must be resent.
- Fix approach
  The latest head restores the missing backpressure options in `OptionRule`, adds `sessionFloorWatermark` / `RESEND` handling in `EdgeSocketSourceState`, and updates the protocol docs.
- One-line summary
  This revision makes the EdgeSocket commit/restore protocol much clearer, and I do not see a new source-level blocker on the latest head.
Runtime path I checked
Collector sends __BATCH__ / __COMMIT__
-> EdgeSocketSourceState.markRecordReceived()/markRecordEmitted()
-> checkpoint snapshotState(...)
-> restoreState(...) restores watermark + pending/drained state
-> buildCommitResponse(batchId)
-> ACK / PENDING / RETRY / RESEND
Key findings
- The main collector protocol definitely hits buildCommitResponse(...); this is not an edge path.
- The missing config-contract gap is fixed: EdgeSocketSourceFactory.optionRule() now includes queue_backpressure_watermark_ratio and queue_full_retry_after_ms (EdgeSocketSourceFactory.java:43-58).
- The restored-session behavior is clearer now: restoreState() records a sessionFloorWatermark, and buildCommitResponse() can return RESEND for in-flight batches from the previous session that were not re-received yet (EdgeSocketSourceState.java:78-168).
- The new tests cover the option-rule contract and several restore/watermark cases (EdgeSocketFactoryTest.java:47-60, EdgeSocketSourceStateTest.java:145-204).
- I did not find a new source-level blocker on the latest head.
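To make the four commit outcomes concrete, here is a minimal sketch of how a decision like buildCommitResponse(...) could be ordered. It is not the code in EdgeSocketSourceState: the class name CommitResponseSketch is hypothetical, batch ids are assumed to be monotonically increasing longs, and sessionFloorWatermark is assumed to mean "highest batch id the previous session had received".

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration only; not the actual EdgeSocketSourceState. */
final class CommitResponseSketch {
    // Highest batch id covered by a completed checkpoint.
    private long committedWatermark;
    // Assumption: highest batch id the previous session had received, recorded on restore.
    private final long sessionFloorWatermark;
    // Batch ids received in the current session that are not committed yet.
    private final Set<Long> receivedThisSession = new HashSet<>();

    CommitResponseSketch(long committedWatermark, long sessionFloorWatermark) {
        this.committedWatermark = committedWatermark;
        this.sessionFloorWatermark = sessionFloorWatermark;
    }

    void markRecordReceived(long batchId) {
        receivedThisSession.add(batchId);
    }

    void notifyCheckpointComplete(long watermark) {
        committedWatermark = Math.max(committedWatermark, watermark);
        // Anything at or below the watermark no longer needs to be tracked.
        receivedThisSession.removeIf(id -> id <= committedWatermark);
    }

    String buildCommitResponse(long batchId) {
        if (batchId <= committedWatermark) {
            return "ACK:" + committedWatermark; // already covered by a checkpoint
        }
        if (receivedThisSession.contains(batchId)) {
            return "PENDING"; // received, waiting for the next checkpoint
        }
        if (batchId <= sessionFloorWatermark) {
            return "RESEND"; // in flight before the restore, not re-received yet
        }
        return "RETRY"; // unknown to this session
    }
}
```

The ordering matters in this sketch: the ACK check runs first so that a covered batch is never asked to be resent, and RESEND is only reachable for ids that the previous session had seen.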
Test / CI notes
- The tests I reviewed are in-memory state-machine checks and do not introduce an obvious flaky pattern.
- The latest contributor Build is still red across multiple lanes, including unit-test (8, ubuntu-latest), kafka-connector-it (11, ubuntu-latest), and paimon-connector-it (11, ubuntu-latest), so CI still needs to be cleaned up before merge.
Conclusion: can merge after fixes
- Blocking items
- Please get the latest Build green before merge.
- Suggested but non-blocking improvements
- A focused unit test for the explicit RESEND branch would still be nice to add later, but I am not blocking on that in this revision.
Overall, the latest head closes the main config-contract and restore-semantics gaps I was worried about. From the source side, I do not have a new blocker on this revision; the remaining gate is the red CI matrix.
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head. Compared with the last Daniel-reviewed head 917ec0dbe0b6bf7c044b3eb6dbf05a97e4eeea34, the new commit only updates the in-source comments around EdgeSocketSourceState; I did not find a new source-level blocker in the current implementation.
What problem this PR solves
- User pain point
EdgeSocket ingress needs a recoverable batch protocol so the collector can distinguish ACK, PENDING, RETRY, and RESEND across checkpoint, failover, and reader restart.
- Fix approach
The broader PR series already fixed the ingress state-machine behavior. This latest head mainly adds explanatory comments to the core state holder so the restore/commit semantics are easier to maintain correctly.
- One-line summary
The latest incremental update is a readability improvement on top of the already-fixed EdgeSocket ingress state machine.
1. Code change review
1.1 Core logic analysis
The main runtime path is still:
__BATCH__
-> markRecordReceived(batchId)
-> pending count / lastReceivedBatchId
pollNext
-> markRecordEmitted(batchId)
-> drainedBatchIds
checkpoint
-> snapshotState(checkpointId, queueSnapshot)
restart
-> restoreState(bytes)
-> sessionFloorWatermark + queued records re-offer
__COMMIT__
-> buildCommitResponse(batchId)
-> ACK / PENDING / RETRY / RESEND
On the latest head, the decision code in buildCommitResponse, snapshotState, restoreState, and checkpoint watermark handling is unchanged relative to the previously reviewed head. The new diff is comment-only and explains the same behavior more clearly.
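As a reading aid for the path above, the following is a minimal sketch of the receive / emit / snapshot / complete bookkeeping. It is an assumption-heavy illustration, not the PR's state holder: the class name CheckpointWatermarkSketch is hypothetical, batch ids are assumed to be ordered longs, snapshotState is simplified to take only the checkpoint id, and the watermark is assumed to be the highest drained batch id that sits below every still-pending batch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical illustration of two-step watermark promotion. */
final class CheckpointWatermarkSketch {
    // Records received but not yet emitted downstream, per batch id.
    private final Map<Long, Integer> pendingCounts = new HashMap<>();
    // Batches whose records have all been emitted.
    private final TreeSet<Long> drainedBatchIds = new TreeSet<>();
    // Candidate watermark captured at snapshot time, per checkpoint id.
    private final Map<Long, Long> watermarkByCheckpoint = new HashMap<>();
    private long committedWatermark = -1L;

    void markRecordReceived(long batchId) {
        pendingCounts.merge(batchId, 1, Integer::sum);
    }

    void markRecordEmitted(long batchId) {
        Integer remaining = pendingCounts.get(batchId);
        if (remaining == null) {
            return; // unknown batch, nothing to drain
        }
        if (remaining == 1) {
            pendingCounts.remove(batchId);
            drainedBatchIds.add(batchId);
        } else {
            pendingCounts.put(batchId, remaining - 1);
        }
    }

    void snapshotState(long checkpointId) {
        long lowestPending = pendingCounts.keySet().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
        // Highest drained id strictly below the lowest pending id, if any.
        Long candidate = drainedBatchIds.lower(lowestPending);
        watermarkByCheckpoint.put(checkpointId, candidate == null ? committedWatermark : candidate);
    }

    void notifyCheckpointComplete(long checkpointId) {
        Long candidate = watermarkByCheckpoint.remove(checkpointId);
        if (candidate != null && candidate > committedWatermark) {
            committedWatermark = candidate;
            drainedBatchIds.headSet(candidate, true).clear();
        }
    }

    long committedWatermark() {
        return committedWatermark;
    }
}
```

The point of the two steps is that a snapshot only proposes a watermark; the value becomes visible to ACK responses once the matching checkpoint completes.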
1.2 Compatibility impact
Fully compatible relative to the previously reviewed current head. No protocol field, default, serialization layout, or restore behavior changed in this new delta.
1.3 Performance / side effects
No new CPU, memory, GC, network, concurrency, retry, idempotency, or resource-release risk from the latest head delta.
1.4 Error handling and logging
No new source-level blocking issue found.
2. Code quality evaluation
2.1 Code style
These comments are actually helpful because they explain the recovery-sensitive parts of the state machine instead of adding noise.
2.2 Test coverage and stability
The latest incremental commit does not add new tests, but it also does not change the behavior that was previously reviewed.
2.3 Documentation
No additional docs update is required for this comment-only delta.
3. Architecture
3.1 Solution quality
This latest delta is a maintainability improvement, not a new architectural change.
3.2 Maintainability
Better than before. This is exactly the kind of checkpoint/retry state logic that benefits from explicit source comments.
3.3 Extensibility
No direct functional extension, but it lowers the risk of future regressions in the ingress state machine.
3.4 Historical compatibility
No new compatibility risk introduced by the latest head delta.
4. Issue summary
No new source-level blocking issue found.
5. Merge conclusion
Conclusion: can merge
- Blocking items
No source blocker from the latest head delta.
- Suggested follow-ups
None.
From the source-review perspective, the latest head looks good to me. The remaining gate is just waiting for the Build check to finish.
a37bdbd to d2dc782
d2dc782 to c8787a2
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head from scratch.
Compared with the head Daniel reviewed previously (f9f86eca5e20aa336f46605a6ec0697e859eb29c), the latest delta is only comment cleanup and newline normalization:
DefaultEdgeSocketRecordQueue.java -1
EdgeSocketIngressServer.java -5
EdgeSocketResponseCode.java -1
IncomingRecordHandler.java -4
EdgeSocketSourceReader.java -5
EdgeSocketLogUtils.java -6
log4j2-test.properties +1/-1
SeaTunnelEngine.yaml +1/-1
Even so, I retraced the full runtime path end to end on the current head.
What this PR fixes
- User pain point: Zeta did not have a lightweight, checkpoint-aware edge-ingress source for remote collectors.
- Fix approach: this PR adds the EdgeSocket source runtime, including TCP ingress, single-collector enforcement, batch receive / commit polling, checkpoint-aware watermarking, docs, and E2E coverage.
- One-line summary: this introduces a real edge-collector ingestion path into Zeta, with the protocol and checkpoint semantics defined in code instead of left implicit.
Full runtime chain I rechecked
Edge collector connection
-> EdgeSocketSourceReader.open() [EdgeSocketSourceReader.java:80-83]
-> EdgeSocketIngressServer.start() [EdgeSocketIngressServer.java:66-76]
-> receiveLoop() [176-260]
-> openServerSocketWithRetry() [103-154]
-> accept collector
-> authenticateCollector() [281-309]
Batch ingestion
-> "__BATCH__:<batchId>:<payload>"
-> EdgeSocketIngressServer.dispatchRequest() [335-369]
-> EdgeSocketSourceReader.handleBatchRecord() [162-220]
-> decode payload
-> enqueue record
-> sourceState.markRecordReceived(batchId)
-> return RECEIVED / QUEUE_FULL / RETRY / DECRYPT_FAILED
Checkpoint coordination
-> pollNext() [95-106]
-> emitRecordSafely() [108-123]
-> sourceState.markRecordEmitted(batchId)
-> snapshotStateToBytes(checkpointId) [126-130]
-> sourceState.snapshotState(...) [158-187]
-> notifyCheckpointComplete(checkpointId) [148-152]
-> sourceState.notifyCheckpointComplete(checkpointId) [255-264]
Collector commit polling
-> "__COMMIT__:<batchId>"
-> EdgeSocketIngressServer.dispatchRequest() [337-344]
-> EdgeSocketSourceReader.handleCommitRequest() [222-226]
-> EdgeSocketSourceState.buildCommitResponse() [127-144]
-> ACK:<watermark> / PENDING / RETRY / RESEND
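For readers who want the wire format spelled out, here is a minimal sketch of parsing the two request shapes quoted in the chain above, "__BATCH__:<batchId>:<payload>" and "__COMMIT__:<batchId>". It is not the PR's dispatchRequest(): the class name FrameSketch is hypothetical, and it assumes that a batch id never contains ':' while the payload may.

```java
/** Hypothetical illustration of request framing; not EdgeSocketIngressServer code. */
final class FrameSketch {
    enum Kind { BATCH, COMMIT, UNKNOWN }

    private static final String BATCH_PREFIX = "__BATCH__:";
    private static final String COMMIT_PREFIX = "__COMMIT__:";

    final Kind kind;
    final String batchId; // null for UNKNOWN
    final String payload; // only set for BATCH

    private FrameSketch(Kind kind, String batchId, String payload) {
        this.kind = kind;
        this.batchId = batchId;
        this.payload = payload;
    }

    static FrameSketch parse(String line) {
        if (line.startsWith(BATCH_PREFIX)) {
            String rest = line.substring(BATCH_PREFIX.length());
            // Split on the first ':' only, so the payload may itself contain ':'.
            int sep = rest.indexOf(':');
            if (sep > 0) {
                return new FrameSketch(Kind.BATCH, rest.substring(0, sep), rest.substring(sep + 1));
            }
        } else if (line.startsWith(COMMIT_PREFIX)) {
            String batchId = line.substring(COMMIT_PREFIX.length());
            if (!batchId.isEmpty()) {
                return new FrameSketch(Kind.COMMIT, batchId, null);
            }
        }
        return new FrameSketch(Kind.UNKNOWN, null, null);
    }
}
```

A dispatcher built on this would route BATCH frames to the batch handler and COMMIT frames to the commit handler; how the real server treats malformed input is not shown here.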
Findings
- The normal runtime path definitely hits the new code. This is not dead or purely auxiliary logic.
- The single-collector restriction is intentional and consistently enforced in both code and docs.
- The ACK / PENDING / RETRY / RESEND / QUEUE_FULL / DECRYPT_FAILED contract is aligned across runtime code, unit tests, E2E, and docs.
- I did not find a reopened source-level blocker on the current head.
Tests
- EdgeSocketSourceReaderTest covers the important reader-side checkpoint and single-collector paths.
- The tests use explicit condition waiting (Awaitility) instead of fragile fixed sleeps for correctness checks.
- The E2E retry loops are deadline-bounded retry/backoff loops, not one-shot timing assumptions.
- My stability verdict for the new UT/E2E coverage is: Stable.
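To illustrate what "deadline-bounded retry/backoff" means in contrast to a fixed sleep, here is a minimal plain-Java sketch. It is not taken from the PR's E2E helpers: the class DeadlineRetrySketch, the method retryUntil, and the 1-second backoff cap are all hypothetical choices made for the example.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical illustration of a deadline-bounded retry loop with backoff. */
final class DeadlineRetrySketch {
    private static final long MAX_BACKOFF_MS = 1_000L;

    /** Returns true once the condition holds, or false when the deadline passes first. */
    static boolean retryUntil(BooleanSupplier condition, long timeoutMs, long initialBackoffMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        long backoffMs = Math.max(1L, initialBackoffMs);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false; // deadline reached, give up instead of waiting forever
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
        }
    }
}
```

The loop rechecks the condition after every wait, so a slow container start costs extra time but not a false failure, and the deadline keeps a stuck test from hanging.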
Compatibility / side effects
- Existing jobs are unaffected. This is an additive connector capability.
- The main operational tradeoff is explicit and documented: one collector at a time, and parallelism = 1.
- Backpressure is handled through bounded queue capacity plus QUEUE_FULL:<ms> responses, which is the right shape for this phase.
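The backpressure shape described above can be sketched as a bounded queue whose offer never blocks and instead tells the collector when to retry. This is an illustration under assumptions, not DefaultEdgeSocketRecordQueue: the class name BoundedIngressQueueSketch is hypothetical, records are plain strings, and the ratio is assumed to be a simple fill-level threshold.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical illustration of bounded-queue backpressure. */
final class BoundedIngressQueueSketch {
    private final BlockingQueue<String> queue;
    private final int capacity;
    private final double backpressureRatio; // assumed meaning of queue_backpressure_watermark_ratio
    private final long retryAfterMs;        // assumed meaning of queue_full_retry_after_ms

    BoundedIngressQueueSketch(int capacity, double backpressureRatio, long retryAfterMs) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.capacity = capacity;
        this.backpressureRatio = backpressureRatio;
        this.retryAfterMs = retryAfterMs;
    }

    /** Non-blocking: a full queue is reported to the sender rather than stalling the socket thread. */
    String offer(String record) {
        return queue.offer(record) ? "RECEIVED" : "QUEUE_FULL:" + retryAfterMs;
    }

    /** Returns the next record, or null when the queue is empty. */
    String poll() {
        return queue.poll();
    }

    /** True once the fill level reaches the configured ratio of the capacity. */
    boolean isBackpressure() {
        return queue.size() >= capacity * backpressureRatio;
    }
}
```

Because the capacity is fixed, memory use stays bounded even when the downstream pipeline is slow, and the retry hint moves the waiting to the collector side.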
Conclusion: can merge after fixes
Blocking items:
- I do not see a new source-level blocker on the current head.
- The latest GitHub Build on f7c56f60d2f6f707229f61225043cc6fb6d1661e is cancelled, so please rerun/get a green Build on the current head before merge.
Non-blocking follow-up:
- None from my side on the current runtime path.
Overall, the feature design still holds up on the latest head, and the post-review delta does not reopen the runtime logic Daniel already validated. The remaining gate is CI state on the current head rather than a code-path blocker.
…rification and use EdgeSocketConfig for built-in verification uniformly
DanielLeens
left a comment
Thanks for the update. I re-reviewed the latest head from scratch.
What problem this PR solves
- User pain point
Edge collectors need a checkpoint-aware ingress protocol so they can distinguish ACK, PENDING, RETRY, and RESEND correctly across restart and recovery.
- Fix approach
This PR adds the EdgeSocket source runtime, ingress server, queueing / checkpoint state, docs, and E2E coverage.
- One sentence
The latest delta is small and does not reopen the runtime semantics Daniel already checked in depth.
Compared with the last Daniel-reviewed head, the new commit only removes a redundant queue verification and the matching test lines. I still retraced the real runtime path:
collector connection
-> EdgeSocketIngressServer.start()
-> authenticateCollector()
batch ingestion
-> handleBatchRecord()
-> queue.offer(...)
-> sourceState.markRecordReceived(batchId)
checkpoint / recovery
-> pollNext()
-> sourceState.markRecordEmitted(batchId)
-> snapshotState(...)
-> notifyCheckpointComplete(...)
-> buildCommitResponse()
-> ACK / PENDING / RETRY / RESEND
Key findings:
- The normal runtime path still hits the same state machine Daniel previously validated.
- The latest delta does not change the queue semantics, commit protocol, or checkpoint state layout.
- I do not see a reopened source-level blocker on the current head.
1. Code change review
1.1 Core logic analysis
DefaultEdgeSocketRecordQueue.offer(...), snapshot(), and isBackpressure() still keep the same semantics on the latest head. The current docs also still correctly explain the passive source deployment constraint and the tag_filter pinning path.
1.2 Compatibility impact
Fully compatible relative to the previously reviewed current implementation. No protocol/default/serialization contract changed in this latest delta.
1.3 Performance / side effects
No new CPU, memory, GC, network, concurrency, retry, idempotency, or resource-release risk surfaced from the latest change.
1.4 Error handling and logging
No new source-level blocking issue found.
2. Code quality evaluation
2.1 Code style
The current head is slightly cleaner after removing the redundant verification.
2.2 Test coverage and stability
The existing unit/E2E coverage Daniel reviewed earlier still stands, and I do not see a new flaky-test pattern introduced by the latest delta.
Test stability rating: Stable.
2.3 Documentation
No additional documentation blocker from this round.
3. Architecture
3.1 Solution quality
The feature design still holds up on the latest head.
3.2 Maintainability
The latest cleanup slightly improves maintainability without changing behavior.
3.3 Extensibility
No new extensibility concern surfaced in this round.
3.4 Historical compatibility
No new compatibility risk introduced by the current delta.
4. Issue summary
No new source-level blocking issue found.
5. Merge conclusion
Conclusion: can merge
- Blocking items
- No code blocker from my side on the latest head.
- Suggested but non-blocking follow-ups
- The latest GitHub Build is still pending, so please wait for the updated Build result before the actual merge.
From the source-review side, the current head still looks good to me.
Purpose of this pull request
Related: #10666
Phase 1 of STIP-24: lightweight edge collector ingress in Zeta, implemented as the EdgeSocket source for remote collection.
This PR delivers the engine-side MVP data path: edge client connects → token auth → payload deserialize → existing transform/sink pipeline in Zeta.
Out of scope for this phase: edge-side collector runtime (standalone agent), edge sink, and edge-cluster control plane.
Changes:
- EdgeSocket source connector for Zeta (factory, reader, deserialization, option model).
- seatunnel.source.EdgeSocket.
- connector-edge-socket-e2e (MySQL 8.4 sink + SQL transform; container bootstrap + JDBC driver injection).
- docs/en, docs/zh.
Does this PR introduce any user-facing change?
Yes — no impact on existing jobs.
- EdgeSocket source for edge ingestion.
- seatunnel.source.EdgeSocket.
How was this patch tested?
- EdgeSocketFactoryTest (factory and config validation).
- EdgeSocketMysql8_4IT: start Zeta job with EdgeSocket source; simulated edge collector sends records; SQL transform; write to MySQL sink and verify rows.