Skip to content

Commit

Permalink
Parallel Task Processor (#2994)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jan 28, 2020
1 parent 543cc7c commit 1d9a7d7
Show file tree
Hide file tree
Showing 13 changed files with 751 additions and 122 deletions.
10 changes: 10 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ const (

// SequentialTaskProcessingScope is used by sequential task processing logic
SequentialTaskProcessingScope
// ParallelTaskProcessingScope is used by parallel task processing logic
ParallelTaskProcessingScope

// HistoryArchiverScope is used by history archivers
HistoryArchiverScope
Expand Down Expand Up @@ -1220,6 +1222,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ElasticsearchCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"},
ElasticsearchDeleteWorkflowExecutionsScope: {operation: "DeleteWorkflowExecution"},
SequentialTaskProcessingScope: {operation: "SequentialTaskProcessing"},
ParallelTaskProcessingScope: {operation: "ParallelTaskProcessing"},

HistoryArchiverScope: {operation: "HistoryArchiver"},
VisibilityArchiverScope: {operation: "VisibilityArchiver"},
Expand Down Expand Up @@ -1485,6 +1488,10 @@ const (
SequentialTaskQueueProcessingLatency
SequentialTaskTaskProcessingLatency

ParallelTaskSubmitRequest
ParallelTaskSubmitLatency
ParallelTaskTaskProcessingLatency

HistoryArchiverArchiveNonRetryableErrorCount
HistoryArchiverArchiveTransientErrorCount
HistoryArchiverArchiveSuccessCount
Expand Down Expand Up @@ -1828,6 +1835,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
SequentialTaskQueueSize: {metricName: "sequentialtask_queue_size", metricType: Timer},
SequentialTaskQueueProcessingLatency: {metricName: "sequentialtask_queue_processing_latency", metricType: Timer},
SequentialTaskTaskProcessingLatency: {metricName: "sequentialtask_task_processing_latency", metricType: Timer},
ParallelTaskSubmitRequest: {metricName: "paralleltask_submit_request", metricType: Counter},
ParallelTaskSubmitLatency: {metricName: "paralleltask_submit_latency", metricType: Timer},
ParallelTaskTaskProcessingLatency: {metricName: "paralleltask_task_processing_latency", metricType: Timer},

HistoryArchiverArchiveNonRetryableErrorCount: {metricName: "history_archiver_archive_non_retryable_error", metricType: Counter},
HistoryArchiverArchiveTransientErrorCount: {metricName: "history_archiver_archive_transient_error", metricType: Counter},
Expand Down
34 changes: 25 additions & 9 deletions common/task/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,27 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -self_package github.com/uber/cadence/common/task

package task

import (
"github.com/uber/cadence/common"
)

type (
// SequentialTaskProcessor is the generic coroutine pool interface
// which process sequential task
SequentialTaskProcessor interface {
// Processor is the generic coroutine pool interface
// which process tasks
Processor interface {
common.Daemon
Submit(task SequentialTask) error
Submit(task Task) error
}

// SequentialTask is the interface for tasks which should be executed sequentially
SequentialTask interface {
// State represents the current state of a task
State int

// Task is the interface for tasks which should be executed sequentially
Task interface {
// Execute process this task
Execute() error
// HandleErr handle the error returned by Execute
Expand All @@ -44,24 +49,35 @@ type (
Ack()
// Nack marks the task as unsuccessful completed
Nack()
// State returns the current task state
State() State
}

// SequentialTaskQueueFactory is the function which generate a new SequentialTaskQueue
// for a give SequentialTask
SequentialTaskQueueFactory func(task SequentialTask) SequentialTaskQueue
SequentialTaskQueueFactory func(task Task) SequentialTaskQueue

// SequentialTaskQueue is the generic task queue interface which group
// sequential tasks to be executed one by one
SequentialTaskQueue interface {
// QueueID return the ID of the queue, as well as the tasks inside (same)
QueueID() interface{}
// Offer push an task to the task set
Add(task SequentialTask)
Add(task Task)
// Poll pop an task from the task set
Remove() SequentialTask
Remove() Task
// IsEmpty indicate if the task set is empty
IsEmpty() bool
// Len return the size of the queue
Len() int
}
)

const (
// TaskStatePending is the state for a task when it's waiting to be processed or currently being processed
TaskStatePending State = iota + 1
// TaskStateAcked is the state for a task if it has been successfully completed
TaskStateAcked
// TaskStateNacked is the state for a task if it can not be processed
TaskStateNacked
)
Loading

0 comments on commit 1d9a7d7

Please sign in to comment.