Skip to content

Add foreach flattened view support via maestro-extensions module#194

Open
rdeepak2002 wants to merge 11 commits intomainfrom
feature/flattened-foreach
Open

Add foreach flattened view support via maestro-extensions module#194
rdeepak2002 wants to merge 11 commits intomainfrom
feature/flattened-foreach

Conversation

@rdeepak2002
Copy link
Collaborator

@rdeepak2002 rdeepak2002 commented Feb 18, 2026

Pull Request type

  • Bugfix
  • Feature
  • Refactoring (no functional changes, no api changes)
  • Build related changes (Please run ./gradlew build --write-locks to refresh dependencies)
  • Other (please describe):

NOTE: Please remember to run ./gradlew spotlessApply to fix any format violations.

Changes in this PR

Ports the "flattened foreach" feature from internal Maestro (maestro-signal-service) to OSS as a new maestro-extensions module. This is an auto-configurable library that registers into maestro-server when extensions.enabled=true (set in the aws profile). It consumes maestro events via SQS (subscribed to the SNS topic) and makes HTTP calls to maestro-server for data — matching the internal architecture.

The denormalization strategy stores foreach step iterations in a dedicated maestro_step_foreach_flattened table for efficient cursor-based pagination, status filtering, and summary statistics — critical for workflows with large iteration counts.

Architecture:

  • maestro-extensions is an auto-configurable library registered into maestro-server via Spring Boot auto-configuration (all endpoints on port 8080)
  • Uses @ConditionalOnProperty(value = "extensions.enabled", havingValue = "true") to activate only when configured
  • Uses @ComponentScan to register REST controllers from the extensions package
  • Injects shared beans from the host application: DataSource, ObjectMapper, MaestroMetrics, DatabaseConfiguration, HttpClient
  • Consumes StepInstanceStatusChangeEvent from an SQS queue (maestro-event) subscribed to the maestro SNS topic
  • Makes HTTP GET calls to maestro-server REST API via HttpMaestroClient for workflow/step data
  • Flyway migrations auto-discovered at classpath:db/migration/postgres/ when jar is on classpath
  • Manual SQS acknowledgment with DLQ routing (redrive after 5 failures), visibility timeout on failure, and receive count monitoring — matching internal's SqsProcessorFinalizer pattern
  • SNS→SQS subscription uses RawMessageDelivery=true so messages arrive as raw JSON (matching internal)

File inventory

Files matching internal (logic identical, only package/annotation/import differences)

These files are direct ports from maestro-signal-service. The core logic is identical; differences are limited to package names, license headers, DI annotations (@Inject/@Singleton → Spring @Bean), metrics class names (MetricRepoMaestroMetrics), and @SuppressFBWarnings annotations.

OSS file Internal counterpart Notes
handlers/ForeachFlatteningHandler.java handlers/foreach/ForeachFlatteningHandler.java + @SuppressFBWarnings
processors/StepEventPreprocessor.java messageProcessors/StepEventPreprocessor.java MetricRepoMaestroMetrics; metric constant inlined
dao/models/ForeachFlattenedInstance.java dao/models/ForeachFlattenedInstance.java Package only
dao/models/ForeachFlattenedModel.java dao/models/ForeachFlattenedModel.java Package only
models/StepEventHandlerInput.java models/StepEventHandlerInput.java + @SuppressFBWarnings
models/StepIteration.java client/models/flattening/StepIteration.java Package only
models/StepIterationsSummary.java client/models/flattening/StepIterationsSummary.java Package only
utils/ForeachFlatteningHelper.java utils/ForeachFlatteningHelper.java Package only
utils/StepInstanceStatusEncoder.java utils/StepInstanceStatusEncoder.java EnumMap return/field types → Map interface (PMD LooseCoupling rule); internal logic identical
controllers/ForeachFlattenController.java controllers/flattening/ForeachFlattenController.java @SuppressWarnings@SuppressFBWarnings; javax.annotation.Nullablecom.netflix.maestro.annotations.Nullable

Files with logical differences from internal

OSS file Internal counterpart Differences
processors/MaestroEventProcessor.java messageProcessors/MaestroEventProcessor.java Removes TargetedAtGroupHandler, TrinoDirectHandler, and WORKFLOW_INSTANCE_STATUS_CHANGE_EVENT handling (not in scope for OSS). Changes MessageProcessor<T> interface with Supplier<MaestroEvent> to direct MaestroEvent parameter. Core foreach-handling logic is identical.
listeners/SqsMaestroEventListener.java listeners/SqsMaestroEventListener.java Internal delegates to shared SqsProcessorFinalizer utility class; OSS inlines the same manual ack / visibility / DLQ logic directly in the listener (same behavior, no shared library abstraction). Uses ObjectMapper.readValue() directly instead of MessageProcessor<T> with Supplier<T>.
dao/MaestroForeachFlattenedDao.java dao/flattening/MaestroForeachFlattenedDao.java Internal supports dual CockroachDB + Postgres with a runtime feature flag (SignalFastProperties.isUsePostgresForSignalService()), two separate INSERT queries, and @iterationIdx CRDB index hints. OSS is Postgres-only — single INSERT query, no CRDB conditionals.
utils/PaginationHelper.java shared/utils/PaginationHelper.java OSS keeps only validateParamAndDeriveDirection() + buildEmptyPaginationResult() (used by the controller). Internal's full pagination range calculation and result building are for other signal-service features not ported here.

OSS-only files (no internal counterpart)

These files are needed because OSS uses Spring Boot (vs internal's Micronaut) and doesn't have access to internal shared libraries (maestro-client, maestro-shared).

OSS file Purpose
config/MaestroExtensionsConfiguration.java Spring @Configuration with @ConditionalOnProperty + @ComponentScan for auto-config. Injects host beans via @Qualifier(Constants.MAESTRO_QUALIFIER) and parameter naming.
properties/MaestroExtensionsProperties.java @ConfigurationProperties for SQS queue URL, maestro base URL
provider/MaestroClient.java Interface abstracting data fetching — matches internal's MaestroClient method names (getWorkflowInstance, getWorkflowDefinition, getWorkflowStepInstance). Internal's MaestroClient is an 800+ line concrete class with SSLSocketFactory, streaming iterators, POST methods; OSS extracts only the 3 methods needed for foreach flattening.
provider/HttpMaestroClient.java JDK HttpClient implementation calling maestro-server REST API (internal's MaestroClient uses Metatron mTLS)

Resource files

File Description
src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports Spring Boot auto-configuration registration
src/main/resources/db/migration/postgres/V202311161000__add_foreach_flattened.sql Flyway migration for maestro_step_foreach_flattened table (identical to internal, trailing newline only)
src/test/resources/flattening/flattening_records.json Test fixture for DAO tests (identical to internal)

Test files (11 files)

File Covers Notes
ExtensionsBaseTest.java Shared ObjectMapper setup OSS-only (replaces internal's SignalBaseTest/MaestroBaseTest)
ExtensionsDaoBaseTest.java Testcontainers Postgres + Flyway + DataSource OSS-only (replaces internal's SignalDaoBaseTest)
MaestroForeachFlattenedDaoTest.java DAO insert/query/summary with real Postgres (11 tests) Same 11 tests as internal; removes CRDB code paths (SignalFastProperties mock, dual INSERT)
ForeachFlatteningHandlerTest.java Handler logic (2 tests) Matches internal (package/import only)
ForeachFlatteningHelperTest.java Model building logic (5 tests) Matches internal (package/import only)
StepInstanceStatusEncoderTest.java Status encoding/decoding (2 tests) Matches internal (package only)
ForeachFlattenControllerTest.java REST endpoints with mocked DAO (9 tests) Matches internal (package/import only)
StepEventPreprocessorTest.java Preprocessor (2 tests) Same 2 scenarios as internal; rewritten with StepInstanceStatusChangeEvent.builder() + AssertJ (internal uses mock fields + JUnit Assert + manual Spectator registry metric verification)
MaestroEventProcessorTest.java Event dispatch + retryable error + metrics (4 tests) 4 tests vs internal's 7 — removed 3 targeted-at-group tests; kept foreach-relevant tests; rewritten with builder pattern
SqsMaestroEventListenerTest.java Manual ack, deserialization error deletion, visibility extension (5 tests) Covers same scenarios as internal's SqsProcessorFinalizerTest but tests the inlined listener logic
HttpMaestroClientTest.java HTTP client with mocked responses (6 tests) OSS-only (no internal counterpart)

Changes to existing modules

File Change
settings.gradle Added maestro-extensions to multi-project build
maestro-database/.../AbstractDatabaseDao.java Added markTransactionSerializable()
maestro-aws/localstack/sqs_bootstrap.sh Added maestro-event queue with DLQ + redrive policy
maestro-aws/localstack/sns_bootstrap.sh Added SNS→SQS subscription with RawMessageDelivery=true
maestro-server/build.gradle Added implementation project(':maestro-extensions') dependency
maestro-server/.../application-aws.yml Added extensions config block (extensions.enabled, queue URL, base URL)
maestro-server/.../docker-java.properties Testcontainers Docker API version fix

E2E Testing

These commands have been verified end-to-end on macOS. Copy and paste each command in order.

Prerequisites

  • Docker Desktop running
  • Java 21+

Step 1: Start infrastructure (LocalStack + Redis)

docker compose -f maestro-aws/docker-compose.yml up -d

Wait until docker ps shows both containers as healthy/running.

Step 2: Start maestro-server with extensions (single terminal)

./gradlew :maestro-server:bootRun --args='--spring.profiles.active=aws'

Wait for: Started MaestroApp in X seconds. Extensions auto-configure and you should see log lines like:

Creating HttpMaestroClient within Spring boot...
Creating MaestroForeachFlattenedDao within Spring boot...
Creating SqsMaestroEventListener within Spring boot...

Step 3: Create the foreach workflow

curl -s -X POST 'http://localhost:8080/api/v3/workflows' \
  -H 'Content-Type: application/json' \
  -H 'user: tester' \
  -d @maestro-server/src/test/resources/samples/sample-foreach-wf.json

Expected: JSON response with workflow_definition containing sample-foreach-wf.

Step 4: Trigger the workflow

curl -s -X POST 'http://localhost:8080/api/v3/workflows/sample-foreach-wf/versions/latest/actions/start' \
  -H 'Content-Type: application/json' \
  -H 'user: tester' \
  -d '{"initiator": {"type": "manual"}}'

Expected: JSON with "status":"CREATED" and workflow_instance_id: 1.

Step 5: Wait for execution + event processing

sleep 30

The foreach creates 9 iterations (dates 20200101-20200109), each with 6 inner steps (job.1-job.6). Events flow: maestro-server → SNS → SQS → extensions listener (same process).

Step 6: Check workflow completed

curl -s 'http://localhost:8080/api/v3/workflows/sample-foreach-wf/instances/1/runs/1' | python3 -m json.tool | grep status

Expected: "status": "SUCCEEDED"

Step 7: Query flattened foreach summary (same port 8080)

The flattened view is keyed by inner step ID (e.g., job.1), not the foreach step name.

curl -s 'http://localhost:8080/api/v3/views/flatten/workflows/sample-foreach-wf/instances/1/runs/1/steps/job.1/summary?statusCount=true' | python3 -m json.tool

Expected:

{
    "representative_iteration": { "step_id": "job.1", "loop_params": {"i": "20200101"}, "step_runtime_state": {"status": "SUCCEEDED"} },
    "count_by_status": { "SUCCEEDED": 9 },
    "loop_param_values": { "i": ["20200101", "20200102", "20200103", "20200104", "20200105"] }
}

Step 8: Query paginated iterations

curl -s 'http://localhost:8080/api/v3/views/flatten/workflows/sample-foreach-wf/instances/1/runs/1/steps/job.1?first=3' | python3 -m json.tool

Expected: 3 iterations with has_next_page: true, each showing loop param i and SUCCEEDED status.

Step 9: Get a single iteration by ID

curl -s 'http://localhost:8080/api/v3/views/flatten/workflows/sample-foreach-wf/instances/1/runs/1/steps/job.1/iterations/1' | python3 -m json.tool

Expected: Single iteration with iteration_rank: "11", loop_params: {"i": "20200101"}, status SUCCEEDED.

Cleanup

docker compose -f maestro-aws/docker-compose.yml down

Deepak Ramalingam and others added 7 commits February 17, 2026 18:46
Ports the "flattened foreach" feature from internal Maestro to OSS.
This denormalization strategy stores foreach step iterations in a
dedicated table for efficient cursor-based pagination, status filtering,
and summary statistics - critical for workflows with large iteration counts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Redesign maestro-extensions to match internal's architecture:
- Separate Spring Boot app (port 8081) instead of library in maestro-server
- SQS event consumption via @SqsListener (SNS -> SQS subscription)
- HTTP calls to maestro-server REST API via HttpMaestroDataProvider
- Metrics aligned with internal (failure/duration counters, timer)
- JavaTimeModule registered on ObjectMapper
- Own application.yml, DataSource config, Flyway migrations
- LocalStack bootstrap: maestro-event SQS queue + SNS subscription

Removed: ForeachNotificationInterceptor, ForeachFlattenConfiguration,
DaoBackedMaestroDataProvider, maestro-engine dependency

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add docker-java.properties (api.version=1.44) to both maestro-server
  and maestro-extensions to fix Testcontainers Docker detection issue
- Move testcontainerDep and postgresqlDep from testImplementation to
  runtimeOnly in extensions so bootRun can use jdbc:tc: URL
- Add bvalJsrDep (Bean Validation provider) to extensions for @Min/@max
  validation in ForeachFlattenController

E2E tested: both services start successfully, foreach workflow
executes, events flow through SNS->SQS->extensions, flattened view
endpoints return correct data.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- SqsMaestroEventListener: switch to MANUAL ack mode with Acknowledgement
  and Visibility params matching internal's SqsProcessorFinalizer pattern
  (ack on success, ack+delete on non-retryable, visibility extension on
  retryable failure, receive count monitoring at threshold 100)
- MaestroEventProcessor: re-throw exceptions as MaestroRetryableError so
  the SQS listener can distinguish retryable from non-retryable failures
- sqs_bootstrap.sh: add maestro-event-dlq queue and redrive policy
  (maxReceiveCount=5) on the maestro-event queue
- Update tests for new method signatures and error handling behavior

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Matches internal's processingExceptionShouldThrowAndIncrementMetric
test by also verifying the failure counter is incremented when the
processor re-throws as MaestroRetryableError.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- sns_bootstrap.sh: enable RawMessageDelivery=true on the SNS→SQS
  subscription so messages arrive as raw MaestroEvent JSON, matching
  internal's raw delivery configuration
- SqsMaestroEventListener: remove unwrapSnsEnvelope() and JsonNode
  imports — no longer needed with raw delivery. Directly deserialize
  the payload as MaestroEvent, matching internal's pattern.
- SqsMaestroEventListenerTest: remove SNS-wrapped payload test
- ForeachFlatteningHelper: fix javadoc typo (3344-13-223 → 3344-13-23)
  matching internal

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Rename MaestroDataProvider → MaestroClient and
HttpMaestroDataProvider → HttpMaestroClient to align with internal
maestro-client naming. Also rename getStepInstance →
getWorkflowStepInstance to match internal API surface.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rdeepak2002 rdeepak2002 changed the title Add foreach flattened view support via maestro-extensions module [do not review] Add foreach flattened view support via maestro-extensions module Feb 18, 2026
Deepak Ramalingam and others added 2 commits February 18, 2026 14:34
It was a no-op delegate to withRetryableQuery. The internal version
has a TODO to route to a read replica, but OSS has no such plan.
Replace the single call site with withRetryableQuery directly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The RedrivePolicy JSON inside --attributes shorthand caused a parse
error, preventing the maestro-event queue from being created. Split
into create-queue + set-queue-attributes to fix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@rdeepak2002 rdeepak2002 changed the title [do not review] Add foreach flattened view support via maestro-extensions module Add foreach flattened view support via maestro-extensions module Feb 18, 2026
@rdeepak2002 rdeepak2002 marked this pull request as ready for review February 18, 2026 23:27
Deepak Ramalingam and others added 2 commits February 24, 2026 12:05
…ibrary

Converts maestro-extensions from a separate Spring Boot service (port 8081)
to an auto-configuration library that registers into maestro-server when
extensions.enabled=true. This follows the same pattern as maestro-aws.

Key changes:
- Remove standalone app class, application.yml, and docker-java.properties
- Refactor MaestroExtensionsConfiguration to inject host beans via
  @qualifier and parameter naming instead of creating its own
- Add @componentscan to pick up REST controllers from extensions package
- Reuse server's HttpClient bean instead of creating a separate one
- Add AutoConfiguration.imports for Spring Boot auto-config discovery
- Delete SpectatorMaestroMetrics entirely (tests use mock instead)
- Add maestro-extensions dependency to maestro-server build.gradle
- Add extensions config block to application-aws.yml

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace boolean-flag method with explicit markTransactionSerializable()
and markTransactionSerializableReadOnly() per PR #195 review feedback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
derek-miller pushed a commit to derek-miller/maestro that referenced this pull request Mar 4, 2026
* add a SEL function to check if a param exists and improve the SEL doc (Netflix#118)

* fix some warnings from the build (Netflix#120)

* add get workflow instances batch endpoint (Netflix#121)

* Add a SEL function to get the value of error retries. Also add one example of k8s step to set env. (Netflix#123)

* Add a Redis-based InstanceStepConcurrencyHandler implementation. (Netflix#125)

* Add a Redis-based InstanceStepConcurrencyHandler implementation.

* address the comment

* Add postgres based Maestro persistence layer to support postgressql db. (Netflix#119)

* Add postgres based Maestro persistence layer to support postgressql db.

* Update indexed columns to use the COLLATE "C".

* Make this minor update to use JSON instead JSONB to address some tricky cases.
Note that Postgres JSONB will not preserve the original order of map field.
It might cause troubles in certain cases, e.g. param map order.

* update the dependency lock

* address the comments

* Improve Maestro to retry certain db errors, e.g. connection is closed. (Netflix#127)

* Improve Maestro to retry certain db errors, e.g. connection is closed.

* fix null pointer issue in the set contains call

* Update delete queue config to match the deletion processor timeout. (Netflix#130)

* Update delete queue config to match the deletion processor timeout.

* address the comment

* Explicitly clear the pending action after passing it to the step runtime. (Netflix#131)

* Explicitly clear the pending action after passing it to the step runtime.

* Add tests to verify that the pending action is reset.

* Add comments for the config.

* Improve the test a bit.

* Fix edge case where unblocking workflow doesn't enqueue job event (Netflix#132)

* Fix edge case where unblocking workflow doesn't enqueue job event

There is an edge case when we unblock all failed instances in batches and default batch size = 100, the only failed instance was unblocked in first while loop but didn't enqueue a job event because the condition requires unblocked count > 0 (in 1st loop this value is not updated and is still 0). This PR updated the if condition to account for this case.

* improve comment

* Check if string map param is literal before returning the value. (Netflix#133)

* Check if string map param is literal before returning the value.

* Add additional endpoints to workflow controller. (Netflix#136)

* Check if map param is literal before returning the value. (Netflix#140)

* Check if map param is literal before returning the value.

* address the comment

* Maestro workers should not retry in certain cases (Netflix#141)

* While a worker processes a job, the business logic might throw MaestroNotFoundException, e.g. the workflow is deleted. The worker should not retry in this case.

* address the comment

* fix a corner case during launching a subworkflow if the subworkflow instance already exists (Netflix#142)

* getAction logic should not pick up the upstream actions in async execution mode (Netflix#143)

* Subworkflow should wake up its child workflow than itself. (Netflix#144)

* Improve step runtime to support setting the next polling delay. (Netflix#145)

* Invalid job event should be removed from the queue. (Netflix#146)

* Invalid job event should be removed from the queue.

* address the comment

* Improve the queue performance by adding an explicit lock mechanism (Netflix#129)

* Improve the queue performance by adding an explicit lock mechanism because SKIP LOCKED does not perform well if there are a large number of rows.

* Use a dedicated queue_lock table for locking queue id.

* update dependencies field to JSONB so to support first_value() (Netflix#148)

* Add while step runtime support (Netflix#147)

* add while step runtime to support while loop

* address the comments

* fix batch loading rollup

* Support waking up flow engine with an action code. (Netflix#149)

* Support waking up flow engine with an action code.
Meaning of action codes are determined by the Task implementation.

* address the comment

* deduplicate actions to avoid unnecessary executions in the flow engine (Netflix#150)

* Implement tag permit support for Maestro. (Netflix#151)

* Implement tag permit support for Maestro.

* add extra metrics and small improvements related to tag permit feature (Netflix#152)

* Jun/unblock event (Netflix#153)

* fix missing action and message fields in the unblock event

* upgrade dependency locks

* Fix an issue for workflow timeout action: when the workflow is timed out, both the workflow instance and its steps should be marked as timeout.

* Slightly improve the code readability with additional comments. (Netflix#155)

* Fix the http status code in MaestroRuntimeException. (Netflix#156)

* Improve the code and a unit test (Netflix#157)

* calling Array.free() to release resources in loop cases (Netflix#159)

* Improve actors to cancel the scheduled task ping action during activate/wakeup (Netflix#160)

* Actor improvement: during activate call, cancel any existing scheduled task ping as activate will schedule a new ping.

* Improve the code a bit and add extra unit tests.

* fix tag permit release for queued cases (Netflix#161)

* Support extension function in SEL Util class. (Netflix#162)

* Support extension function in SEL Util class.
* Also add a toJson extension function to Maestro SEL expression evaluator.
* Address the comments.

* Rename testcontainerPostgresDep to testcontainerDep in all build.gradle files

To match the renamed dependency in dependencies.gradle after OSS merge

* Regenerate all gradle lockfiles after OSS merge

Generated by running: ./gradlew build --write-locks -x test -x integrationTest

* Add db-type and ownership-timeout properties to production config

- Add db-type: postgres to engine.configs (required by OSS)
- Add ownership-timeout: 125000 to queue 5 for deletion processor
- Tag permit features left disabled (not needed for Airbnb deployment)

* Add OSS sync script for merging upstream changes

- Prompts for branch name to avoid hardcoding usernames
- Fetches latest from origin and oss remotes
- Creates branch from origin/main and merges oss/master
- Lists conflicts if any, with helpful next steps
- Auto-commits if no conflicts detected

---------

Co-authored-by: jun-he <jun-he@users.noreply.github.com>
Co-authored-by: yingyiz-netflix <yingyiz@netflix.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant