Errgroup int (#84)
* integrate errgroup

errgroup doesn't change at all, and so I'm removing it as an external dependency.  This makes dnadesign more self-contained, which is a goal.
Koeng101 authored Aug 13, 2024
1 parent 0cc4f7c commit 446e0ea
Showing 15 changed files with 749 additions and 15 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -41,6 +41,7 @@ On the highest level:
* [external](https://pkg.go.dev/github.com/koeng101/dnadesign/external) contains integrations with external bioinformatics software, usually operating on the command line.
* [external/minimap2](https://pkg.go.dev/github.com/koeng101/dnadesign/external/minimap2) contains a function for working with [minimap2](https://github.com/lh3/minimap2) with Go.
* [external/samtools](https://pkg.go.dev/github.com/koeng101/dnadesign/external/samtools) contains a function for generating pileup files using [samtools](https://github.com/samtools/samtools) with Go.
* [external/bcftools](https://pkg.go.dev/github.com/koeng101/dnadesign/external/bcftools) contains GenerateVCF to generate a VCF file from sam alignments using [bcftools](https://samtools.github.io/bcftools/) with Go.

## Python

@@ -60,6 +61,7 @@ There are a few pieces of "complete" software that we have directly integrated i
- [svb](https://github.com/rleiwang/svb) in `lib/bio/slow5/svb`
- [intel-cpuid](https://github.com/aregm/cpuid) in `lib/bio/slow5/svb/intel-cpuid`
- [wordwrap](https://github.com/mitchellh/go-wordwrap) in `lib/bio/genbank`
- [errgroup](https://cs.opensource.google/go/x/sync/+/master:errgroup/) in `lib/bio/errgroup`

## Other

@@ -73,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Integrated errgroup into source tree [#84](https://github.com/Koeng101/dnadesign/pull/84)
- Added kmer detection for ligation events in cloning and removed enzyme manager [#83](https://github.com/Koeng101/dnadesign/pull/83)
- Added option for linear ligations [#82](https://github.com/Koeng101/dnadesign/pull/82)
- Added minimal python packaging [#81](https://github.com/Koeng101/dnadesign/pull/81)
2 changes: 1 addition & 1 deletion lib/bio/bio.go
@@ -16,14 +16,14 @@ import (
 	"io"
 	"math"
 
+	"github.com/koeng101/dnadesign/lib/bio/errgroup"
 	"github.com/koeng101/dnadesign/lib/bio/fasta"
 	"github.com/koeng101/dnadesign/lib/bio/fastq"
 	"github.com/koeng101/dnadesign/lib/bio/genbank"
 	"github.com/koeng101/dnadesign/lib/bio/pileup"
 	"github.com/koeng101/dnadesign/lib/bio/sam"
 	"github.com/koeng101/dnadesign/lib/bio/slow5"
 	"github.com/koeng101/dnadesign/lib/bio/uniprot"
-	"golang.org/x/sync/errgroup"
 )

// Format is a enum of different parser formats.
28 changes: 28 additions & 0 deletions lib/bio/errgroup/LICENSE
@@ -0,0 +1,28 @@
Copyright 2009 The Go Authors.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google LLC nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

3 changes: 3 additions & 0 deletions lib/bio/errgroup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This is errgroup from `https://cs.opensource.google/go/x/sync`. It is integrated into the source tree because it rarely changes and is one of the few external dependencies we use, especially for filtering `bio` parser results.

I modified the code to pass the Go linter requirements.
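
For readers unfamiliar with the package, the core pattern can be sketched with the standard library alone. This is a simplified illustration, not the vendored implementation: `miniGroup` and `filterRecords` are hypothetical names introduced here, and the sketch omits context cancellation and concurrency limits.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// miniGroup is a hypothetical, stripped-down stand-in for errgroup.Group:
// it waits for all goroutines and remembers only the first non-nil error.
type miniGroup struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// Go runs f in a new goroutine and records the first error returned.
func (g *miniGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

// Wait blocks until every function passed to Go has returned.
func (g *miniGroup) Wait() error {
	g.wg.Wait()
	return g.err
}

// filterRecords is a hypothetical example task: it checks each record in
// its own goroutine and fails if any record is empty.
func filterRecords(records []string) error {
	var g miniGroup
	for _, r := range records {
		r := r // capture the per-iteration value (needed before Go 1.22)
		g.Go(func() error {
			if r == "" {
				return errors.New("empty record")
			}
			return nil
		})
	}
	return g.Wait()
}

func main() {
	fmt.Println(filterRecords([]string{"ATGC", "GGCC"}))
	fmt.Println(filterRecords([]string{"ATGC", ""}))
}
```

The `sync.Once` ensures that only the first failure is kept, which matches the behavior described for `Wait` in the vendored source.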
135 changes: 135 additions & 0 deletions lib/bio/errgroup/errgroup.go
@@ -0,0 +1,135 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
//
// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
// returning errors.
package errgroup

import (
	"context"
	"fmt"
	"sync"
)

type token struct{}

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
	cancel func(error)

	wg sync.WaitGroup

	sem chan token

	errOnce sync.Once
	err     error
}

func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := withCancelCause(ctx)
	return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel(g.err)
	}
	return g.err
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
}

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}:
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel(g.err)
				}
			})
		}
	}()
	return true
}

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}
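
The limit set by `SetLimit` relies on a buffered channel used as a counting semaphore: `Go` sends a token before starting a goroutine and `done` receives one when it finishes. The sketch below isolates that mechanism using only the standard library. `maxConcurrent` and its bookkeeping variables are hypothetical names for illustration and are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

type token struct{}

// maxConcurrent runs `tasks` goroutines with at most `limit` active at once
// and returns the highest number observed running at the same time.
// It assumes limit >= 1; with a capacity of 0 the send below would block forever.
func maxConcurrent(limit, tasks int) int {
	sem := make(chan token, limit)
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		active int
		peak   int
	)
	for i := 0; i < tasks; i++ {
		sem <- token{} // blocks while `limit` goroutines hold a token
		wg.Add(1)
		go func() {
			defer func() {
				<-sem // release the token, as Group.done does
				wg.Done()
			}()

			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()

			// Real work would happen here.

			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := maxConcurrent(3, 50)
	fmt.Println(peak <= 3) // the peak can never exceed the limit
}
```

Because a token is acquired before each goroutine starts and released only after its work is counted as finished, the number of active goroutines is bounded by the channel's capacity.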
100 changes: 100 additions & 0 deletions lib/bio/errgroup/errgroup_example_md5all_test.go
@@ -0,0 +1,100 @@
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package errgroup_test

import (
	"context"
	"crypto/md5"
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/koeng101/dnadesign/lib/bio/errgroup"
)

// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func ExampleGroup_pipeline() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := os.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		_ = g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}