Skip to content

[Improve][Transform-V2] Improve embedding model invocation reliability#10863

Open
yzeng1618 wants to merge 6 commits into
apache:devfrom
yzeng1618:dev-embedding
Open

[Improve][Transform-V2] Improve embedding model invocation reliability#10863
yzeng1618 wants to merge 6 commits into
apache:devfrom
yzeng1618:dev-embedding

Conversation

@yzeng1618
Copy link
Copy Markdown
Collaborator

Purpose of this pull request

This PR improves the Embedding transform remote model invocation reliability and fixes embedding vector dimension initialization.

Does this PR introduce any user-facing change?

Yes.

This PR adds new optional Embedding transform configuration options for remote model invocation retry and timeout control. Existing jobs keep the previous behavior by default because model_retry_max_attempts defaults to 1.

It also fixes an issue where the output vector dimension could be initialized incorrectly when batch embedding responses contain multiple items. After this change, the output vector dimension is derived from the actual embedding vector length.

How was this patch tested?

Added unit tests for the new and fixed behavior

Check list

zengyi added 4 commits May 8, 2026 18:31
Add a shared nlpmodel invocation runtime with retry, timeout, error classification, response count validation, safe logging, metrics hooks, and cache boundary.

Route OpenAI, Doubao, Qianfan, Custom, and Zhipu embedding calls through ProviderAdapter and ModelInvocationRuntime while keeping default retry behavior compatible.

Document request-level batching, reliability options, idempotency boundaries, and cache/logging behavior for English and Chinese Embedding docs.
Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution! I went through the full diff and the direction makes sense. The new common invocation runtime is wired into the normal embedding path correctly, the new retry/timeout options are actually exposed through the factory contract, and I did not find a blocking issue in the current revision.

What problem this PR solves

  • User pain point
    Embedding providers currently handle failures in a fragmented way. Timeout behavior, retry behavior, response-count validation, and safe diagnostics are not centralized, so users get inconsistent behavior across providers when they hit rate limits, transient 5xx responses, or mismatched output sizes.
  • Fix approach
    This PR introduces a shared ModelInvocationRuntime, ModelInvocationOptions, and ProviderAdapter boundary. Providers still own provider-specific request/response logic, but retry, timeout propagation, output-count validation, and safe logging move into one common runtime path.
  • One-line summary
    This turns embedding invocation from “each provider handles reliability separately” into “shared runtime reliability with provider-specific protocol adapters”.

1. Code review

1.1 Core logic analysis

Precise change scope

The main pieces are:

  • common runtime layer
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntime.java:28-196
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ProviderAdapter.java:22-38

  • config exposure
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/ModelTransformConfig.java:81-113
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransformFactory.java:42-84

  • runtime entry points
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/EmbeddingTransform.java:89-203
    seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embedding/remote/AbstractModel.java:43-77

  • provider integration and tests
    .../remote/openai/OpenAIModel.java
    .../remote/doubao/DoubaoModel.java
    .../remote/qianfan/QianfanModel.java
    .../remote/custom/CustomModel.java
    seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/nlpmodel/ModelInvocationRuntimeTest.java:31-199
    seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/embedding/EmbeddingModelDimensionTest.java:51-214

Before / after

The new common entry point is:

for (int attempt = 1; attempt <= options.getRetryMaxAttempts(); attempt++) {
    ModelInvocationContext context = new ModelInvocationContext(...);
    try {
        T output = adapter.invoke(inputs, context);
        validateOutputCount(...);
        return output;
    } catch (IOException e) {
        handleInvocationException(normalize(e, context), context, attempt, start);
    } catch (RuntimeException e) {
        handleInvocationException(normalize(e, context), context, attempt, start);
    }
}

And the embedding factory now really exposes the new options:

.optional(
    EmbeddingTransformConfig.API_PATH,
    EmbeddingTransformConfig.SINGLE_VECTORIZED_INPUT_NUMBER,
    EmbeddingTransformConfig.PROCESS_BATCH_SIZE,
    ModelTransformConfig.MODEL_RETRY_MAX_ATTEMPTS,
    ModelTransformConfig.MODEL_RETRY_BACKOFF_MS,
    ModelTransformConfig.MODEL_RETRY_MAX_BACKOFF_MS,
    ModelTransformConfig.MODEL_REQUEST_TIMEOUT_MS)

Key findings

  1. The normal path definitely hits the new runtime, because EmbeddingTransform.open() builds ModelInvocationOptions and AbstractModel now owns ModelInvocationRuntime.
  2. The current version centralizes the three most important reliability behaviors: request timeout, retry for retryable failures, and response-count validation.
  3. The latest fix for dimension initialization is effective: dimension probing now bypasses the batch-size response-count mismatch trap where appropriate.
  4. The docs, option definitions, and factory contract are now aligned for the new retry/timeout options.

Full runtime chain

Transform initialization
  -> EmbeddingTransform.open() [89-203]
      -> read provider / model / api_path / retry / timeout from config [91-124]
      -> build the concrete provider model [97-187]
      -> probe model.dimension() for output dimension [198]

Shared invocation layer
  -> AbstractModel.vectorization(...) [49-77]
      -> batchProcess(fields, singleVectorizedInputNumber) [62-76]
      -> vector(batch)
          -> provider enters invocationRuntime.invoke(...)

Common runtime
  -> ModelInvocationRuntime.invoke(...) [47-81]
      -> create ModelInvocationContext [49-57]
      -> adapter.invoke(inputs, context) [60]
      -> validateOutputCount(...) [61, 87-103]
      -> on failure normalize(...) + handleInvocationException(...) [69-72, 105-156]
          -> timeout / rate limit / temporary server failures can retry
          -> parse failures / auth failures / count mismatches do not retry

Contract exposure
  -> ModelTransformConfig defines retry / timeout options [81-113]
  -> EmbeddingTransformFactory.optionRule() exposes them [42-84]
  -> docs/en/transforms/embedding.md:31-89
  -> docs/zh/transforms/embedding.md

1.2 Compatibility impact

Conclusion: fully compatible.

  • API: no breaking API change
  • Configs: only optional new configs added
  • Defaults: model_retry_max_attempts = 1, so the old no-retry behavior stays intact by default
  • Protocol: providers still own their provider-specific request/response protocol
  • Serialization: unchanged
  • Historical behavior: existing jobs keep the same behavior unless they explicitly opt into retries/backoff

1.3 Performance / side effects

  • CPU / memory / GC: small added runtime overhead only at the remote invocation boundary
  • Network: retries are bounded and disabled by default
  • Concurrency: no new global mutable state introduced
  • Idempotency: the docs are explicit that repeated attempts may still be billed or have provider-side effects
  • Resource lifecycle: providers still close their own CloseableHttpClient

1.4 Error handling and logging

I did not find a blocking code issue in the current revision.

Good points here:

  1. ModelInvocationRuntime.logInvocationFailure(...) records safe diagnostic context only.
  2. The tests explicitly verify that provider response bodies with secrets/raw text are not surfaced directly in exception messages.
  3. Response-count mismatch fails fast instead of emitting misaligned vectors.

CI note:

  • GitHub Build is green on the latest head.

2. Code quality

2.1 Code style

The new runtime classes are reasonably clear, and the core responsibility split is better than before.

2.2 Test coverage

The added tests cover the right regression surface:

  • default retry policy attempts once,
  • 429 / 5xx retry paths,
  • non-retryable auth / parse / response-count failures,
  • and the dimension-probing path that should not be rejected by batch response-count validation.

2.3 Documentation

This is a user-visible behavior improvement, so docs are required, and they are present in both English and Chinese:

  • docs/en/transforms/embedding.md:31-89
  • docs/zh/transforms/embedding.md

The default behavior is also documented honestly: no retries unless the user opts in.

3. Architecture assessment

3.1 Solution quality

This is a strong long-term direction. The PR moves shared reliability concerns into one runtime instead of continuing to duplicate them inside each provider.

3.2 Maintainability

Maintainability is clearly better now. Adding a new provider no longer requires re-implementing the same retry / timeout / validation skeleton.

3.3 Extensibility

The new runtime/context/error-type split gives you a clean place for future provider growth and for later reliability improvements.

3.4 Historical compatibility

No migration burden introduced by this change.

4. Issue summary

No. Issue Location Severity
- No formal issue found in the current revision - -

5. Merge conclusion

Conclusion: can merge

  1. Blocking items

    • No code blocker from my side in the current revision.
  2. Non-blocking suggestions

    • If you want to harden this further later, I would consider adding one more regression test around transient plain network IOException cases. That is not a blocker for this PR, though.

Overall, this is more than just “adding a few config options”. It gives embedding invocation a real shared reliability boundary, keeps the old default behavior compatible, and backs the new behavior with tests and docs. From the current code path, I’m good with merge.

Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head and retraced the remote embedding invocation path. On the current revision I do not see a new source-level blocker; the remaining gate is still the red CI signal.

What this PR solves

  • User pain point
    Remote embedding calls previously lacked a shared runtime for timeout handling, retries, error normalization, and response-count validation, so provider behavior could drift and failures were harder to reason about.
  • Fix approach
    This PR introduces ModelInvocationRuntime, ModelInvocationContext, and ModelInvocationOptions, then routes the provider implementations through that shared runtime.
  • One-line summary
    This is a good infrastructure cleanup for remote model invocation, and I do not see a new source-level blocker on the latest head.

Runtime path I checked

Task startup
  -> EmbeddingTransform.open() [EmbeddingTransform.java:89-204]
      -> ModelInvocationOptions.fromConfig(...)
      -> build provider-specific model

Runtime remote invocation
  -> provider adapter / AbstractModel
      -> ModelInvocationRuntime.invoke(...) [47-80]
          -> build ModelInvocationContext
          -> adapter.invoke(inputs, context)
          -> validateOutputCount(...)
          -> retry / backoff / timeout / error normalization [87-195]

Key findings

  1. The normal remote-embedding path definitely hits the new runtime; this is not just a side helper.
  2. The runtime now centralizes retry/backoff, timeout propagation, error normalization, and output-count validation.
  3. ModelInvocationRuntimeTest covers rate limit, server-side temporary failure, authentication failure, response-count mismatch, and response-parse failure with deterministic in-memory adapters.
  4. I did not find a new source-level blocker on the latest head.

Test / CI notes

  • The new tests are structurally stable from a source perspective: they are in-memory unit tests with no timing-sensitive sleeps or external service dependencies.
  • The latest Build is still failing from GitHub metadata, and the fork-side run state around changes / Check Helm Chart Syntax is not clean yet, so CI still needs to be resolved before merge.

Conclusion: can merge after fixes

  1. Blocking items
  • Please get the latest Build green before merge.
  1. Suggested but non-blocking improvements
  • No new source-side changes are required from my side on the current head.

Overall, the shared invocation-runtime design looks sound on the latest head, and I do not have a new source-level blocker for this revision. The remaining gate is CI.

Copy link
Copy Markdown
Contributor

@DanielLeens DanielLeens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. I re-reviewed the latest head and retraced the embedding invocation path again.

What problem this PR solves

  • User pain point
    Remote embedding calls need one shared place for timeout handling, retries, cache/metrics hooks, and response validation instead of each provider solving those concerns differently.
  • Fix approach
    This PR extends the shared ModelInvocationRuntime with cache / metrics integration and threads that contract through the provider adapters and embedding models.
  • One sentence
    The latest head still looks sound from the source side, and I do not see a reopened code-path blocker.

Runtime chain I checked:

EmbeddingTransform.open()
  -> ModelInvocationOptions.fromConfig(...)
  -> build provider-specific model
  -> AbstractModel / runtime invocation
      -> ModelInvocationRuntime.invoke(...)
          -> cache lookup / partial miss merge
          -> invokeWithRetries(...)
          -> metrics hooks
          -> output-count validation

Key findings:

  1. The normal embedding path definitely hits the new cache / metrics logic.
  2. ModelInvocationRuntimeCacheTest verifies full-hit, partial-hit, and cache-store behavior without introducing timing-sensitive patterns.
  3. I do not see a reopened source-level blocker on the current head.

1. Code change review

1.1 Core logic analysis

The latest delta mainly adds cache-key / cache-runtime / metrics coverage and keeps the runtime flow coherent. The cache merge path in ModelInvocationRuntime.invoke(...) still preserves input order after partial misses, which is the most important correctness point for this follow-up.

1.2 Compatibility impact

Fully compatible from the current source-review perspective. The new cache/metrics surfaces are additive and the default behavior remains unchanged when those hooks are not used.

1.3 Performance / side effects

The main new cost is optional cache-key / metrics bookkeeping around remote invocation boundaries. I did not find a new CPU, memory, concurrency, retry, or resource-release blocker in the current implementation.

1.4 Error handling and logging

No new source-level blocking issue found.

2. Code quality evaluation

2.1 Code style

The added runtime/cache tests make the contract easier to reason about.

2.2 Test coverage and stability

The new cache/metrics tests are deterministic in-memory tests. I did not see a flaky-test pattern in the added coverage.

Test stability rating: Stable.

2.3 Documentation

The updated English/Chinese embedding docs still match the runtime direction.

3. Architecture

3.1 Solution quality

This is a good follow-up to the shared runtime direction.

3.2 Maintainability

Better than before. Cache/metrics concerns now live in the same shared runtime instead of leaking into provider-specific code.

3.3 Extensibility

The current split is a good base for future provider/runtime features.

3.4 Historical compatibility

No historical-compatibility blocker found on the latest head.

4. Issue summary

No new source-level blocking issue found.

5. Merge conclusion

Conclusion: can merge after fixes

  1. Blocking items
  • No code blocker from my side on the latest head.
  • The current Build is still red from GitHub metadata. The visible failed lanes are engine-v2-it (11), unit-test (8), and connector-file-sftp-it (8), with several other jobs cancelled, so CI still needs to be cleared before merge.
  1. Suggested but non-blocking follow-ups
  • None from the source side in this round.

From the source-review perspective, the latest head still looks good to me. The remaining gate is CI rather than code-path correctness.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants