Add foreach flattened view support via maestro-extensions module#194
Open
rdeepak2002 wants to merge 11 commits intomainfrom
Open
Add foreach flattened view support via maestro-extensions module#194rdeepak2002 wants to merge 11 commits intomainfrom
rdeepak2002 wants to merge 11 commits intomainfrom
Conversation
Ports the "flattened foreach" feature from internal Maestro to OSS. This denormalization strategy stores foreach step iterations in a dedicated table for efficient cursor-based pagination, status filtering, and summary statistics - critical for workflows with large iteration counts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Redesign maestro-extensions to match internal's architecture: - Separate Spring Boot app (port 8081) instead of library in maestro-server - SQS event consumption via @SqsListener (SNS -> SQS subscription) - HTTP calls to maestro-server REST API via HttpMaestroDataProvider - Metrics aligned with internal (failure/duration counters, timer) - JavaTimeModule registered on ObjectMapper - Own application.yml, DataSource config, Flyway migrations - LocalStack bootstrap: maestro-event SQS queue + SNS subscription Removed: ForeachNotificationInterceptor, ForeachFlattenConfiguration, DaoBackedMaestroDataProvider, maestro-engine dependency Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add docker-java.properties (api.version=1.44) to both maestro-server and maestro-extensions to fix Testcontainers Docker detection issue - Move testcontainerDep and postgresqlDep from testImplementation to runtimeOnly in extensions so bootRun can use jdbc:tc: URL - Add bvalJsrDep (Bean Validation provider) to extensions for @Min/@max validation in ForeachFlattenController E2E tested: both services start successfully, foreach workflow executes, events flow through SNS->SQS->extensions, flattened view endpoints return correct data. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- SqsMaestroEventListener: switch to MANUAL ack mode with Acknowledgement and Visibility params matching internal's SqsProcessorFinalizer pattern (ack on success, ack+delete on non-retryable, visibility extension on retryable failure, receive count monitoring at threshold 100) - MaestroEventProcessor: re-throw exceptions as MaestroRetryableError so the SQS listener can distinguish retryable from non-retryable failures - sqs_bootstrap.sh: add maestro-event-dlq queue and redrive policy (maxReceiveCount=5) on the maestro-event queue - Update tests for new method signatures and error handling behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Matches internal's processingExceptionShouldThrowAndIncrementMetric test by also verifying the failure counter is incremented when the processor re-throws as MaestroRetryableError. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- sns_bootstrap.sh: enable RawMessageDelivery=true on the SNS→SQS subscription so messages arrive as raw MaestroEvent JSON, matching internal's raw delivery configuration - SqsMaestroEventListener: remove unwrapSnsEnvelope() and JsonNode imports — no longer needed with raw delivery. Directly deserialize the payload as MaestroEvent, matching internal's pattern. - SqsMaestroEventListenerTest: remove SNS-wrapped payload test - ForeachFlatteningHelper: fix javadoc typo (3344-13-223 → 3344-13-23) matching internal Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Rename MaestroDataProvider → MaestroClient and HttpMaestroDataProvider → HttpMaestroClient to align with internal maestro-client naming. Also rename getStepInstance → getWorkflowStepInstance to match internal API surface. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
It was a no-op delegate to withRetryableQuery. The internal version has a TODO to route to a read replica, but OSS has no such plan. Replace the single call site with withRetryableQuery directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The RedrivePolicy JSON inside --attributes shorthand caused a parse error, preventing the maestro-event queue from being created. Split into create-queue + set-queue-attributes to fix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
9 tasks
…ibrary Converts maestro-extensions from a separate Spring Boot service (port 8081) to an auto-configuration library that registers into maestro-server when extensions.enabled=true. This follows the same pattern as maestro-aws. Key changes: - Remove standalone app class, application.yml, and docker-java.properties - Refactor MaestroExtensionsConfiguration to inject host beans via @qualifier and parameter naming instead of creating its own - Add @componentscan to pick up REST controllers from extensions package - Reuse server's HttpClient bean instead of creating a separate one - Add AutoConfiguration.imports for Spring Boot auto-config discovery - Delete SpectatorMaestroMetrics entirely (tests use mock instead) - Add maestro-extensions dependency to maestro-server build.gradle - Add extensions config block to application-aws.yml Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace boolean-flag method with explicit markTransactionSerializable() and markTransactionSerializableReadOnly() per PR #195 review feedback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
derek-miller
pushed a commit
to derek-miller/maestro
that referenced
this pull request
Mar 4, 2026
* add a SEL function to check if a param exists and improve the SEL doc (Netflix#118) * fix some warnings from the build (Netflix#120) * add get workflow instances batch endpoint (Netflix#121) * Add a SEL function to get the value of error retries. Also add one example of k8s step to set env. (Netflix#123) * Add a Redis-based InstanceStepConcurrencyHandler implementation. (Netflix#125) * Add a Redis-based InstanceStepConcurrencyHandler implementation. * address the comment * Add postgres based Maestro persistence layer to support postgressql db. (Netflix#119) * Add postgres based Maestro persistence layer to support postgressql db. * Update indexed columns to use the COLLATE "C". * Make this minor update to use JSON instead JSONB to address some tricky cases. Note that Postgres JSONB will not preserve the original order of map field. It might cause troubles in certain cases, e.g. param map order. * update the dependency lock * address the comments * Improve Maestro to retry certain db errors, e.g. connection is closed. (Netflix#127) * Improve Maestro to retry certain db errors, e.g. connection is closed. * fix null pointer issue in the set contains call * Update delete queue config to match the deletion processor timeout. (Netflix#130) * Update delete queue config to match the deletion processor timeout. * address the comment * Explicitly clear the pending action after passing it to the step runtime. (Netflix#131) * Explicitly clear the pending action after passing it to the step runtime. * Add tests to verify that the pending action is reset. * Add comments for the config. * Improve the test a bit. * Fix edge case where unblocking workflow doesn't enqueue job event (Netflix#132) * Fix edge case where unblocking workflow doesn't enqueue job event There is an edge case when we unblock all failed instances in batches and default batch size = 100, the only failed instance was unblocked in first while loop but didn't enqueue a job event because the condition requires unblocked count > 0 (in 1st loop this value is not updated and is still 0). This PR updated the if condition to account for this case. * improve comment * Check if string map param is literal before returning the value. (Netflix#133) * Check if string map param is literal before returning the value. * Add additional endpoints to workflow controller. (Netflix#136) * Check if map param is literal before returning the value. (Netflix#140) * Check if map param is literal before returning the value. * address the comment * Maestro workers should not retry in certain cases (Netflix#141) * While a worker processes a job, the business logic might throw MaestroNotFoundException, e.g. the workflow is deleted. The worker should not retry in this case. * address the comment * fix a corner case during launching a subworkflow if the subworkflow instance already exists (Netflix#142) * getAction logic should not pick up the upstream actions in async execution mode (Netflix#143) * Subworkflow should wake up its child workflow than itself. (Netflix#144) * Improve step runtime to support setting the next polling delay. (Netflix#145) * Invalid job event should be removed from the queue. (Netflix#146) * Invalid job event should be removed from the queue. * address the comment * Improve the queue performance by adding an explicit lock mechanism (Netflix#129) * Improve the queue performance by adding an explicit lock mechanism because SKIP LOCKED does not perform well if there are a large number of rows. * Use a dedicated queue_lock table for locking queue id. * update dependencies field to JSONB so to support first_value() (Netflix#148) * Add while step runtime support (Netflix#147) * add while step runtime to support while loop * address the comments * fix batch loading rollup * Support waking up flow engine with an action code. (Netflix#149) * Support waking up flow engine with an action code. Meaning of action codes are determined by the Task implementation. * address the comment * deduplicate actions to avoid unnecessary executions in the flow engine (Netflix#150) * Implement tag permit support for Maestro. (Netflix#151) * Implement tag permit support for Maestro. * add extra metrics and small improvements related to tag permit feature (Netflix#152) * Jun/unblock event (Netflix#153) * fix missing action and message fields in the unblock event * upgrade dependency locks * Fix an issue for workflow timeout action: when the workflow is timed out, both the workflow instance and its steps should be marked as timeout. * Slightly improve the code readability with additional comments. (Netflix#155) * Fix the http status code in MaestroRuntimeException. (Netflix#156) * Improve the code and a unit test (Netflix#157) * calling Array.free() to release resources in loop cases (Netflix#159) * Improve actors to cancel the scheduled task ping action during activate/wakeup (Netflix#160) * Actor improvement: during activate call, cancel any existing scheduled task ping as activate will schedule a new ping. * Improve the code a bit and add extra unit tests. * fix tag permit release for queued cases (Netflix#161) * Support extension function in SEL Util class. (Netflix#162) * Support extension function in SEL Util class. * Also add a toJson extension function to Maestro SEL expression evaluator. * Address the comments. * Rename testcontainerPostgresDep to testcontainerDep in all build.gradle files To match the renamed dependency in dependencies.gradle after OSS merge * Regenerate all gradle lockfiles after OSS merge Generated by running: ./gradlew build --write-locks -x test -x integrationTest * Add db-type and ownership-timeout properties to production config - Add db-type: postgres to engine.configs (required by OSS) - Add ownership-timeout: 125000 to queue 5 for deletion processor - Tag permit features left disabled (not needed for Airbnb deployment) * Add OSS sync script for merging upstream changes - Prompts for branch name to avoid hardcoding usernames - Fetches latest from origin and oss remotes - Creates branch from origin/main and merges oss/master - Lists conflicts if any, with helpful next steps - Auto-commits if no conflicts detected --------- Co-authored-by: jun-he <jun-he@users.noreply.github.com> Co-authored-by: yingyiz-netflix <yingyiz@netflix.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Pull Request type
./gradlew build --write-locksto refresh dependencies)NOTE: Please remember to run
./gradlew spotlessApplyto fix any format violations.Changes in this PR
Ports the "flattened foreach" feature from internal Maestro (
maestro-signal-service) to OSS as a newmaestro-extensionsmodule. This is an auto-configurable library that registers into maestro-server whenextensions.enabled=true(set in theawsprofile). It consumes maestro events via SQS (subscribed to the SNS topic) and makes HTTP calls to maestro-server for data — matching the internal architecture.The denormalization strategy stores foreach step iterations in a dedicated
maestro_step_foreach_flattenedtable for efficient cursor-based pagination, status filtering, and summary statistics — critical for workflows with large iteration counts.Architecture:
maestro-extensionsis an auto-configurable library registered into maestro-server via Spring Boot auto-configuration (all endpoints on port 8080)@ConditionalOnProperty(value = "extensions.enabled", havingValue = "true")to activate only when configured@ComponentScanto register REST controllers from the extensions packageDataSource,ObjectMapper,MaestroMetrics,DatabaseConfiguration,HttpClientStepInstanceStatusChangeEventfrom an SQS queue (maestro-event) subscribed to the maestro SNS topicHttpMaestroClientfor workflow/step dataclasspath:db/migration/postgres/when jar is on classpathSqsProcessorFinalizerpatternRawMessageDelivery=trueso messages arrive as raw JSON (matching internal)File inventory
Files matching internal (logic identical, only package/annotation/import differences)
These files are direct ports from
maestro-signal-service. The core logic is identical; differences are limited to package names, license headers, DI annotations (@Inject/@Singleton→ Spring@Bean), metrics class names (MetricRepo→MaestroMetrics), and@SuppressFBWarningsannotations.handlers/ForeachFlatteningHandler.javahandlers/foreach/ForeachFlatteningHandler.java@SuppressFBWarningsprocessors/StepEventPreprocessor.javamessageProcessors/StepEventPreprocessor.javaMetricRepo→MaestroMetrics; metric constant inlineddao/models/ForeachFlattenedInstance.javadao/models/ForeachFlattenedInstance.javadao/models/ForeachFlattenedModel.javadao/models/ForeachFlattenedModel.javamodels/StepEventHandlerInput.javamodels/StepEventHandlerInput.java@SuppressFBWarningsmodels/StepIteration.javaclient/models/flattening/StepIteration.javamodels/StepIterationsSummary.javaclient/models/flattening/StepIterationsSummary.javautils/ForeachFlatteningHelper.javautils/ForeachFlatteningHelper.javautils/StepInstanceStatusEncoder.javautils/StepInstanceStatusEncoder.javaEnumMapreturn/field types →Mapinterface (PMD LooseCoupling rule); internal logic identicalcontrollers/ForeachFlattenController.javacontrollers/flattening/ForeachFlattenController.java@SuppressWarnings→@SuppressFBWarnings;javax.annotation.Nullable→com.netflix.maestro.annotations.NullableFiles with logical differences from internal
processors/MaestroEventProcessor.javamessageProcessors/MaestroEventProcessor.javaTargetedAtGroupHandler,TrinoDirectHandler, andWORKFLOW_INSTANCE_STATUS_CHANGE_EVENThandling (not in scope for OSS). ChangesMessageProcessor<T>interface withSupplier<MaestroEvent>to directMaestroEventparameter. Core foreach-handling logic is identical.listeners/SqsMaestroEventListener.javalisteners/SqsMaestroEventListener.javaSqsProcessorFinalizerutility class; OSS inlines the same manual ack / visibility / DLQ logic directly in the listener (same behavior, no shared library abstraction). UsesObjectMapper.readValue()directly instead ofMessageProcessor<T>withSupplier<T>.dao/MaestroForeachFlattenedDao.javadao/flattening/MaestroForeachFlattenedDao.javaSignalFastProperties.isUsePostgresForSignalService()), two separate INSERT queries, and@iterationIdxCRDB index hints. OSS is Postgres-only — single INSERT query, no CRDB conditionals.utils/PaginationHelper.javashared/utils/PaginationHelper.javavalidateParamAndDeriveDirection()+buildEmptyPaginationResult()(used by the controller). Internal's full pagination range calculation and result building are for other signal-service features not ported here.OSS-only files (no internal counterpart)
These files are needed because OSS uses Spring Boot (vs internal's Micronaut) and doesn't have access to internal shared libraries (
maestro-client,maestro-shared).config/MaestroExtensionsConfiguration.java@Configurationwith@ConditionalOnProperty+@ComponentScanfor auto-config. Injects host beans via@Qualifier(Constants.MAESTRO_QUALIFIER)and parameter naming.properties/MaestroExtensionsProperties.java@ConfigurationPropertiesfor SQS queue URL, maestro base URLprovider/MaestroClient.javaMaestroClientmethod names (getWorkflowInstance,getWorkflowDefinition,getWorkflowStepInstance). Internal'sMaestroClientis an 800+ line concrete class with SSLSocketFactory, streaming iterators, POST methods; OSS extracts only the 3 methods needed for foreach flattening.provider/HttpMaestroClient.javaMaestroClientuses Metatron mTLS)Resource files
src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.importssrc/main/resources/db/migration/postgres/V202311161000__add_foreach_flattened.sqlmaestro_step_foreach_flattenedtable (identical to internal, trailing newline only)src/test/resources/flattening/flattening_records.jsonTest files (11 files)
ExtensionsBaseTest.javaSignalBaseTest/MaestroBaseTest)ExtensionsDaoBaseTest.javaSignalDaoBaseTest)MaestroForeachFlattenedDaoTest.javaSignalFastPropertiesmock, dual INSERT)ForeachFlatteningHandlerTest.javaForeachFlatteningHelperTest.javaStepInstanceStatusEncoderTest.javaForeachFlattenControllerTest.javaStepEventPreprocessorTest.javaStepInstanceStatusChangeEvent.builder()+ AssertJ (internal uses mock fields + JUnit Assert + manual Spectator registry metric verification)MaestroEventProcessorTest.javaSqsMaestroEventListenerTest.javaSqsProcessorFinalizerTestbut tests the inlined listener logicHttpMaestroClientTest.javaChanges to existing modules
settings.gradlemaestro-extensionsto multi-project buildmaestro-database/.../AbstractDatabaseDao.javamarkTransactionSerializable()maestro-aws/localstack/sqs_bootstrap.shmaestro-eventqueue with DLQ + redrive policymaestro-aws/localstack/sns_bootstrap.shRawMessageDelivery=truemaestro-server/build.gradleimplementation project(':maestro-extensions')dependencymaestro-server/.../application-aws.ymlextensions.enabled, queue URL, base URL)maestro-server/.../docker-java.propertiesE2E Testing
These commands have been verified end-to-end on macOS. Copy and paste each command in order.
Prerequisites
Step 1: Start infrastructure (LocalStack + Redis)
Wait until
docker psshows both containers as healthy/running.Step 2: Start maestro-server with extensions (single terminal)
./gradlew :maestro-server:bootRun --args='--spring.profiles.active=aws'Wait for:
Started MaestroApp in X seconds. Extensions auto-configure and you should see log lines like:Step 3: Create the foreach workflow
Expected: JSON response with
workflow_definitioncontainingsample-foreach-wf.Step 4: Trigger the workflow
Expected: JSON with
"status":"CREATED"andworkflow_instance_id: 1.Step 5: Wait for execution + event processing
The foreach creates 9 iterations (dates 20200101-20200109), each with 6 inner steps (job.1-job.6). Events flow: maestro-server → SNS → SQS → extensions listener (same process).
Step 6: Check workflow completed
Expected:
"status": "SUCCEEDED"Step 7: Query flattened foreach summary (same port 8080)
The flattened view is keyed by inner step ID (e.g.,
job.1), not the foreach step name.Expected:
{ "representative_iteration": { "step_id": "job.1", "loop_params": {"i": "20200101"}, "step_runtime_state": {"status": "SUCCEEDED"} }, "count_by_status": { "SUCCEEDED": 9 }, "loop_param_values": { "i": ["20200101", "20200102", "20200103", "20200104", "20200105"] } }Step 8: Query paginated iterations
Expected: 3 iterations with
has_next_page: true, each showing loop paramiandSUCCEEDEDstatus.Step 9: Get a single iteration by ID
Expected: Single iteration with
iteration_rank: "11",loop_params: {"i": "20200101"}, statusSUCCEEDED.Cleanup