Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 126 additions & 118 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,134 +1,137 @@
# Go-Taskflow
# go-taskflow

[![codecov](https://codecov.io/github/noneback/go-taskflow/graph/badge.svg?token=CITXYA10C6)](https://codecov.io/github/noneback/go-taskflow)
[![Go Reference](https://pkg.go.dev/badge/github.com/noneback/go-taskflow.svg)](https://pkg.go.dev/github.com/noneback/go-taskflow)
[![Go Report Card](https://goreportcard.com/badge/github.com/noneback/go-taskflow)](https://goreportcard.com/report/github.com/noneback/go-taskflow)
[![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go)

![go-taskflow](https://socialify.git.ci/noneback/go-taskflow/image?description=1&language=1&name=1&pattern=Solid&theme=Auto)

A General-purpose Task-parallel Programming Framework for Go, inspired by [taskflow-cpp](https://github.com/taskflow/taskflow), with Go's native capabilities and simplicity, suitable for complex dependency management in concurrent tasks.

## Feature
- **High extensibility**: Easily extend the framework to adapt to various specific use cases.

- **Native Go's concurrency model**: Leverages Go's goroutines to manage concurrent task execution effectively.

- **User-friendly programming interface**: Simplify complex task dependency management using Go.
go-taskflow is a general-purpose task-parallel programming framework for Go, inspired by [taskflow-cpp](https://github.com/taskflow/taskflow). It leverages Go's native capabilities and simplicity, making it ideal for managing complex dependencies in concurrent tasks.

- **Static\Subflow\Conditional\Cyclic tasking**: Define static tasks, condition nodes, nested subflows and cyclic flow to enhance modularity and programmability.
## Features

| Static | Subflow | Condition | Cyclic |
|:-----------|:------------:|------------:|------------:|
| ![](image/simple.svg) | ![](image/subflow.svg) | ![](image/condition.svg) | ![](image/loop.svg) |
- **High Extensibility**: Easily extend the framework to adapt to various specific use cases.
- **Native Go Concurrency Model**: Leverages Go's goroutines for efficient concurrent task execution.
- **User-Friendly Programming Interface**: Simplifies complex task dependency management in Go.
- **Static, Subflow, Conditional, and Cyclic Tasking**: Define static tasks, condition nodes, nested subflows, and cyclic flows to enhance modularity and programmability.

- **Priority Task Schedule**: Define tasks' priority, higher priority tasks will be scheduled first.
| Static | Subflow | Condition | Cyclic |
|:-----------|:------------:|------------:|------------:|
| ![](image/simple.svg) | ![](image/subflow.svg) | ![](image/condition.svg) | ![](image/loop.svg) |

- **Built-in visualization & profiling tools**: Generate visual representations of tasks and profile task execution performance using integrated tools, making debugging and optimization easier.
- **Priority Task Scheduling**: Assign task priorities to ensure higher-priority tasks are executed first.
- **Built-in Visualization and Profiling Tools**: Generate visual representations of tasks and profile task execution performance using integrated tools, simplifying debugging and optimization.

## Use Cases

- **Data Pipeline**: Orchestrate data processing stages that have complex dependencies.
- **Data Pipelines**: Orchestrate data processing stages with complex dependencies.
- **AI Agent Workflow Automation**: Define and execute AI agent workflows with clear sequences and dependency structures.
- **Parallel Graph Tasking**: Execute graph-based tasks concurrently to maximize CPU utilization.

- **AI Agent Workflow Automation**: Define and run AI Agent automation workflows where tasks have a clear sequence and dependency structure.
## Installation

- **Parallel Graph Tasking**: Execute Graph-based tasks concurrently to fully utilize CPU resources.
Import the latest version of go-taskflow using:

```bash
go get -u github.com/noneback/go-taskflow
```

## Example
import latest version: `go get -u github.com/noneback/go-taskflow`

Below is an example of using go-taskflow to implement a parallel merge sort:

```go
package main

import (
"fmt"
"log"
"math/rand"
"os"
"slices"
"strconv"
"sync"

gtf "github.com/noneback/go-taskflow"
"fmt"
"log"
"math/rand"
"os"
"slices"
"strconv"
"sync"

gtf "github.com/noneback/go-taskflow"
)

// merge sorted src to sorted dest
// mergeInto merges a sorted source array into a sorted destination array.
func mergeInto(dest, src []int) []int {
size := len(dest) + len(src)
tmp := make([]int, 0, size)
i, j := 0, 0
for i < len(dest) && j < len(src) {
if dest[i] < src[j] {
tmp = append(tmp, dest[i])
i++
} else {
tmp = append(tmp, src[j])
j++
}
}

if i < len(dest) {
tmp = append(tmp, dest[i:]...)
} else {
tmp = append(tmp, src[j:]...)
}

return tmp
size := len(dest) + len(src)
tmp := make([]int, 0, size)
i, j := 0, 0
for i < len(dest) && j < len(src) {
if dest[i] < src[j] {
tmp = append(tmp, dest[i])
i++
} else {
tmp = append(tmp, src[j])
j++
}
}

if i < len(dest) {
tmp = append(tmp, dest[i:]...)
} else {
tmp = append(tmp, src[j:]...)
}

return tmp
}
func main() {
size := 100
radomArr := make([][]int, 10)
sortedArr := make([]int, 0, 10*size)
mutex := &sync.Mutex{}

for i := 0; i < 10; i++ {
for j := 0; j < size; j++ {
radomArr[i] = append(radomArr[i], rand.Int())
}
}

sortTasks := make([]*gtf.Task, 10)
tf := gtf.NewTaskFlow("merge sort")
done := tf.NewTask("Done", func() {
if !slices.IsSorted(sortedArr) {
log.Fatal("Failed")
}
fmt.Println("Sorted")
fmt.Println(sortedArr[:1000])
})

for i := 0; i < 10; i++ {
sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
arr := radomArr[i]
slices.Sort(arr)
mutex.Lock()
defer mutex.Unlock()
sortedArr = mergeInto(sortedArr, arr)
})

}
done.Succeed(sortTasks...)

executor := gtf.NewExecutor(1000)

executor.Run(tf).Wait()

if err := tf.Dump(os.Stdout); err != nil {
log.Fatal("V->", err)
}

if err := executor.Profile(os.Stdout); err != nil {
log.Fatal("P->", err)
}

func main() {
size := 100
randomArr := make([][]int, 10)
sortedArr := make([]int, 0, 10*size)
mutex := &sync.Mutex{}

for i := 0; i < 10; i++ {
for j := 0; j < size; j++ {
randomArr[i] = append(randomArr[i], rand.Int())
}
}

sortTasks := make([]*gtf.Task, 10)
tf := gtf.NewTaskFlow("merge sort")
done := tf.NewTask("Done", func() {
if !slices.IsSorted(sortedArr) {
log.Fatal("Sorting failed")
}
fmt.Println("Sorted successfully")
fmt.Println(sortedArr[:1000])
})

for i := 0; i < 10; i++ {
sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
arr := randomArr[i]
slices.Sort(arr)
mutex.Lock()
defer mutex.Unlock()
sortedArr = mergeInto(sortedArr, arr)
})
}
done.Succeed(sortTasks...)

executor := gtf.NewExecutor(1000)

executor.Run(tf).Wait()

if err := tf.Dump(os.Stdout); err != nil {
log.Fatal("Error dumping taskflow:", err)
}

if err := executor.Profile(os.Stdout); err != nil {
log.Fatal("Error profiling taskflow:", err)
}
}
```

[more code examples](https://github.com/noneback/go-taskflow/tree/main/examples)
For more examples, visit the [examples directory](https://github.com/noneback/go-taskflow/tree/main/examples).

## Benchmark
We provide a basic benchmark to give a rough estimate of performance. However, most realistic workloads are I/O-bound, and their performance cannot be accurately reflected by the benchmark results. So, don’t take it too seriously.

If you really care about CPU Performance, we strongly recommend [taskflow-cpp](https://github.com/taskflow/taskflow).
The following benchmark provides a rough estimate of performance. Note that most realistic workloads are I/O-bound, and their performance cannot be accurately reflected by these results. For CPU-intensive tasks, consider using [taskflow-cpp](https://github.com/taskflow/taskflow).

```plaintext
goos: linux
Expand All @@ -143,51 +146,56 @@ PASS
ok github.com/noneback/go-taskflow/benchmark 5.802s
```

## Understand Condition Task Correctly
Condition Node is special in [taskflow-cpp](https://github.com/taskflow/taskflow). It not only enrolls in Condition Control but also in Looping.
## Understanding Conditional Tasks

Our repo keeps almost the same behavior. You should read [ConditionTasking](https://taskflow.github.io/taskflow/ConditionalTasking.html) to avoid common pitfalls.
Conditional nodes in go-taskflow behave similarly to those in [taskflow-cpp](https://github.com/taskflow/taskflow). They participate in both conditional control and looping. To avoid common pitfalls, refer to the [Conditional Tasking documentation](https://taskflow.github.io/taskflow/ConditionalTasking.html).

## Error Handling in go-taskflow

`errors` in golang are values. It is the user's job to handle it correctly.
In Go, `errors` are values, and it is the user's responsibility to handle them appropriately. Only unrecovered `panic` events are managed by the framework. If a `panic` occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them.

Only unrecovered `panic` needs to be addressed by the framework. Now, if it happens, the whole parent graph will be canceled, leaving the rest tasks undone. This behavior may evolve someday. If you have any good thoughts, feel free to let me know.
To prevent interruptions caused by `panic`, you can handle them manually when registering tasks:

If you prefer not to interrupt the whole taskflow when panics occur, you can also handle panics manually while registering tasks.
Eg:
```go
tf.NewTask("not interrupt", func() {
defer func() {
if r := recover(); r != nil {
// deal with it.
}
}()
// user functions.
)
defer func() {
if r := recover(); r != nil {
// Handle the panic.
}
}()
// User-defined logic.
})
```

## How to use visualize taskflow
## Visualizing Taskflows

To generate a visual representation of a taskflow, use the `Dump` method:

```go
if err := tf.Dump(os.Stdout); err != nil {
log.Fatal(err)
log.Fatal(err)
}
```
`tf.Dump` generates raw strings in dot format, use `dot` to draw a Graph svg.

The `Dump` method generates raw strings in DOT format. Use the `dot` tool to create a graph SVG.

![dot](image/desc.svg)

## How to use profile taskflow
## Profiling Taskflows

To profile a taskflow, use the `Profile` method:

```go
if err :=exector.Profile(os.Stdout);err != nil {
log.Fatal(err)
if err := executor.Profile(os.Stdout); err != nil {
log.Fatal(err)
}
```

`Profile` generates raw strings in flamegraph format, use `flamegraph` to draw a flamegraph svg.
The `Profile` method generates raw strings in flamegraph format. Use the `flamegraph` tool to create a flamegraph SVG.

![flg](image/fl.svg)

## Stargazer

[![Star History Chart](https://api.star-history.com/svg?repos=noneback/go-taskflow&type=Date)](https://star-history.com/#noneback/go-taskflow&Date)