Skip to content
This repository has been archived by the owner on Sep 21, 2022. It is now read-only.

Commit

Permalink
improved "in use" error, drop vttopo
Browse files Browse the repository at this point in the history
Numbered pool now accepts a purpose when locking resources,
and reports it as part of the error if it's in use.
Also dropping vttopo from Makefile and .gitignore.
  • Loading branch information
sougou committed Nov 4, 2013
1 parent 444b32f commit 6627763
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 33 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go/cmd/vtctl/vtctl
go/cmd/vtctld/vtctld
go/cmd/vtocc/vtocc
go/cmd/vttablet/vttablet
go/cmd/vttopo/vttopo
go/cmd/zk/zk
go/cmd/zkclient2/zkclient2
go/cmd/zkctl/zkctl
Expand Down
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ build:
cd go/cmd/vtctld; go build
cd go/cmd/vtocc; go build
cd go/cmd/vttablet; go build
cd go/cmd/vttopo; go build
cd go/cmd/zk; go build
cd go/cmd/zkclient2; go build
cd go/cmd/zkctl; go build
Expand Down Expand Up @@ -64,7 +63,6 @@ clean:
cd go/cmd/vtctl; go clean
cd go/cmd/vtocc; go clean
cd go/cmd/vttablet; go clean
cd go/cmd/vttopo; go clean
cd go/cmd/zk; go clean
cd go/cmd/zkctl; go clean
cd go/cmd/zkocc; go clean
31 changes: 20 additions & 11 deletions go/pools/numbered.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package pools

import (
"errors"
"fmt"
"sync"
"time"
Expand All @@ -22,6 +21,7 @@ type Numbered struct {
type numberedWrapper struct {
val interface{}
inUse bool
purpose string
timeCreated time.Time
timeUsed time.Time
}
Expand All @@ -39,10 +39,14 @@ func (nu *Numbered) Register(id int64, val interface{}) error {
nu.mu.Lock()
defer nu.mu.Unlock()
if _, ok := nu.resources[id]; ok {
return errors.New("already present")
return fmt.Errorf("already present")
}
now := time.Now()
nu.resources[id] = &numberedWrapper{val, false, now, now}
nu.resources[id] = &numberedWrapper{
val: val,
timeCreated: now,
timeUsed: now,
}
return nil
}

Expand All @@ -57,19 +61,21 @@ func (nu *Numbered) Unregister(id int64) {
}
}

// Get locks the resource for use. If it cannot be found or
// is already in use, it returns an error.
func (nu *Numbered) Get(id int64) (val interface{}, err error) {
// Get locks the resource for use. It accepts a purpose as a string.
// If it cannot be found, it returns a "not found" error. If in use,
// it returns a "in use: purpose" error.
func (nu *Numbered) Get(id int64, purpose string) (val interface{}, err error) {
nu.mu.Lock()
defer nu.mu.Unlock()
nw, ok := nu.resources[id]
if !ok {
return nil, errors.New("not found")
return nil, fmt.Errorf("not found")
}
if nw.inUse {
return nil, errors.New("in use")
return nil, fmt.Errorf("in use: %s", nw.purpose)
}
nw.inUse = true
nw.purpose = purpose
return nw.val, nil
}

Expand All @@ -79,13 +85,14 @@ func (nu *Numbered) Put(id int64) {
defer nu.mu.Unlock()
if nw, ok := nu.resources[id]; ok {
nw.inUse = false
nw.purpose = ""
nw.timeUsed = time.Now()
}
}

// GetOutdated returns a list of resources that are older than age, and locks them.
// It does not return any resources that are already locked.
func (nu *Numbered) GetOutdated(age time.Duration) (vals []interface{}) {
func (nu *Numbered) GetOutdated(age time.Duration, purpose string) (vals []interface{}) {
nu.mu.Lock()
defer nu.mu.Unlock()
now := time.Now()
Expand All @@ -95,6 +102,7 @@ func (nu *Numbered) GetOutdated(age time.Duration) (vals []interface{}) {
}
if nw.timeCreated.Add(age).Sub(now) <= 0 {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
Expand All @@ -103,8 +111,8 @@ func (nu *Numbered) GetOutdated(age time.Duration) (vals []interface{}) {

// GetIdle returns a list of resurces that have been idle for longer
// than timeout, and locks them. It does not return any resources that
// are laready locked.
func (nu *Numbered) GetIdle(timeout time.Duration) (vals []interface{}) {
// are already locked.
func (nu *Numbered) GetIdle(timeout time.Duration, purpose string) (vals []interface{}) {
nu.mu.Lock()
defer nu.mu.Unlock()
now := time.Now()
Expand All @@ -114,6 +122,7 @@ func (nu *Numbered) GetIdle(timeout time.Duration) (vals []interface{}) {
}
if nw.timeUsed.Add(timeout).Sub(now) <= 0 {
nw.inUse = true
nw.purpose = purpose
vals = append(vals, nw.val)
}
}
Expand Down
18 changes: 12 additions & 6 deletions go/pools/numbered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ func TestNumbered(t *testing.T) {
t.Errorf("want 'already present', got '%v'", err)
}
var v interface{}
if v, err = p.Get(id); err != nil {
if v, err = p.Get(id, "test"); err != nil {
t.Errorf("Error %v", err)
}
if v.(int64) != id {
t.Errorf("want %v, got %v", id, v.(int64))
}
if v, err = p.Get(id); err.Error() != "in use" {
t.Errorf("want 'in use', got '%v'", err)
if v, err = p.Get(id, "test1"); err.Error() != "in use: test" {
t.Errorf("want 'in use: test', got '%v'", err)
}
p.Put(id)
if v, err = p.Get(1); err.Error() != "not found" {
if v, err = p.Get(1, "test2"); err.Error() != "not found" {
t.Errorf("want 'not found', got '%v'", err)
}
p.Unregister(1) // Should not fail
Expand All @@ -47,20 +47,26 @@ func TestNumbered(t *testing.T) {
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2 (0 & 1 are aged)
vals := p.GetOutdated(200 * time.Millisecond)
vals := p.GetOutdated(200*time.Millisecond, "by outdated")
if len(vals) != 2 {
t.Errorf("want 2, got %v", len(vals))
}
if v, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by outdated" {
t.Errorf("want 'in use: by outdated', got '%v'", err)
}
for _, v := range vals {
p.Put(v.(int64))
}
time.Sleep(100 * time.Millisecond)

// p has 0, 1, 2 (2 is idle)
vals = p.GetIdle(200 * time.Millisecond)
vals = p.GetIdle(200*time.Millisecond, "by idle")
if len(vals) != 1 {
t.Errorf("want 1, got %v", len(vals))
}
if v, err = p.Get(vals[0].(int64), "test1"); err.Error() != "in use: by idle" {
t.Errorf("want 'in use: by idle', got '%v'", err)
}
if vals[0].(int64) != 2 {
t.Errorf("want 2, got %v", vals[0])
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/tabletserver/active_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (ap *ActivePool) Close() {
}

func (ap *ActivePool) QueryKiller() {
for _, v := range ap.pool.GetOutdated(time.Duration(ap.Timeout())) {
for _, v := range ap.pool.GetOutdated(time.Duration(ap.Timeout()), "for abort") {
ap.kill(v.(int64))
}
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/tabletserver/active_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (axp *ActiveTxPool) Open() {

func (axp *ActiveTxPool) Close() {
axp.ticks.Stop()
for _, v := range axp.pool.GetOutdated(time.Duration(0)) {
for _, v := range axp.pool.GetOutdated(time.Duration(0), "for closing") {
conn := v.(*TxConnection)
conn.Close()
conn.discard(TX_CLOSE)
Expand All @@ -87,7 +87,7 @@ func (axp *ActiveTxPool) WaitForEmpty() {
}

func (axp *ActiveTxPool) TransactionKiller() {
for _, v := range axp.pool.GetOutdated(time.Duration(axp.Timeout())) {
for _, v := range axp.pool.GetOutdated(time.Duration(axp.Timeout()), "for rollback") {
conn := v.(*TxConnection)
log.Infof("killing transaction %d: %#v", conn.transactionId, conn.queries)
killStats.Add("Transactions", 1)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (axp *ActiveTxPool) Rollback(transactionId int64) {

// You must call Recycle on TxConnection once done.
func (axp *ActiveTxPool) Get(transactionId int64) (conn *TxConnection) {
v, err := axp.pool.Get(transactionId)
v, err := axp.pool.Get(transactionId, "for query")
if err != nil {
panic(NewTabletError(NOT_IN_TX, "Transaction %d: %v", transactionId, err))
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/tabletserver/reserved_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (rp *ReservedPool) Open(connFactory CreateConnectionFunc) {
}

func (rp *ReservedPool) Close() {
for _, v := range rp.pool.GetOutdated(time.Duration(0)) {
for _, v := range rp.pool.GetOutdated(time.Duration(0), "for close") {
conn := v.(*reservedConnection)
conn.Close()
rp.pool.Unregister(conn.connectionId)
Expand All @@ -55,7 +55,7 @@ func (rp *ReservedPool) CloseConnection(connectionId int64) {

// You must call Recycle on the PoolConnection once done.
func (rp *ReservedPool) Get(connectionId int64) PoolConnection {
v, err := rp.pool.Get(connectionId)
v, err := rp.pool.Get(connectionId, "for query")
if err != nil {
panic(NewTabletError(FAIL, "Error getting connection %d: %v", connectionId, err))
}
Expand Down
14 changes: 7 additions & 7 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (vtg *VTGate) GetSessionId(sessionParams *proto.SessionParams, session *pro

// ExecuteShard executes a non-streaming query on the specified shards.
func (vtg *VTGate) ExecuteShard(context *rpcproto.Context, query *proto.QueryShard, reply *mproto.QueryResult) error {
scatterConn, err := vtg.connections.Get(query.SessionId)
scatterConn, err := vtg.connections.Get(query.SessionId, "for query")
if err != nil {
return fmt.Errorf("query: %s, session %d: %v", query.Sql, query.SessionId, err)
}
Expand All @@ -72,7 +72,7 @@ func (vtg *VTGate) ExecuteShard(context *rpcproto.Context, query *proto.QuerySha

// ExecuteBatchShard executes a group of queries on the specified shards.
func (vtg *VTGate) ExecuteBatchShard(context *rpcproto.Context, batchQuery *proto.BatchQueryShard, reply *tproto.QueryResultList) error {
scatterConn, err := vtg.connections.Get(batchQuery.SessionId)
scatterConn, err := vtg.connections.Get(batchQuery.SessionId, "for batch query")
if err != nil {
return fmt.Errorf("query: %v, session %d: %v", batchQuery.Queries, batchQuery.SessionId, err)
}
Expand All @@ -88,7 +88,7 @@ func (vtg *VTGate) ExecuteBatchShard(context *rpcproto.Context, batchQuery *prot

// StreamExecuteShard executes a streaming query on the specified shards.
func (vtg *VTGate) StreamExecuteShard(context *rpcproto.Context, query *proto.QueryShard, sendReply func(interface{}) error) error {
scatterConn, err := vtg.connections.Get(query.SessionId)
scatterConn, err := vtg.connections.Get(query.SessionId, "for stream query")
if err != nil {
return fmt.Errorf("query: %s, session %d: %v", query.Sql, query.SessionId, err)
}
Expand All @@ -102,7 +102,7 @@ func (vtg *VTGate) StreamExecuteShard(context *rpcproto.Context, query *proto.Qu

// Begin begins a transaction. It has to be concluded by a Commit or Rollback.
func (vtg *VTGate) Begin(context *rpcproto.Context, session *proto.Session, noOutput *rpc.UnusedResponse) error {
scatterConn, err := vtg.connections.Get(session.SessionId)
scatterConn, err := vtg.connections.Get(session.SessionId, "for begin")
if err != nil {
return fmt.Errorf("session %d: %v", session.SessionId, err)
}
Expand All @@ -116,7 +116,7 @@ func (vtg *VTGate) Begin(context *rpcproto.Context, session *proto.Session, noOu

// Commit commits a transaction.
func (vtg *VTGate) Commit(context *rpcproto.Context, session *proto.Session, noOutput *rpc.UnusedResponse) error {
scatterConn, err := vtg.connections.Get(session.SessionId)
scatterConn, err := vtg.connections.Get(session.SessionId, "for commit")
if err != nil {
return fmt.Errorf("session %d: %v", session.SessionId, err)
}
Expand All @@ -130,7 +130,7 @@ func (vtg *VTGate) Commit(context *rpcproto.Context, session *proto.Session, noO

// Rollback rolls back a transaction.
func (vtg *VTGate) Rollback(context *rpcproto.Context, session *proto.Session, noOutput *rpc.UnusedResponse) error {
scatterConn, err := vtg.connections.Get(session.SessionId)
scatterConn, err := vtg.connections.Get(session.SessionId, "for rollback")
if err != nil {
return fmt.Errorf("session %d: %v", session.SessionId, err)
}
Expand All @@ -144,7 +144,7 @@ func (vtg *VTGate) Rollback(context *rpcproto.Context, session *proto.Session, n

// CloseSession closes the current session and releases all associated resources for the session.
func (vtg *VTGate) CloseSession(context *rpcproto.Context, session *proto.Session, noOutput *rpc.UnusedResponse) error {
scatterConn, err := vtg.connections.Get(session.SessionId)
scatterConn, err := vtg.connections.Get(session.SessionId, "for close")
if err != nil {
return nil
}
Expand Down

0 comments on commit 6627763

Please sign in to comment.