[Improve][Transform-V2] Improve embedding model invocation reliability#10863
[Improve][Transform-V2] Improve embedding model invocation reliability#10863yzeng1618 wants to merge 6 commits into
Conversation
Add a shared nlpmodel invocation runtime with retry, timeout, error classification, response count validation, safe logging, metrics hooks, and cache boundary. Route OpenAI, Doubao, Qianfan, Custom, and Zhipu embedding calls through ProviderAdapter and ModelInvocationRuntime while keeping default retry behavior compatible. Document request-level batching, reliability options, idempotency boundaries, and cache/logging behavior for English and Chinese Embedding docs.
…nd add comments for RAG metadata contract
…nd add comments for RAG metadata contract
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the contribution! I went through the full diff and the direction makes sense. The new common invocation runtime is wired into the normal embedding path correctly, the new retry/timeout options are actually exposed through the factory contract, and I did not find a blocking issue in the current revision.
What problem this PR solves
- User pain point
Embedding providers currently handle failures in a fragmented way. Timeout behavior, retry behavior, response-count validation, and safe diagnostics are not centralized, so users get inconsistent behavior across providers when they hit rate limits, transient 5xx responses, or mismatched output sizes. - Fix approach
This PR introduces a sharedModelInvocationRuntime,ModelInvocationOptions, andProviderAdapterboundary. Providers still own provider-specific request/response logic, but retry, timeout propagation, output-count validation, and safe logging move into one common runtime path. - One-line summary
This turns embedding invocation from “each provider handles reliability separately” into “shared runtime reliability with provider-specific protocol adapters”.
1. Code review
1.1 Core logic analysis
Precise change scope
The main pieces are:
-
common runtime layer
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntime.java:28-196
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ProviderAdapter.java:22-38 -
config exposure
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelTransformConfig.java:81-113
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransformFactory.java:42-84 -
runtime entry points
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransform.java:89-203
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/remote/AbstractModel.java:43-77 -
provider integration and tests
.../remote/openai/OpenAIModel.java
.../remote/doubao/DoubaoModel.java
.../remote/qianfan/QianfanModel.java
.../remote/custom/CustomModel.java
seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntimeTest.java:31-199
seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/embedding/EmbeddingModelDimensionTest.java:51-214
Before / after
The new common entry point is:
for (int attempt = 1; attempt <= options.getRetryMaxAttempts(); attempt++) {
ModelInvocationContext context = new ModelInvocationContext(...);
try {
T output = adapter.invoke(inputs, context);
validateOutputCount(...);
return output;
} catch (IOException e) {
handleInvocationException(normalize(e, context), context, attempt, start);
} catch (RuntimeException e) {
handleInvocationException(normalize(e, context), context, attempt, start);
}
}And the embedding factory now really exposes the new options:
.optional(
EmbeddingTransformConfig.API_PATH,
EmbeddingTransformConfig.SINGLE_VECTORIZED_INPUT_NUMBER,
EmbeddingTransformConfig.PROCESS_BATCH_SIZE,
ModelTransformConfig.MODEL_RETRY_MAX_ATTEMPTS,
ModelTransformConfig.MODEL_RETRY_BACKOFF_MS,
ModelTransformConfig.MODEL_RETRY_MAX_BACKOFF_MS,
ModelTransformConfig.MODEL_REQUEST_TIMEOUT_MS)Key findings
- The normal path definitely hits the new runtime, because
EmbeddingTransform.open()buildsModelInvocationOptionsandAbstractModelnow ownsModelInvocationRuntime. - The current version centralizes the three most important reliability behaviors: request timeout, retry for retryable failures, and response-count validation.
- The latest fix for dimension initialization is effective: dimension probing now bypasses the batch-size response-count mismatch trap where appropriate.
- The docs, option definitions, and factory contract are now aligned for the new retry/timeout options.
Full runtime chain
Transform initialization
-> EmbeddingTransform.open() [89-203]
-> read provider / model / api_path / retry / timeout from config [91-124]
-> build the concrete provider model [97-187]
-> probe model.dimension() for output dimension [198]
Shared invocation layer
-> AbstractModel.vectorization(...) [49-77]
-> batchProcess(fields, singleVectorizedInputNumber) [62-76]
-> vector(batch)
-> provider enters invocationRuntime.invoke(...)
Common runtime
-> ModelInvocationRuntime.invoke(...) [47-81]
-> create ModelInvocationContext [49-57]
-> adapter.invoke(inputs, context) [60]
-> validateOutputCount(...) [61, 87-103]
-> on failure normalize(...) + handleInvocationException(...) [69-72, 105-156]
-> timeout / rate limit / temporary server failures can retry
-> parse failures / auth failures / count mismatches do not retry
Contract exposure
-> ModelTransformConfig defines retry / timeout options [81-113]
-> EmbeddingTransformFactory.optionRule() exposes them [42-84]
-> docs/en/transforms/embedding.md:31-89
-> docs/zh/transforms/embedding.md
1.2 Compatibility impact
Conclusion: fully compatible.
- API: no breaking API change
- Configs: only optional new configs added
- Defaults:
model_retry_max_attempts = 1, so the old no-retry behavior stays intact by default - Protocol: providers still own their provider-specific request/response protocol
- Serialization: unchanged
- Historical behavior: existing jobs keep the same behavior unless they explicitly opt into retries/backoff
1.3 Performance / side effects
- CPU / memory / GC: small added runtime overhead only at the remote invocation boundary
- Network: retries are bounded and disabled by default
- Concurrency: no new global mutable state introduced
- Idempotency: the docs are explicit that repeated attempts may still be billed or have provider-side effects
- Resource lifecycle: providers still close their own
CloseableHttpClient
1.4 Error handling and logging
I did not find a blocking code issue in the current revision.
Good points here:
ModelInvocationRuntime.logInvocationFailure(...)records safe diagnostic context only.- The tests explicitly verify that provider response bodies with secrets/raw text are not surfaced directly in exception messages.
- Response-count mismatch fails fast instead of emitting misaligned vectors.
CI note:
- GitHub
Buildis green on the latest head.
2. Code quality
2.1 Code style
The new runtime classes are reasonably clear, and the core responsibility split is better than before.
2.2 Test coverage
The added tests cover the right regression surface:
- default retry policy attempts once,
- 429 / 5xx retry paths,
- non-retryable auth / parse / response-count failures,
- and the dimension-probing path that should not be rejected by batch response-count validation.
2.3 Documentation
This is a user-visible behavior improvement, so docs are required, and they are present in both English and Chinese:
docs/en/transforms/embedding.md:31-89docs/zh/transforms/embedding.md
The default behavior is also documented honestly: no retries unless the user opts in.
3. Architecture assessment
3.1 Solution quality
This is a strong long-term direction. The PR moves shared reliability concerns into one runtime instead of continuing to duplicate them inside each provider.
3.2 Maintainability
Maintainability is clearly better now. Adding a new provider no longer requires re-implementing the same retry / timeout / validation skeleton.
3.3 Extensibility
The new runtime/context/error-type split gives you a clean place for future provider growth and for later reliability improvements.
3.4 Historical compatibility
No migration burden introduced by this change.
4. Issue summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| - | No formal issue found in the current revision | - | - |
5. Merge conclusion
Conclusion: can merge
-
Blocking items
- No code blocker from my side in the current revision.
-
Non-blocking suggestions
- If you want to harden this further later, I would consider adding one more regression test around transient plain network
IOExceptioncases. That is not a blocker for this PR, though.
- If you want to harden this further later, I would consider adding one more regression test around transient plain network
Overall, this is more than just “adding a few config options”. It gives embedding invocation a real shared reliability boundary, keeps the old default behavior compatible, and backs the new behavior with tests and docs. From the current code path, I’m good with merge.
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the update. I re-reviewed the latest head and retraced the remote embedding invocation path. On the current revision I do not see a new source-level blocker; the remaining gate is still the red CI signal.
What this PR solves
- User pain point
Remote embedding calls previously lacked a shared runtime for timeout handling, retries, error normalization, and response-count validation, so provider behavior could drift and failures were harder to reason about. - Fix approach
This PR introducesModelInvocationRuntime,ModelInvocationContext, andModelInvocationOptions, then routes the provider implementations through that shared runtime. - One-line summary
This is a good infrastructure cleanup for remote model invocation, and I do not see a new source-level blocker on the latest head.
Runtime path I checked
Task startup
-> EmbeddingTransform.open() [EmbeddingTransform.java:89-204]
-> ModelInvocationOptions.fromConfig(...)
-> build provider-specific model
Runtime remote invocation
-> provider adapter / AbstractModel
-> ModelInvocationRuntime.invoke(...) [47-80]
-> build ModelInvocationContext
-> adapter.invoke(inputs, context)
-> validateOutputCount(...)
-> retry / backoff / timeout / error normalization [87-195]
Key findings
- The normal remote-embedding path definitely hits the new runtime; this is not just a side helper.
- The runtime now centralizes retry/backoff, timeout propagation, error normalization, and output-count validation.
ModelInvocationRuntimeTestcovers rate limit, server-side temporary failure, authentication failure, response-count mismatch, and response-parse failure with deterministic in-memory adapters.- I did not find a new source-level blocker on the latest head.
Test / CI notes
- The new tests are structurally stable from a source perspective: they are in-memory unit tests with no timing-sensitive sleeps or external service dependencies.
- The latest
Buildis still failing from GitHub metadata, and the fork-side run state aroundchanges/Check Helm Chart Syntaxis not clean yet, so CI still needs to be resolved before merge.
Conclusion: can merge after fixes
- Blocking items
- Please get the latest
Buildgreen before merge.
- Suggested but non-blocking improvements
- No new source-side changes are required from my side on the current head.
Overall, the shared invocation-runtime design looks sound on the latest head, and I do not have a new source-level blocker for this revision. The remaining gate is CI.
…nd add comments for RAG metadata contract
DanielLeens
left a comment
There was a problem hiding this comment.
Thanks for the update. I re-reviewed the latest head and retraced the embedding invocation path again.
What problem this PR solves
- User pain point
Remote embedding calls need one shared place for timeout handling, retries, cache/metrics hooks, and response validation instead of each provider solving those concerns differently. - Fix approach
This PR extends the sharedModelInvocationRuntimewith cache / metrics integration and threads that contract through the provider adapters and embedding models. - One sentence
The latest head still looks sound from the source side, and I do not see a reopened code-path blocker.
Runtime chain I checked:
EmbeddingTransform.open()
-> ModelInvocationOptions.fromConfig(...)
-> build provider-specific model
-> AbstractModel / runtime invocation
-> ModelInvocationRuntime.invoke(...)
-> cache lookup / partial miss merge
-> invokeWithRetries(...)
-> metrics hooks
-> output-count validation
Key findings:
- The normal embedding path definitely hits the new cache / metrics logic.
ModelInvocationRuntimeCacheTestverifies full-hit, partial-hit, and cache-store behavior without introducing timing-sensitive patterns.- I do not see a reopened source-level blocker on the current head.
1. Code change review
1.1 Core logic analysis
The latest delta mainly adds cache-key / cache-runtime / metrics coverage and keeps the runtime flow coherent. The cache merge path in ModelInvocationRuntime.invoke(...) still preserves input order after partial misses, which is the most important correctness point for this follow-up.
1.2 Compatibility impact
Fully compatible from the current source-review perspective. The new cache/metrics surfaces are additive and the default behavior remains unchanged when those hooks are not used.
1.3 Performance / side effects
The main new cost is optional cache-key / metrics bookkeeping around remote invocation boundaries. I did not find a new CPU, memory, concurrency, retry, or resource-release blocker in the current implementation.
1.4 Error handling and logging
No new source-level blocking issue found.
2. Code quality evaluation
2.1 Code style
The added runtime/cache tests make the contract easier to reason about.
2.2 Test coverage and stability
The new cache/metrics tests are deterministic in-memory tests. I did not see a flaky-test pattern in the added coverage.
Test stability rating: Stable.
2.3 Documentation
The updated English/Chinese embedding docs still match the runtime direction.
3. Architecture
3.1 Solution quality
This is a good follow-up to the shared runtime direction.
3.2 Maintainability
Better than before. Cache/metrics concerns now live in the same shared runtime instead of leaking into provider-specific code.
3.3 Extensibility
The current split is a good base for future provider/runtime features.
3.4 Historical compatibility
No historical-compatibility blocker found on the latest head.
4. Issue summary
No new source-level blocking issue found.
5. Merge conclusion
Conclusion: can merge after fixes
- Blocking items
- No code blocker from my side on the latest head.
- The current
Buildis still red from GitHub metadata. The visible failed lanes areengine-v2-it (11),unit-test (8), andconnector-file-sftp-it (8), with several other jobs cancelled, so CI still needs to be cleared before merge.
- Suggested but non-blocking follow-ups
- None from the source side in this round.
From the source-review perspective, the latest head still looks good to me. The remaining gate is CI rather than code-path correctness.
Purpose of this pull request
This PR improves the Embedding transform remote model invocation reliability and fixes embedding vector dimension initialization.
Does this PR introduce any user-facing change?
Yes.
This PR adds new optional Embedding transform configuration options for remote model invocation retry and timeout control. Existing jobs keep the previous behavior by default because
model_retry_max_attemptsdefaults to1.It also fixes an issue where the output vector dimension could be initialized incorrectly when batch embedding responses contain multiple items. After this change, the output vector dimension is derived from the actual embedding vector length.
How was this patch tested?
Added unit tests for the new and fixed behavior
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.