@vishalm0509
Collaborator

Description

This PR introduces the Clear Destination feature that allows users to manually or automatically clear data from a destination linked to a job when configuration or stream changes are detected.

Added two new API endpoints:

  • POST /api/v1/project/:projectid/jobs/:id/clear-destination – Triggers a clear destination workflow for the specified job.

  • GET /api/v1/project/:projectid/jobs/:id/clear-destination – Returns the current status of the clear destination workflow (running or not running).

Type of change

  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How Has This Been Tested?

  • Run clear destination manually
  • Update job and check if configured streams are getting dropped from the destination

vikaxsh and others added 30 commits June 12, 2025 11:26
@vishalm0509
Collaborator Author

Even after resuming a paused job, the options are blocked. @deepanshupal09-datazip

  1. Pause a job
  2. Resume it
[image]

Comment on lines 100 to 112
if clearRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.ClearDestination); clearRunning {
	return fmt.Errorf("clear-destination is in progress, cannot update job")
}

// Cancel sync before updating the job
if syncRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); syncRunning {
	logger.Infof("sync is running for job %d, cancelling sync workflow", jobID)
	jobSlice := []*models.Job{existingJob}
	if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
		return fmt.Errorf("failed to cancel sync: %s", err)
	}
	logger.Infof("successfully cancelled sync for job %d", jobID)
}
Collaborator

Can we also return the workflow type from isWorkflowRunning and cancel based on that? It would reduce DB queries.

Collaborator Author

I have removed that check, so the extra DB query won't happen.

Before

	// Cancel sync before updating the job
	if syncRunning, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); syncRunning {
		logger.Infof("sync is running for job %d, cancelling sync workflow", jobID)
		jobSlice := []*models.Job{existingJob}
		if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
			return fmt.Errorf("failed to cancel sync: %s", err)
		}
		logger.Infof("successfully cancelled sync for job %d", jobID)
	}

After

	if err := cancelAllJobWorkflows(ctx, s.temporal, jobSlice, projectID); err != nil {
		return fmt.Errorf("failed to cancel sync: %s", err)
	}


if len(diffCatalog) > 0 {
	logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID)
	if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 10*time.Second); err != nil {
Collaborator

Are you sure clear destination will run only once no other workflow is running?

Collaborator Author

yes

Collaborator Author

Fixing this...

Collaborator Author

Done

Comment on lines 264 to 269
// Check if sync is running and wait for it to stop
if running, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); running {
	if err := waitForSyncToStop(ctx, s.temporal, projectID, jobID, syncWaitTime); err != nil {
		return nil, fmt.Errorf("sync is in progress, please cancel it before running clear-destination")
	}
}
Collaborator

Since both are part of the same workflow, only one of them can run at a time. Why keep this check?

Collaborator Author

We need it because, after cancelling the sync in the updateJob flow, we have to wait for the sync to actually stop.

return nil, fmt.Errorf("job not found: %s", err)
}

diffCatalog, err := s.temporal.GetDifferenceStreams(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig)
Collaborator

Suggested change
- diffCatalog, err := s.temporal.GetDifferenceStreams(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig)
+ diffCatalog, err := s.temporal.GetStreamDifference(ctx, job, job.StreamsConfig, req.UpdatedStreamsConfig)

Collaborator Author

Done

Collaborator Author

Done

for _, job := range jobs {
	conditions = append(conditions, fmt.Sprintf(
-		"(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~')",
+		"(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~' AND OperationType = 'sync')",
Collaborator

Would this be backward compatible? If not, you could use OperationType != 'clear' instead.

Collaborator Author

vishalm0509, Nov 5, 2025

Both `OperationType != 'clear'` and `OperationType = 'sync'` will work.

In any case, a running workflow will always have an OperationType, since we register it at the start of the workflow.

doc: https://docs.temporal.io/develop/go/observability#remove-search-attribute
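To make the filter under discussion concrete, here is a minimal Go sketch of how such a visibility query could be assembled for several jobs. The `jobRef` type and the helper names are hypothetical, and the `ExecutionStatus = 'Running'` clause is an assumption about how "running" is detected; only the per-job condition string mirrors the diff above. The `-~` upper bound presumably works because `~` sorts after the characters that appear in workflow ID suffixes.

```go
package main

import (
	"fmt"
	"strings"
)

// jobRef is a hypothetical stand-in for the fields of models.Job used here.
type jobRef struct {
	ProjectID string
	JobID     int
}

// workflowCondition mirrors the condition in the diff: a WorkflowId range
// for one job, narrowed by the OperationType search attribute.
func workflowCondition(projectID string, jobID int, opType string) string {
	return fmt.Sprintf(
		"(WorkflowId BETWEEN 'sync-%s-%d' AND 'sync-%s-%d-~' AND OperationType = '%s')",
		projectID, jobID, projectID, jobID, opType,
	)
}

// runningQuery ORs the per-job conditions and restricts the result to running
// executions (the ExecutionStatus clause is an assumption, not from the PR).
func runningQuery(jobs []jobRef, opType string) string {
	conditions := make([]string, 0, len(jobs))
	for _, j := range jobs {
		conditions = append(conditions, workflowCondition(j.ProjectID, j.JobID, opType))
	}
	return "ExecutionStatus = 'Running' AND (" + strings.Join(conditions, " OR ") + ")"
}

func main() {
	jobs := []jobRef{{ProjectID: "123", JobID: 1}, {ProjectID: "123", JobID: 2}}
	fmt.Println(runningQuery(jobs, "sync"))
}
```

Note that a filter of the form `OperationType = 'sync'` only matches executions that carry the attribute, which is the backward-compatibility point raised in the review.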

}

// waitForSyncToStop waits for sync workflows to stop with timeout
func waitForSyncToStop(ctx context.Context, tempClient *temporal.Temporal, projectID string, jobID int, maxWaitTime time.Duration) error {
Collaborator

We should not fire DB queries in a loop.

Collaborator Author

I checked, and we can't use DescribeWorkflowExecution, so I have increased the timeout to 1s. I also removed the time.After check and added a context timeout instead.

Collaborator Author

Checking...

Collaborator Author

Done

projectID, jobID, projectID, jobID, opType,
)

resp, err := tempClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
Collaborator

See if we can use tempClient.Client.DescribeWorkflowExecution().

Collaborator Author

For tempClient.Client.DescribeWorkflowExecution() we need to provide a workflowID, and the workflowID will change.

Collaborator Author

Checking...

Collaborator Author

Done

Comment on lines 83 to 102
// UpdateScheduleSpec updates an existing schedule's spec
func (t *Temporal) UpdateScheduleSpec(ctx context.Context, frequency, projectID string, jobID int) error {
	cronExpression := utils.ToCron(frequency)
	_, scheduleID := t.WorkflowAndScheduleID(projectID, jobID)

	handle := t.Client.ScheduleClient().GetHandle(ctx, scheduleID)
	return handle.Update(ctx, client.ScheduleUpdateOptions{
		DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
			input.Description.Schedule.Spec = &client.ScheduleSpec{
				CronExpressions: []string{cronExpression},
			}
			return &client.ScheduleUpdate{
				Schedule: &input.Description.Schedule,
			}, nil
		},
	})
}

// UpdateScheduleAction updates the action (workflow args) of an existing schedule
func (t *Temporal) UpdateScheduleAction(ctx context.Context, projectID string, jobID int, req interface{}) error {
Collaborator

Can we use a single function for schedule updates?

Collaborator Author

Done

Comment on lines +102 to +103
case ClearDestination:
	return time.Hour * 24 * 30
Collaborator

Clear destination won't take that much time.

Collaborator Author

Up for discussion

Collaborator

Let us discuss and close this.

@deepanshupal09-datazip
Collaborator

> Even after resuming a paused job, the options are blocked. @deepanshupal09-datazip
>
>   1. Pause a job
>   2. Resume it
>
> [image]

Not able to reproduce this.

})
}

// @router /project/:projectid/jobs/:id/stream-difference [get]
Collaborator

Suggested change
- // @router /project/:projectid/jobs/:id/stream-difference [get]
+ // @router /project/:projectid/jobs/:id/clear-destination [get]

Collaborator Author

Done


if len(diffCatalog) > 0 {
	logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID)
	if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 30*time.Second); err != nil {
Collaborator

Make it a constant, and let us discuss the max wait time.

Collaborator Author

Done


if len(diffCatalog) > 0 {
	logger.Infof("stream difference detected for job %d, running clear destination workflow", existingJob.ID)
	if _, err := s.ClearDestination(ctx, projectID, jobID, req.DifferenceStreams, 30*time.Second); err != nil {
Collaborator

What if clear destination fails? How would we recover and clear the destination for only the streams in req.DifferenceStreams?

Collaborator Author

Added TODO

// TODO: handle clear-destination workflow failure


// Check if sync is running and wait for it to stop
if running, _, _ := isWorkflowRunning(ctx, s.temporal, projectID, jobID, temporal.Sync); running {
	if err := waitForSyncToStop(ctx, s.temporal, projectID, jobID, syncWaitTime); err != nil {
Collaborator

What if a sync is running and the user clicks clear destination? Why wait in that case?

Collaborator Author

We won't wait, because in that case we pass syncWaitTime=0.

)

resp, err := tempClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
	Query: query,
Collaborator

Pass pageSize: 1. It avoids loading unnecessary workflow metadata.

Collaborator Author

Done

require github.com/beego/beego/v2 v2.3.8

require (
	github.com/apache/spark-connect-go/v35 v35.0.0-20250317154112-ffd832059443
Collaborator

Why are these being added in this PR?

Collaborator Author

Used the one from staging.

Not needed


// extractWorkflowResponse extracts and parses the JSON response from a workflow execution result
func ExtractWorkflowResponse(ctx context.Context, run client.WorkflowRun) (map[string]interface{}, error) {
var result map[string]interface{}
Collaborator

Suggested change
- var result map[string]interface{}
+ result := make(map[string]interface{})

Collaborator Author

Done
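For context on the suggestion above, this small Go sketch shows the difference between a nil map and one created with make. It assumes the workflow result is decoded through a pointer in the way encoding/json does; the `decode` helper is hypothetical and is not the PR's ExtractWorkflowResponse.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decode unmarshals a JSON object into a map that starts out nil.
// json.Unmarshal allocates the map when it is given a pointer to a nil map.
func decode(payload []byte) (map[string]interface{}, error) {
	var result map[string]interface{}
	if err := json.Unmarshal(payload, &result); err != nil {
		return nil, err
	}
	return result, nil
}

func main() {
	decoded, err := decode([]byte(`{"success": true}`))
	fmt.Println(decoded, err)

	// Assigning a key directly to a nil map panics, so make is needed
	// when the code itself writes entries before any decoding happens.
	prealloc := make(map[string]interface{})
	prealloc["message"] = "ok"
	fmt.Println(len(prealloc))
}
```

Under that assumption both forms decode correctly, and make mainly matters when the function inserts keys itself or must return a non-nil map on an empty result.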

"--difference", "/mnt/config/new_streams.json",
}
if encryptionKey, _ := web.AppConfig.String(constants.ConfEncryptionKey); encryptionKey != "" {
	cmdArgs = append(cmdArgs, "--encryption-key", encryptionKey)
Collaborator

Can we create constants for the command args, or add a TODO?

}

req := &ExecutionRequest{
	Type: "docker",
Collaborator

Why is type required?

	utils.ErrorResponse(&h.Controller, http.StatusBadRequest, fmt.Sprintf("failed to validate request: %s", err), err)
	return
}
result, err := h.etl.ClearDestination(h.Ctx.Request.Context(), projectID, id, "", 0)
Collaborator

Why send an empty stream config?

Collaborator Author

We pass an empty config when clearing the entire data from streams.json. Otherwise, when streams are edited, we can pass difference_streams.

if err != nil {
	utils.ErrorResponse(&h.Controller, http.StatusInternalServerError, "Failed to get stream difference", err)
	return
}
Collaborator

Let us also log the diff streams, so that we have a record for as long as the logs persist.

Collaborator Author

Done

}

// Handle stream difference if provided
// TODO: handle clear-destination workflow failure
Collaborator

Let us add a recovery flow for it.

}

func (s *ETLService) ClearDestination(ctx context.Context, projectID string, jobID int, streamsConfig string, syncWaitTime time.Duration) (map[string]interface{}, error) {
	job, err := s.db.GetJobByID(jobID, true)
Collaborator

Let us log the diff streams being passed, so that while debugging we know which streams clear destination ran for.

"success": "boolean",
"message": "string",
"data": {
"message": "string"
Collaborator

Is this message required?

Collaborator Author

Not needed. Removed

Base automatically changed from refactor/bff to staging November 7, 2025 12:39