Skip to content

Commit

Permalink
Merge pull request #76 from vinted/receive_workers
Browse files Browse the repository at this point in the history
receive: implement worker architecture
  • Loading branch information
GiedriusS authored Jan 5, 2024
2 parents 4735da1 + 45f988c commit dc60d10
Showing 1 changed file with 60 additions and 7 deletions.
67 changes: 60 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
func newPeerGroup(dialOpts ...grpc.DialOption) peersContainer {
return &peerGroup{
dialOpts: dialOpts,
cache: map[string]*grpc.ClientConn{},
cache: map[string]*peerWorker{},
m: sync.RWMutex{},
dialer: grpc.DialContext,
}
Expand All @@ -1176,7 +1176,7 @@ type peersContainer interface {

type peerGroup struct {
dialOpts []grpc.DialOption
cache map[string]*grpc.ClientConn
cache map[string]*peerWorker
m sync.RWMutex

// dialer is used for testing.
Expand All @@ -1192,7 +1192,7 @@ func (p *peerGroup) close(addr string) error {
return nil
}

if err := c.Close(); err != nil {
if err := c.cc.Close(); err != nil {
return fmt.Errorf("closing connection for %s", addr)
}

Expand All @@ -1206,21 +1206,74 @@ func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStor
c, ok := p.cache[addr]
p.m.RUnlock()
if ok {
return storepb.NewWriteableStoreClient(c), nil
return c, nil
}

p.m.Lock()
defer p.m.Unlock()
// Make sure that another caller hasn't created the connection since obtaining the write lock.
c, ok = p.cache[addr]
if ok {
return storepb.NewWriteableStoreClient(c), nil
return c, nil
}
conn, err := p.dialer(ctx, addr, p.dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "failed to dial peer")
}

p.cache[addr] = conn
return storepb.NewWriteableStoreClient(conn), nil
p.cache[addr] = newPeerWorker(conn)
return p.cache[addr], nil
}

type peerWorker struct {
cc *grpc.ClientConn

work chan peerWorkItem
turnOffGoroutines func()
}

func newPeerWorker(cc *grpc.ClientConn) *peerWorker {
work := make(chan peerWorkItem)
ctx, cancel := context.WithCancel(context.Background())

for i := 0; i < 20; i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case w := <-work:
_, err := storepb.NewWriteableStoreClient(cc).RemoteWrite(w.ctx, w.req)
w.errResult <- err
}
}
}()
}

return &peerWorker{
cc: cc,
work: work,
turnOffGoroutines: cancel,
}
}

type peerWorkItem struct {
cc *grpc.ClientConn
req *storepb.WriteRequest
ctx context.Context

errResult chan error
}

func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
w := peerWorkItem{
cc: pw.cc,
req: in,
errResult: make(chan error, 1),
ctx: ctx,
}

pw.work <- w

return nil, <-w.errResult
}

0 comments on commit dc60d10

Please sign in to comment.