Skip to content

Commit

Permalink
add new function g.Go (gogf#2943)
Browse files Browse the repository at this point in the history
  • Loading branch information
gqcn authored Sep 11, 2023
1 parent 1582714 commit ab1970e
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 128 deletions.
14 changes: 14 additions & 0 deletions frame/g/g_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ import (
"github.com/gogf/gf/v2/util/gutil"
)

type (
Func = gutil.Func // Func is the function which contains context parameter.
RecoverFunc = gutil.RecoverFunc // RecoverFunc is the panic recover function which contains context parameter.
)

// Go creates a new asynchronous goroutine function with specified recover function.
//
// The parameter `recoverFunc` is called when any panic during executing of `goroutineFunc`.
// If `recoverFunc` is given nil, it ignores the panic from `goroutineFunc` and no panic will
// throw to parent goroutine.
func Go(ctx context.Context, goroutineFunc Func, recoverFunc RecoverFunc) {
gutil.Go(ctx, goroutineFunc, recoverFunc)
}

// NewVar returns a gvar.Var.
func NewVar(i interface{}, safe ...bool) *Var {
return gvar.New(i, safe...)
Expand Down
18 changes: 18 additions & 0 deletions frame/g/g_z_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ package g_test
import (
"context"
"os"
"sync"
"testing"

"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/test/gtest"
"github.com/gogf/gf/v2/util/gutil"
Expand Down Expand Up @@ -114,3 +116,19 @@ func Test_Object(t *testing.T) {
t.AssertNE(g.Validator(), nil)
})
}

func Test_Go(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
var (
wg = sync.WaitGroup{}
array = garray.NewArray(true)
)
wg.Add(1)
g.Go(context.Background(), func(ctx context.Context) {
defer wg.Done()
array.Append(1)
}, nil)
wg.Wait()
t.Assert(array.Len(), 1)
})
}
155 changes: 29 additions & 126 deletions os/grpool/grpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (

"github.com/gogf/gf/v2/container/glist"
"github.com/gogf/gf/v2/container/gtype"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/gtimer"
"github.com/gogf/gf/v2/util/grand"
)
Expand All @@ -23,7 +21,7 @@ import (
type Func func(ctx context.Context)

// RecoverFunc is the pool runtime panic recover function which contains context parameter.
type RecoverFunc func(ctx context.Context, err error)
type RecoverFunc func(ctx context.Context, exception error)

// Pool manages the goroutines using pool.
type Pool struct {
Expand All @@ -33,161 +31,66 @@ type Pool struct {
closed *gtype.Bool // Is pool closed or not.
}

// localPoolItem is the job item storing in job list.
type localPoolItem struct {
Ctx context.Context
Func Func
Ctx context.Context // Context.
Func Func // Job function.
}

const (
minTimerDuration = 500 * time.Millisecond
maxTimerDuration = 1500 * time.Millisecond
minSupervisorTimerDuration = 500 * time.Millisecond
maxSupervisorTimerDuration = 1500 * time.Millisecond
)

// Default goroutine pool.
var (
pool = New()
defaultPool = New()
)

// New creates and returns a new goroutine pool object.
// The parameter `limit` is used to limit the max goroutine count,
// which is not limited in default.
func New(limit ...int) *Pool {
p := &Pool{
limit: -1,
count: gtype.NewInt(),
list: glist.New(true),
closed: gtype.NewBool(),
}
var (
pool = &Pool{
limit: -1,
count: gtype.NewInt(),
list: glist.New(true),
closed: gtype.NewBool(),
}
timerDuration = grand.D(
minSupervisorTimerDuration,
maxSupervisorTimerDuration,
)
)
if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0]
pool.limit = limit[0]
}
timerDuration := grand.D(minTimerDuration, maxTimerDuration)
gtimer.Add(context.Background(), timerDuration, p.supervisor)
return p
gtimer.Add(context.Background(), timerDuration, pool.supervisor)
return pool
}

// Add pushes a new job to the pool using default goroutine pool.
// Add pushes a new job to the default goroutine pool.
// The job will be executed asynchronously.
func Add(ctx context.Context, f Func) error {
return pool.Add(ctx, f)
return defaultPool.Add(ctx, f)
}

// AddWithRecover pushes a new job to the pool with specified recover function.
// AddWithRecover pushes a new job to the default pool with specified recover function.
//
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error {
return pool.AddWithRecover(ctx, userFunc, recoverFunc)
return defaultPool.AddWithRecover(ctx, userFunc, recoverFunc)
}

// Size returns current goroutine count of default goroutine pool.
func Size() int {
return pool.Size()
return defaultPool.Size()
}

// Jobs returns current job count of default goroutine pool.
func Jobs() int {
return pool.Jobs()
}

// Add pushes a new job to the pool.
// The job will be executed asynchronously.
func (p *Pool) Add(ctx context.Context, f Func) error {
for p.closed.Val() {
return gerror.NewCode(
gcode.CodeInvalidOperation,
"goroutine pool is already closed",
)
}
p.list.PushFront(&localPoolItem{
Ctx: ctx,
Func: f,
})
// Check and fork new worker.
p.checkAndFork()
return nil
}

// checkAndFork checks and creates a new goroutine worker.
// Note that the worker dies if the job function panics and the job has no recover handling.
func (p *Pool) checkAndFork() {
// Check whether fork new goroutine or not.
var n int
for {
n = p.count.Val()
if p.limit != -1 && n >= p.limit {
// No need fork new goroutine.
return
}
if p.count.Cas(n, n+1) {
// Use CAS to guarantee atomicity.
break
}
}
// Create job function in goroutine.
go func() {
defer p.count.Add(-1)

var (
listItem interface{}
poolItem *localPoolItem
)
for !p.closed.Val() {
listItem = p.list.PopBack()
if listItem == nil {
return
}
poolItem = listItem.(*localPoolItem)
poolItem.Func(poolItem.Ctx)
}
}()
}

// AddWithRecover pushes a new job to the pool with specified recover function.
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error {
return p.Add(ctx, func(ctx context.Context) {
defer func() {
if exception := recover(); exception != nil {
if recoverFunc != nil {
if v, ok := exception.(error); ok && gerror.HasStack(v) {
recoverFunc(ctx, v)
} else {
recoverFunc(ctx, gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception))
}
}
}
}()
userFunc(ctx)
})
}

// Cap returns the capacity of the pool.
// This capacity is defined when pool is created.
// It returns -1 if there's no limit.
func (p *Pool) Cap() int {
return p.limit
}

// Size returns current goroutine count of the pool.
func (p *Pool) Size() int {
return p.count.Val()
}

// Jobs returns current job count of the pool.
// Note that, it does not return worker/goroutine count but the job/task count.
func (p *Pool) Jobs() int {
return p.list.Size()
}

// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
return p.closed.Val()
}

// Close closes the goroutine pool, which makes all goroutines exit.
func (p *Pool) Close() {
p.closed.Set(true)
return defaultPool.Jobs()
}
119 changes: 119 additions & 0 deletions os/grpool/grpool_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

package grpool

import (
"context"

"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
)

// Add pushes a new job to the pool.
// The job will be executed asynchronously.
func (p *Pool) Add(ctx context.Context, f Func) error {
for p.closed.Val() {
return gerror.NewCode(
gcode.CodeInvalidOperation,
"goroutine defaultPool is already closed",
)
}
p.list.PushFront(&localPoolItem{
Ctx: ctx,
Func: f,
})
// Check and fork new worker.
p.checkAndForkNewGoroutineWorker()
return nil
}

// AddWithRecover pushes a new job to the pool with specified recover function.
//
// The optional `recoverFunc` is called when any panic during executing of `userFunc`.
// If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`.
// The job will be executed asynchronously.
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc RecoverFunc) error {
return p.Add(ctx, func(ctx context.Context) {
defer func() {
if exception := recover(); exception != nil {
if recoverFunc != nil {
if v, ok := exception.(error); ok && gerror.HasStack(v) {
recoverFunc(ctx, v)
} else {
recoverFunc(ctx, gerror.NewCodef(gcode.CodeInternalPanic, "%+v", exception))
}
}
}
}()
userFunc(ctx)
})
}

// Cap returns the capacity of the pool.
// This capacity is defined when pool is created.
// It returns -1 if there's no limit.
func (p *Pool) Cap() int {
return p.limit
}

// Size returns current goroutine count of the pool.
func (p *Pool) Size() int {
return p.count.Val()
}

// Jobs returns current job count of the pool.
// Note that, it does not return worker/goroutine count but the job/task count.
func (p *Pool) Jobs() int {
return p.list.Size()
}

// IsClosed returns if pool is closed.
func (p *Pool) IsClosed() bool {
return p.closed.Val()
}

// Close closes the goroutine pool, which makes all goroutines exit.
func (p *Pool) Close() {
p.closed.Set(true)
}

// checkAndForkNewGoroutineWorker checks and creates a new goroutine worker.
// Note that the worker dies if the job function panics and the job has no recover handling.
func (p *Pool) checkAndForkNewGoroutineWorker() {
// Check whether fork new goroutine or not.
var n int
for {
n = p.count.Val()
if p.limit != -1 && n >= p.limit {
// No need fork new goroutine.
return
}
if p.count.Cas(n, n+1) {
// Use CAS to guarantee atomicity.
break
}
}

// Create job function in goroutine.
go func() {
defer p.count.Add(-1)

var (
listItem interface{}
poolItem *localPoolItem
)
// Harding working, one by one, job never empty, worker never die.
for !p.closed.Val() {
listItem = p.list.PopBack()
if listItem == nil {
return
}
poolItem = listItem.(*localPoolItem)
poolItem.Func(poolItem.Ctx)
}
}()
}
4 changes: 2 additions & 2 deletions os/grpool/grpool_supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

// supervisor checks the job list and fork new worker goroutine to handle the job
// if there are jobs but no workers in pool.
func (p *Pool) supervisor(ctx context.Context) {
func (p *Pool) supervisor(_ context.Context) {
if p.IsClosed() {
gtimer.Exit()
}
Expand All @@ -24,7 +24,7 @@ func (p *Pool) supervisor(ctx context.Context) {
number = p.limit
}
for i := 0; i < number; i++ {
p.checkAndFork()
p.checkAndForkNewGoroutineWorker()
}
}
}
Loading

0 comments on commit ab1970e

Please sign in to comment.