Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utility functions similar to those found in stdlib io package #549

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions storage/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ type PeekableStorage interface {
Peek(ctx context.Context, key string) ([]byte, io.Closer, error)
}

// FinalizableStorage is a future-detection interface in which a storage implementation can be finalized,
// indicating any addition reads, writes, or function calls will fale. FinalizableStorage is analogous
// IPLD concept to io.Closer in the stdlib.
type FinalizableStorage interface {
Finalize() error
}

// the following are all hypothetical additional future interfaces (in varying degress of speculativeness):

// FUTURE: an EnumerableStorage API, that lets you list all keys present?
Expand Down
236 changes: 236 additions & 0 deletions storage/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package storage

import (
"context"
"errors"
"fmt"
"io"
"sync"
)

type storageUnit struct {
key string
data []byte
}

// onceError is an object that will only store an error once.
type onceError struct {
sync.Mutex // guards following
err error
}

func (a *onceError) Store(err error) {
a.Lock()
defer a.Unlock()
if a.err != nil {
return
}
a.err = err

Check warning on line 28 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L22-L28

Added lines #L22 - L28 were not covered by tests
}
func (a *onceError) Load() error {
a.Lock()
defer a.Unlock()
return a.err

Check warning on line 33 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L30-L33

Added lines #L30 - L33 were not covered by tests
}

// ErrMismatchedKey is the error used when read and write keys don't match
var ErrMismatchedKey = errors.New("put/get keys do not match")

// A pipeStorage is the shared pipeStorage structure underlying ReadablePipeStorage and WritablePipeStorage.
type pipeStorage struct {
wrMu sync.Mutex // Serializes Write operations
wrCh chan storageUnit
rdCh chan struct{}

once sync.Once // Protects closing done
done chan struct{}
rerr onceError
werr onceError
keysLk sync.RWMutex
keys map[string]struct{}
}

func (p *pipeStorage) has(ctx context.Context, key string) (bool, error) {
select {
case <-p.done:
return false, p.readableFinalizeError()
default:

Check warning on line 57 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L53-L57

Added lines #L53 - L57 were not covered by tests
}
p.keysLk.RLock()
defer p.keysLk.RUnlock()
_, ok := p.keys[key]
return ok, nil

Check warning on line 62 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}

func (p *pipeStorage) get(ctx context.Context, key string) ([]byte, error) {
select {
case <-p.done:
return nil, p.readableFinalizeError()
default:

Check warning on line 69 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L65-L69

Added lines #L65 - L69 were not covered by tests
}

select {
case su := <-p.wrCh:
p.rdCh <- struct{}{}
if su.key != key {
return nil, fmt.Errorf("%w: put %s, got %s", ErrMismatchedKey, su.key, key)
}
return su.data, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-p.done:
return nil, p.readableFinalizeError()

Check warning on line 82 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L72-L82

Added lines #L72 - L82 were not covered by tests
}
}

func (p *pipeStorage) finalizeReadable(err error) error {
if err == nil {
err = io.ErrClosedPipe
}
p.rerr.Store(err)
p.once.Do(func() { close(p.done) })
return nil

Check warning on line 92 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L86-L92

Added lines #L86 - L92 were not covered by tests
}

func (p *pipeStorage) put(ctx context.Context, key string, b []byte) error {
select {
case <-p.done:
return p.writableFinalizeError()
default:
p.wrMu.Lock()
defer p.wrMu.Unlock()

Check warning on line 101 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L95-L101

Added lines #L95 - L101 were not covered by tests
}

select {
case p.wrCh <- storageUnit{key, b}:
p.keysLk.Lock()
p.keys[key] = struct{}{}
p.keysLk.Unlock()
<-p.rdCh
return nil
case <-ctx.Done():
return ctx.Err()
case <-p.done:
return p.writableFinalizeError()

Check warning on line 114 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L104-L114

Added lines #L104 - L114 were not covered by tests
}
}

func (p *pipeStorage) finalizeWritable(err error) error {
if err == nil {
err = io.EOF
}
p.werr.Store(err)
p.once.Do(func() { close(p.done) })
return nil

Check warning on line 124 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L118-L124

Added lines #L118 - L124 were not covered by tests
}

// readableFinalizeError is considered internal to the pipe type.
func (p *pipeStorage) readableFinalizeError() error {
rerr := p.rerr.Load()
if werr := p.werr.Load(); rerr == nil && werr != nil {
return werr
}
return io.ErrClosedPipe

Check warning on line 133 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L128-L133

Added lines #L128 - L133 were not covered by tests
}

// writableFinalizeError is considered internal to the pipe type.
func (p *pipeStorage) writableFinalizeError() error {
werr := p.werr.Load()
if rerr := p.rerr.Load(); werr == nil && rerr != nil {
return rerr
}
return io.ErrClosedPipe

Check warning on line 142 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L137-L142

Added lines #L137 - L142 were not covered by tests
}

// A ReadablePipeStorage is the read half of a pipe.
type ReadablePipeStorage struct {
p *pipeStorage
}

// Has implements the Storage interface
func (r *ReadablePipeStorage) Has(ctx context.Context, key string) (bool, error) {
return r.p.has(ctx, key)

Check warning on line 152 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L151-L152

Added lines #L151 - L152 were not covered by tests
}

// Get implements the ReadableStorage interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *ReadablePipeStorage) Get(ctx context.Context, key string) ([]byte, error) {
return r.p.get(ctx, key)

Check warning on line 161 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}

// Finalize closes the reader; subsequent writes to the
// write half of the pipe will return the error ErrClosedPipe.
func (r *ReadablePipeStorage) Finalize() error {
return r.FinalizeWithError(nil)

Check warning on line 167 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L166-L167

Added lines #L166 - L167 were not covered by tests
}

// FinalizeWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
//
// FinalizeWithError never overwrites the previous error if it exists
// and always returns nil.
func (r *ReadablePipeStorage) FinalizeWithError(err error) error {
return r.p.finalizeReadable(err)

Check warning on line 176 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L175-L176

Added lines #L175 - L176 were not covered by tests
}

// A WritablePipeStorage is the write half of a pipe.
type WritablePipeStorage struct {
p *pipeStorage
}

// Has implements the Storage interface
func (w *WritablePipeStorage) Has(ctx context.Context, key string) (bool, error) {
return w.p.has(ctx, key)

Check warning on line 186 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L185-L186

Added lines #L185 - L186 were not covered by tests
}

// Put implements the standard Write interface:
// it writes data to the pipe, blocking until one or more readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is ErrClosedPipe.
func (w *WritablePipeStorage) Put(ctx context.Context, key string, data []byte) error {
return w.p.put(ctx, key, data)

Check warning on line 195 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L194-L195

Added lines #L194 - L195 were not covered by tests
}

// Finalize closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and EOF.
func (w *WritablePipeStorage) Finalize() error {
return w.FinalizeWithError(nil)

Check warning on line 201 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L200-L201

Added lines #L200 - L201 were not covered by tests
}

// FinalizeWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err,
// or EOF if err is nil.
//
// FinalizeWithError never overwrites the previous error if it exists
// and always returns nil.
func (w *WritablePipeStorage) FinalizeWithError(err error) error {
return w.p.finalizeWritable(err)

Check warning on line 211 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L210-L211

Added lines #L210 - L211 were not covered by tests
}

// PipeStorage creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
//
// Reads and Writes on the pipe are matched one to one
// except when multiple Reads are needed to consume a single Write.
// That is, each Write to the PipeWriter blocks until it has satisfied
// one or more Reads from the PipeReader that fully consume
// the written data.
// The data is copied directly from the Write to the corresponding
// Read (or Reads); there is no internal buffering.
//
// It is safe to call Read and Write in parallel with each other or with Close.
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func PipeStorage() (*ReadablePipeStorage, *WritablePipeStorage) {
p := &pipeStorage{
wrCh: make(chan storageUnit),
rdCh: make(chan struct{}),
done: make(chan struct{}),
}
return &ReadablePipeStorage{p}, &WritablePipeStorage{p}

Check warning on line 235 in storage/pipe.go

View check run for this annotation

Codecov / codecov/patch

storage/pipe.go#L229-L235

Added lines #L229 - L235 were not covered by tests
}
63 changes: 63 additions & 0 deletions storage/teestorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package storage

import (
"context"
"io"
)

type teeStorage struct {
ReadableStorage
out WritableStorage
}

type teeReadCloser struct {
io.Reader
readCloser io.Closer
key string
writeClose func(string) error
}

func (trc teeReadCloser) Close() error {
err := trc.readCloser.Close()
if err != nil {
return err
}
err = trc.writeClose(trc.key)
if err != nil {
return err
}
return nil

Check warning on line 29 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L20-L29

Added lines #L20 - L29 were not covered by tests
}

func (ts teeStorage) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
rdr, err := GetStream(ctx, ts.ReadableStorage, key)
if err != nil {
return nil, err
}
writer, committer, err := PutStream(ctx, ts.out)
if err != nil {
return nil, err
}
return teeReadCloser{
Reader: io.TeeReader(rdr, writer),
readCloser: rdr,
writeClose: committer,
key: key,
}, nil

Check warning on line 46 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L32-L46

Added lines #L32 - L46 were not covered by tests
}

func (ts teeStorage) Get(ctx context.Context, key string) ([]byte, error) {
data, err := ts.ReadableStorage.Get(ctx, key)
if err != nil {
return nil, err
}
err = ts.out.Put(ctx, key, data)
return data, err

Check warning on line 55 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L49-L55

Added lines #L49 - L55 were not covered by tests
}

func TeeStorage(in ReadableStorage, out WritableStorage) ReadableStorage {
return teeStorage{
ReadableStorage: in,
out: out,
}

Check warning on line 62 in storage/teestorage.go

View check run for this annotation

Codecov / codecov/patch

storage/teestorage.go#L58-L62

Added lines #L58 - L62 were not covered by tests
}
Loading