-
Notifications
You must be signed in to change notification settings - Fork 32
feat: clear destination #243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Conversation
… into refactor/bff
… into refactor/bff
… into refactor/bff
…o refactor/worker
…into feat/clear_v0
…ke-ui into feat/clear_v0
…into feat/clear_v0
* fix: send id in job edit * fix: add modal for spec error, fix double error messages * fix: integration test fix * fix: minor fix
…into feat/clear_v0
|
Even after resuming a paused job, the options are blocked. @deepanshupal09-datazip
|
server/internal/services/etl/job.go
Outdated
| if clearRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.ClearDestination); clearRunning { | ||
| return fmt.Errorf("clear-destination is in progress, cannot update job") | ||
| } | ||
|
|
||
| // Cancel sync before updating the job | ||
| if syncRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); syncRunning { | ||
| logger.Infof("sync is running for job %d, cancelling sync workflow", jobID) | ||
| jobSlice := []*models.Job{existingJob} | ||
| if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil { | ||
| return fmt.Errorf("failed to cancel sync: %s", err) | ||
| } | ||
| logger.Infof("successfully cancelled sync for job %d", jobID) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we return type as well from isWorkflow runing, and cancel bases on that, would reduce db queries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have removed that check, so extra db query won't happen
Before
// Cancel sync before updating the job
if syncRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); syncRunning {
logger.Infof("sync is running for job %d, cancelling sync workflow", jobID)
jobSlice := []*models.Job{existingJob}
if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
return fmt.Errorf("failed to cancel sync: %s", err)
}
logger.Infof("successfully cancelled sync for job %d", jobID)
}After
if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
return fmt.Errorf("failed to cancel sync: %s", err)
}
server/internal/services/etl/job.go
Outdated
|
|
||
| if len(diffCatalog) > 0 { | ||
| logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID) | ||
| if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 10*time.Second); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you sure clear destination would only run after any of the workflow is not running
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixing this...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| // Check if sync is running and wait for it to stop | ||
| if running, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); running { | ||
| if err := waitForSyncToStop(ctx, s.temporal, projectID, jobID, syncWaitTime); err != nil { | ||
| return nil, fmt.Errorf("sync is in progress, please cancel it before running clear-destination") | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since it is part of same workflow at a time any one of them would run , why to keep this check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we need to wait for sync to cancel after cancelling it in updateJob flow
server/internal/services/etl/job.go
Outdated
| return nil, fmt.Errorf("job not found: %s", err) | ||
| } | ||
|
|
||
| diffCatalog, err := s.temporal.GetDifferenceStreams(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| diffCatalog, err := s.temporal.GetDifferenceStreams(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig) | |
| diffCatalog, err := s.temporal.GetStreamDifference(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| for _, job := range jobs { | ||
| conditions = append(conditions, fmt.Sprintf( | ||
| "(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~')", | ||
| "(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~' AND OperationType = 'sync')", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be backward compatible ,i f not or you can list OperationType != 'clear'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OperationType != 'clear' and ``OperationType = 'sync'` Both will work
Anyways if a workflow is running it will definitely have operationType since we are registering it at the start of the workflow.
doc: https://docs.temporal.io/develop/go/observability#remove-search-attribute
| } | ||
|
|
||
| // waitForSyncToStop waits for sync workflows to stop with timeout | ||
| func waitForSyncToStop(ctx context.Context, tempClient *temporal.Temporal, projectID string, jobID int, maxWaitTime time.Duration) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not fire db queries in loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked we can't use the DescribeWorkflowExecution so I have increased timeout to 1s and also removed time.After check and added a context timeout instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| projectID, jobID, projectID, jobID, opType, | ||
| ) | ||
|
|
||
| resp, err := tempClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see if can use tempClient.Client.DescribeWorkflowExecution()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tempClient.Client.DescribeWorkflowExecution()
for this we need to provide workflowID, since the workflowID will change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| // UpdateScheduleSpec updates an existing schedule's spec | ||
| func (t *Temporal) UpdateScheduleSpec(ctx context.Context, frequency, projectID string, jobID int) error { | ||
| cronExpression := utils.ToCron(frequency) | ||
| _, scheduleID := t.WorkflowAndScheduleID(projectID, jobID) | ||
|
|
||
| handle := t.Client.ScheduleClient().GetHandle(ctx, scheduleID) | ||
| return handle.Update(ctx, client.ScheduleUpdateOptions{ | ||
| DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { | ||
| input.Description.Schedule.Spec = &client.ScheduleSpec{ | ||
| CronExpressions: []string{cronExpression}, | ||
| } | ||
| return &client.ScheduleUpdate{ | ||
| Schedule: &input.Description.Schedule, | ||
| }, nil | ||
| }, | ||
| }) | ||
| } | ||
|
|
||
| // UpdateScheduleAction updates the action (workflow args) of an existing schedule | ||
| func (t *Temporal) UpdateScheduleAction(ctx context.Context, projectID string, jobID int, req interface{}) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use only one function for schedule update?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| case ClearDestination: | ||
| return time.Hour * 24 * 30 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear destination wont take that much time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Up for discussion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us discuss and close this
not able to reproduce this |
… into feat/clear_v0
server/internal/handlers/job.go
Outdated
| }) | ||
| } | ||
|
|
||
| // @router /project/:projectid/jobs/:id/stream-difference [get] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // @router /project/:projectid/jobs/:id/stream-difference [get] | |
| // @router /project/:projectid/jobs/:id/clear-destination [get] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
server/internal/services/etl/job.go
Outdated
|
|
||
| if len(diffCatalog) > 0 { | ||
| logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID) | ||
| if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 30*time.Second); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make it constant and let us discuss about max wait time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
server/internal/services/etl/job.go
Outdated
|
|
||
| if len(diffCatalog) > 0 { | ||
| logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID) | ||
| if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 30*time.Second); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if cleat destination failed, how would we recover and clear destination for only these streams req.DifferenceStreams??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added TODO
// TODO: handle clear-destination workflow failure
|
|
||
| // Check if sync is running and wait for it to stop | ||
| if running, _, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); running { | ||
| if err := waitForSyncToStop(ctx, s.temporal, projectID, jobID, syncWaitTime); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if sync is running , ans user click clear destination?, why to wait in that case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We won't wait because in that case we are passing syncWaitTIme=0
| ) | ||
|
|
||
| resp, err := tempClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{ | ||
| Query: query, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass pageSize:1, Avoids loading unnecessary workflow metadata
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DOne
* chore: remove temporalAddr constant and make retry simple * chore: remove logs
…into feat/clear_v0
| require github.com/beego/beego/v2 v2.3.8 | ||
|
|
||
| require ( | ||
| github.com/apache/spark-connect-go/v35 v35.0.0-20250317154112-ffd832059443 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why these are being added in this pr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used the one from staging.
Not needed
| case ClearDestination: | ||
| return time.Hour * 24 * 30 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us discuss and close this
|
|
||
| // extractWorkflowResponse extracts and parses the JSON response from a workflow execution result | ||
| func ExtractWorkflowResponse(ctx context.Context, run client.WorkflowRun) (map[string]interface{}, error) { | ||
| var result map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| var result map[string]interface{} | |
| result := make(map[string]interface) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DOne
| "--difference", "/mnt/config/new_streams.json", | ||
| } | ||
| if encryptionKey, _ := web.AppConfig.String(constants.ConfEncryptionKey); encryptionKey != "" { | ||
| cmdArgs = append(cmdArgs, "--encryption-key", encryptionKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we create constants for command args or add a todo.
| } | ||
|
|
||
| req := &ExecutionRequest{ | ||
| Type: "docker", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why type is required?
| utils.ErrorResponse(&h.Controller, http.StatusBadRequest, fmt.Sprintf("failed to validate request: %s", err), err) | ||
| return | ||
| } | ||
| result, err := h.etl.ClearDestination(h.Ctx.Request.Context(), projectID, id, "", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why sending empty stream config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of clearing the entire data form streams.json we are passing it. Else we can pass difference_streams in case of streams edit
| if err != nil { | ||
| utils.ErrorResponse(&h.Controller, http.StatusInternalServerError, "Failed to get stream difference", err) | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us also print diff streams, so that we can have log till it persist
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| } | ||
|
|
||
| // Handle stream difference if provided | ||
| // TODO: handle clear-destination workflow failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us add a recover flow for it
| } | ||
|
|
||
| func (s *ETLService) ClearDestination(ctx context.Context, projectID string, jobID int, streamsConfig string, syncWaitTime time.Duration) (map[string]interface{}, error) { | ||
| job, err := s.db.GetJobByID(jobID, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let us log the diff streams that is being passed so that we have an idea while debugging that clear destination run for these streams
| "success": "boolean", | ||
| "message": "string", | ||
| "data": { | ||
| "message": "string" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this message required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed. Removed


Description
This PR introduces the Clear Destination feature that allows users to manually or automatically clear data from a destination linked to a job when configuration or stream changes are detected.
Added two new API endpoints:
POST
/api/v1/project/:projectid/jobs/:id/clear-destination– Triggers a clear destination workflow for the specified job.GET
/api/v1/project/:projectid/jobs/:id/clear-destination– Returns the current status of the clear destination workflow (running or not running).Type of change
How Has This Been Tested?