Skip to content

Commit

Permalink
fix: doc test errors
Browse files Browse the repository at this point in the history
  • Loading branch information
marksalpeter committed Sep 5, 2022
1 parent 3ba0cef commit ae99d19
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 27 deletions.
44 changes: 21 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ If you have another common use case you would like to see covered by this packag

## Cookbook

* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenContainerIsKilled)
* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#PipelineShutsDownOnError)
* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#PipelineShutsDownWhenInputChannelIsClosed)
* [How to run a pipeline until the container is killed](https://github.com/deliveryhero/pipeline#pipelineshutsdownwhencontaineriskilled)
* [How to shut down a pipeline when there is a error](https://github.com/deliveryhero/pipeline#pipelineshutsdownonerror)
* [How to shut down a pipeline after it has finished processing a batch of data](https://github.com/deliveryhero/pipeline#pipelineshutsdownwheninputchannelisclosed)

## Functions

Expand Down Expand Up @@ -232,6 +232,7 @@ ProcessBatchConcurrently fans the in channel out to multiple batch Processors ru
then it fans the out channels of the batch Processors back into a single out chan

```golang

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -253,19 +254,17 @@ p = pipeline.ProcessBatchConcurrently(ctx, 2, 2, time.Minute, pipeline.NewProces
for result := range p {
fmt.Printf("result: %d\n", result)
}
```

Output:
// Example Output:
// result: 1
// result: 2
// result: 3
// result: 5
// error: could not process [7 8], context deadline exceeded
// error: could not process [4 6], context deadline exceeded
// error: could not process [9], context deadline exceeded

```
result: 1
result: 2
result: 3
result: 5
error: could not process [7 8], context deadline exceeded
error: could not process [4 6], context deadline exceeded
error: could not process [9], context deadline exceeded
```

### func [ProcessConcurrently](/process.go#L26)

Expand All @@ -275,6 +274,7 @@ ProcessConcurrently fans the in channel out to multiple Processors running concu
then it fans the out channels of the Processors back into a single out chan

```golang

// Create a context that times out after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -296,19 +296,17 @@ p = pipeline.ProcessConcurrently(ctx, 2, pipeline.NewProcessor(func(ctx context.
for result := range p {
log.Printf("result: %d\n", result)
}
```

Output:
// Example Output:
// result: 2
// result: 1
// result: 4
// result: 3
// error: could not process 6, process was canceled
// error: could not process 5, process was canceled
// error: could not process 7, context deadline exceeded

```
result: 2
result: 1
result: 4
result: 3
error: could not process 6, process was canceled
error: could not process 5, process was canceled
error: could not process 7, context deadline exceeded
```

### func [Split](/split.go#L4)

Expand Down
2 changes: 1 addition & 1 deletion cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCancel(t *testing.T) {
// Start canceling the pipeline about half way through the test
ctx, cancel := context.WithTimeout(context.Background(), testDuration/2)
defer cancel()
for i := range Cancel[int](ctx, canceled, in) {
for i := range Cancel(ctx, canceled, in) {
logf("%d", i)
}

Expand Down
2 changes: 1 addition & 1 deletion collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestCollect(t *testing.T) {
defer cancel()

// Collect responses
collect := Collect[int](ctx, test.args.maxSize, test.args.maxDuration, in)
collect := Collect(ctx, test.args.maxSize, test.args.maxDuration, in)
timeout := time.After(maxTestDuration)
var outs [][]int
var isOpen bool
Expand Down
2 changes: 1 addition & 1 deletion process_batch_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ExampleProcessBatchConcurrently() {
fmt.Printf("result: %d\n", result)
}

// Output:
// Example Output:
// result: 1
// result: 2
// result: 3
Expand Down
2 changes: 1 addition & 1 deletion process_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ExampleProcessConcurrently() {
log.Printf("result: %d\n", result)
}

// Output:
// Example Output:
// result: 2
// result: 1
// result: 4
Expand Down

0 comments on commit ae99d19

Please sign in to comment.