Skip to content

Commit

Permalink
import from plz
Browse files Browse the repository at this point in the history
  • Loading branch information
taowen committed Feb 28, 2018
1 parent db6ada8 commit e0a39a4
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 0 deletions.
7 changes: 7 additions & 0 deletions executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package concurrent

import "context"

type Executor interface {
Go(handler func(ctx context.Context))
}
13 changes: 13 additions & 0 deletions go_above_19.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//+build go1.9

package concurrent

import "sync"

type Map struct {
sync.Map
}

func NewMap() *Map {
return &Map{}
}
30 changes: 30 additions & 0 deletions go_below_19.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//+build !go1.9

package concurrent

import "sync"

type Map struct {
lock sync.RWMutex
data map[interface{}]interface{}
}

func NewMap() *Map {
return &Map{
data: make(map[interface{}]interface{}, 32),
}
}

func (m *Map) Load(key interface{}) (elem interface{}, found bool) {
m.lock.RLock()
elem, found = m.data[key]
m.lock.RUnlock()
return
}

func (m *Map) Store(key interface{}, elem interface{}) {
m.lock.Lock()
m.data[key] = elem
m.lock.Unlock()
}

99 changes: 99 additions & 0 deletions unbounded_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package concurrent

import (
"context"
"fmt"
"runtime"
"sync"
"time"
"runtime/debug"
)

var LogInfo = func(event string, properties ...interface{}) {
}

var LogPanic = func(recovered interface{}, properties ...interface{}) interface{} {
fmt.Println(fmt.Sprintf("paniced: %v", recovered))
debug.PrintStack()
return recovered
}

const StopSignal = "STOP!"

type UnboundedExecutor struct {
ctx context.Context
cancel context.CancelFunc
activeGoroutinesMutex *sync.Mutex
activeGoroutines map[string]int
}

// GlobalUnboundedExecutor has the life cycle of the program itself
// any goroutine want to be shutdown before main exit can be started from this executor
var GlobalUnboundedExecutor = NewUnboundedExecutor()

func NewUnboundedExecutor() *UnboundedExecutor {
ctx, cancel := context.WithCancel(context.TODO())
return &UnboundedExecutor{
ctx: ctx,
cancel: cancel,
activeGoroutinesMutex: &sync.Mutex{},
activeGoroutines: map[string]int{},
}
}

func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
_, file, line, _ := runtime.Caller(1)
executor.activeGoroutinesMutex.Lock()
defer executor.activeGoroutinesMutex.Unlock()
startFrom := fmt.Sprintf("%s:%d", file, line)
executor.activeGoroutines[startFrom] += 1
go func() {
defer func() {
recovered := recover()
if recovered != nil && recovered != StopSignal {
LogPanic(recovered)
}
executor.activeGoroutinesMutex.Lock()
defer executor.activeGoroutinesMutex.Unlock()
executor.activeGoroutines[startFrom] -= 1
}()
handler(executor.ctx)
}()
}

func (executor *UnboundedExecutor) Stop() {
executor.cancel()
}

func (executor *UnboundedExecutor) StopAndWaitForever() {
executor.StopAndWait(context.Background())
}

func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
executor.cancel()
for {
fiveSeconds := time.NewTimer(time.Millisecond * 100)
select {
case <-fiveSeconds.C:
case <-ctx.Done():
return
}
if executor.checkGoroutines() {
return
}
}
}

func (executor *UnboundedExecutor) checkGoroutines() bool {
executor.activeGoroutinesMutex.Lock()
defer executor.activeGoroutinesMutex.Unlock()
for startFrom, count := range executor.activeGoroutines {
if count > 0 {
LogInfo("event!unbounded_executor.still waiting goroutines to quit",
"startFrom", startFrom,
"count", count)
return false
}
}
return true
}

0 comments on commit e0a39a4

Please sign in to comment.