Pipeline is a Go library that helps you build pipelines without worrying about channel management and concurrency. It contains common fan-in and fan-out operations as well as useful utility funcs for batch processing and scaling.
If you have another common use case you would like to see covered by this package, please open a feature request.
- How to run a pipeline until the container is killed
- How to shut down a pipeline when there is an error
- How to shut down a pipeline after it has finished processing a batch of data
func Apply
func Apply[A, B, C any](a Processor[A, []B], b Processor[B, C]) Processor[A, []C]
Apply connects two processors, applying the second to each item of the first's output.
transform := pipeline.NewProcessor(func(_ context.Context, s string) ([]string, error) {
return strings.Split(s, ","), nil
}, nil)
double := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return s + s, nil
}, nil)
addLeadingZero := pipeline.NewProcessor(func(_ context.Context, s string) (string, error) {
return "0" + s, nil
}, nil)
apply := pipeline.Apply(
transform,
pipeline.Sequence(
double,
addLeadingZero,
double,
),
)
input := "1,2,3,4,5"
for out := range pipeline.Process(context.Background(), apply, pipeline.Emit(input)) {
for j := range out {
fmt.Printf("process: %s\n", out[j])
}
}
Output:
process: 011011
process: 022022
process: 033033
process: 044044
process: 055055
func Buffer
func Buffer[Item any](size int, in <-chan Item) <-chan Item
Buffer creates a buffered channel that will close after the input is closed and the buffer is fully drained
func Cancel
func Cancel[Item any](ctx context.Context, cancel func(Item, error), in <-chan Item) <-chan Item
Cancel passes each `Item` from the `in <-chan Item` directly to the out `<-chan Item` until the `Context` is canceled.
After the context is canceled, everything from `in <-chan Item` is sent to the `cancel` func instead, along with `ctx.Err()`.
// Create a context that lasts for 1 second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Create a basic pipeline that emits one int every 250ms
p := pipeline.Delay(ctx, time.Second/4,
pipeline.Emit(1, 2, 3, 4, 5),
)
// If the context is canceled, pass the ints to the cancel func for teardown
p = pipeline.Cancel(ctx, func(i int, err error) {
fmt.Printf("%+v could not be processed, %s\n", i, err)
}, p)
// Otherwise, process the inputs
for out := range p {
fmt.Printf("process: %+v\n", out)
}
Output:
process: 1
process: 2
process: 3
process: 4
5 could not be processed, context deadline exceeded
func Collect
func Collect[Item any](ctx context.Context, maxSize int, maxDuration time.Duration, in <-chan Item) <-chan []Item
Collect collects `Item`s from its in channel and returns `[]Item` from its out channel.
It will collect up to `maxSize` inputs from the `in <-chan Item` over up to `maxDuration` before returning them as `[]Item`.
That means when `maxSize` is reached before `maxDuration`, `[maxSize]Item` will be passed to the out channel.
But if `maxDuration` is reached before `maxSize` inputs are collected, `[< maxSize]Item` will be passed to the out channel.
When the `context` is canceled, everything in the buffer will be flushed to the out channel.
func Delay
func Delay[Item any](ctx context.Context, duration time.Duration, in <-chan Item) <-chan Item
Delay delays reading each input by `duration`.
If the context is canceled, the delay will not be applied.
func Drain
func Drain[Item any](in <-chan Item)
Drain empties the input and blocks until the channel is closed
func Emit
func Emit[Item any](is ...Item) <-chan Item
Emit fans `is ...Item` out to a `<-chan Item`.
func Emitter
func Emitter[Item any](ctx context.Context, next func() Item) <-chan Item
Emitter continuously emits new items generated by the next func until the context is canceled
func Merge
func Merge[Item any](ins ...<-chan Item) <-chan Item
Merge fans multiple channels into a single channel.
one := pipeline.Emit(1)
two := pipeline.Emit(2, 2)
three := pipeline.Emit(3, 3, 3)
for i := range pipeline.Merge(one, two, three) {
fmt.Printf("output: %d\n", i)
}
fmt.Println("done")
Output (order may vary):
output: 1
output: 3
output: 2
output: 2
output: 3
output: 3
done
func Process
func Process[Input, Output any](ctx context.Context, processor Processor[Input, Output], in <-chan Input) <-chan Output
Process takes each input from the `in <-chan Input` and calls `Processor.Process` on it.
When `Processor.Process` returns an `Output`, it will be sent to the output `<-chan Output`.
If `Processor.Process` returns an error, `Processor.Cancel` will be called with the corresponding input and error message.
Finally, if the `Context` is canceled, all inputs remaining in the `in <-chan Input` will go directly to `Processor.Cancel`.
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))
// Multiply each number by 10
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
return in * 10, nil
}, func(i int, err error) {
fmt.Printf("error: could not multiply %v, %s\n", i, err)
}), p)
// Finally, lets print the results and see what happened
for result := range p {
fmt.Printf("result: %d\n", result)
}
Output:
result: 10
result: 20
result: 30
result: 40
result: 50
error: could not multiply 6, context deadline exceeded
func ProcessBatch
func ProcessBatch[Input, Output any]( ctx context.Context, maxSize int, maxDuration time.Duration, processor Processor[[]Input, []Output], in <-chan Input, ) <-chan Output
ProcessBatch collects up to `maxSize` elements over `maxDuration` and processes them together as a slice of `Input`s.
It passes an `[]Input` to the `Processor.Process` method and expects an `[]Output` back.
It passes `[]Input` batches of inputs to the `Processor.Cancel` method.
If the receiver is backed up, ProcessBatch can hold up to 2x `maxSize`.
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a pipeline that emits 1-6 at a rate of one int per second
p := pipeline.Delay(ctx, time.Second, pipeline.Emit(1, 2, 3, 4, 5, 6))
// Multiply every 2 adjacent numbers together
p = pipeline.ProcessBatch(ctx, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, is []int) ([]int, error) {
o := 1
for _, i := range is {
o *= i
}
return []int{o}, nil
}, func(is []int, err error) {
fmt.Printf("error: could not multiply %v, %s\n", is, err)
}), p)
// Finally, lets print the results and see what happened
for result := range p {
fmt.Printf("result: %d\n", result)
}
Output:
result: 2
result: 12
error: could not multiply [5 6], context deadline exceeded
func ProcessBatchConcurrently
func ProcessBatchConcurrently[Input, Output any]( ctx context.Context, concurrently, maxSize int, maxDuration time.Duration, processor Processor[[]Input, []Output], in <-chan Input, ) <-chan Output
ProcessBatchConcurrently fans the in channel out to multiple batch Processors running concurrently, then it fans the out channels of the batch Processors back into a single out chan
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a pipeline that emits 1-9
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9)
// Add a 1 second delay to each number
p = pipeline.Delay(ctx, time.Second, p)
// Group two inputs at a time
p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProcessor(func(ctx context.Context, ins []int) ([]int, error) {
return ins, nil
}, func(i []int, err error) {
fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)
// Finally, lets print the results and see what happened
for result := range p {
fmt.Printf("result: %d\n", result)
}
Output:
result: 1
result: 2
result: 3
result: 5
error: could not process [7 8], context deadline exceeded
error: could not process [4 6], context deadline exceeded
error: could not process [9], context deadline exceeded
func ProcessConcurrently
func ProcessConcurrently[Input, Output any](ctx context.Context, concurrently int, p Processor[Input, Output], in <-chan Input) <-chan Output
ProcessConcurrently fans the in channel out to multiple Processors running concurrently, then it fans the out channels of the Processors back into a single out chan
// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create a pipeline that emits 1-7
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7)
// Add a two second delay to each number
p = pipeline.Delay(ctx, 2*time.Second, p)
// Add two concurrent processors that pass input numbers to the output
p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.Context, in int) (int, error) {
return in, nil
}, func(i int, err error) {
fmt.Printf("error: could not process %v, %s\n", i, err)
}), p)
// Finally, lets print the results and see what happened
for result := range p {
log.Printf("result: %d\n", result)
}
Output:
result: 2
result: 1
result: 4
result: 3
error: could not process 6, process was canceled
error: could not process 5, process was canceled
error: could not process 7, context deadline exceeded
func Split
func Split[Item any](in <-chan []Item) <-chan Item
Split takes the `[]Item` batches produced by Collect and splits them back out into individual elements.
The following example shows how you can shut down a pipeline gracefully when it receives an error.
// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a pipeline that emits 1-10
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// A step that will shutdown the pipeline if the number is greater than 1
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
// Shut down the pipeline by canceling the context
if i != 1 {
cancel()
return i, fmt.Errorf("%d caused the shutdown", i)
}
return i, nil
}, func(i int, err error) {
// The cancel func is called when an error is returned by the process func or the context is canceled
fmt.Printf("could not process %d: %s\n", i, err)
}), p)
// Finally, lets print the results and see what happened
for result := range p {
fmt.Printf("result: %d\n", result)
}
fmt.Println("exiting the pipeline after all data is processed")
Output:
could not process 2: 2 caused the shutdown
result: 1
could not process 3: context canceled
could not process 4: context canceled
could not process 5: context canceled
could not process 6: context canceled
could not process 7: context canceled
could not process 8: context canceled
could not process 9: context canceled
could not process 10: context canceled
exiting the pipeline after all data is processed
This example demonstrates a pipeline that runs until the OS / container the pipeline is running in kills it.
// Gracefully shutdown the pipeline when the the system is shutting down
// by canceling the context when os.Kill or os.Interrupt signal is sent
ctx, cancel := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer cancel()
// Create a pipeline that keeps emitting numbers sequentially until the context is canceled
var count int
p := pipeline.Emitter(ctx, func() int {
count++
return count
})
// Filter out only even numbers
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
if i%2 == 0 {
return i, nil
}
return i, fmt.Errorf("'%d' is an odd number", i)
}, func(i int, err error) {
fmt.Printf("error processing '%v': %s\n", i, err)
}), p)
// Wait a few nanoseconds an simulate the os.Interrupt signal
go func() {
time.Sleep(time.Millisecond / 10)
fmt.Print("\n--- os kills the app ---\n\n")
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()
// Finally, lets print the results and see what happened
for result := range p {
fmt.Printf("result: %d\n", result)
}
fmt.Println("exiting after the input channel is closed")
Output:
error processing '1': '1' is an odd number
result: 2
--- os kills the app ---
error processing '3': '3' is an odd number
error processing '4': context canceled
exiting after the input channel is closed
The following example demonstrates a pipeline that naturally finishes its run when the input channel is closed
// Create a context that can be canceled
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a pipeline that emits 1-10 and then closes its output channel
p := pipeline.Emit(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Multiply every number by 2
p = pipeline.Process(ctx, pipeline.NewProcessor(func(ctx context.Context, i int) (int, error) {
return i * 2, nil
}, func(i int, err error) {
fmt.Printf("could not multiply %d: %s\n", i, err)
}), p)
// Finally, lets print the results and see what happened
for result := range p {
fmt.Printf("result: %d\n", result)
}
fmt.Println("exiting after the input channel is closed")
Output:
result: 2
result: 4
result: 6
result: 8
result: 10
result: 12
result: 14
result: 16
result: 18
result: 20
exiting after the input channel is closed