Skip to content

Commit

Permalink
util: gorotinue pool (#39872)
Browse files Browse the repository at this point in the history
close #38039
  • Loading branch information
hawkingrei authored Jan 4, 2023
1 parent 3ccff46 commit 3e65e9b
Show file tree
Hide file tree
Showing 14 changed files with 1,739 additions and 0 deletions.
8 changes: 8 additions & 0 deletions resourcemanager/pooltask/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "pooltask",
srcs = ["task.go"],
importpath = "github.com/pingcap/tidb/resourcemanager/pooltask",
visibility = ["//visibility:public"],
)
132 changes: 132 additions & 0 deletions resourcemanager/pooltask/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pooltask

import (
"sync"
)

// Context is a interface that can be used to create a context.
type Context[T any] interface {
GetContext() T
}

// NilContext is to create a nil as context
type NilContext struct{}

// GetContext is to get a nil as context
func (NilContext) GetContext() any {
return nil
}

// TaskBox is a box which contains all info about pooltask.
type TaskBox[T any, U any, C any, CT any, TF Context[CT]] struct {
constArgs C
contextFunc TF
wg *sync.WaitGroup
task chan Task[T]
resultCh chan U
taskID uint64
}

// NewTaskBox is to create a task box for pool.
func NewTaskBox[T any, U any, C any, CT any, TF Context[CT]](constArgs C, contextFunc TF, wg *sync.WaitGroup, taskCh chan Task[T], resultCh chan U, taskID uint64) TaskBox[T, U, C, CT, TF] {
return TaskBox[T, U, C, CT, TF]{
constArgs: constArgs,
contextFunc: contextFunc,
wg: wg,
task: taskCh,
resultCh: resultCh,
taskID: taskID,
}
}

// TaskID is to get the task id.
func (t TaskBox[T, U, C, CT, TF]) TaskID() uint64 {
return t.taskID
}

// ConstArgs is to get the const args.
func (t *TaskBox[T, U, C, CT, TF]) ConstArgs() C {
return t.constArgs
}

// GetTaskCh is to get the task channel.
func (t *TaskBox[T, U, C, CT, TF]) GetTaskCh() chan Task[T] {
return t.task
}

// GetResultCh is to get result channel
func (t *TaskBox[T, U, C, CT, TF]) GetResultCh() chan U {
return t.resultCh
}

// GetContextFunc is to get context func.
func (t *TaskBox[T, U, C, CT, TF]) GetContextFunc() TF {
return t.contextFunc
}

// Done is to set the pooltask status to complete.
func (t *TaskBox[T, U, C, CT, TF]) Done() {
t.wg.Done()
}

// Clone is to copy the box
func (t *TaskBox[T, U, C, CT, TF]) Clone() *TaskBox[T, U, C, CT, TF] {
newBox := NewTaskBox[T, U, C, CT, TF](t.constArgs, t.contextFunc, t.wg, t.task, t.resultCh, t.taskID)
return &newBox
}

// GPool is a goroutine pool.
type GPool[T any, U any, C any, CT any, TF Context[CT]] interface {
Tune(size int)
}

// TaskController is a controller that can control or watch the pool.
type TaskController[T any, U any, C any, CT any, TF Context[CT]] struct {
pool GPool[T, U, C, CT, TF]
close chan struct{}
wg *sync.WaitGroup
taskID uint64
resultCh chan U
}

// NewTaskController create a controller to deal with pooltask's status.
func NewTaskController[T any, U any, C any, CT any, TF Context[CT]](p GPool[T, U, C, CT, TF], taskID uint64, closeCh chan struct{}, wg *sync.WaitGroup, resultCh chan U) TaskController[T, U, C, CT, TF] {
return TaskController[T, U, C, CT, TF]{
pool: p,
taskID: taskID,
close: closeCh,
wg: wg,
resultCh: resultCh,
}
}

// Wait is to wait the pool task to stop.
func (t *TaskController[T, U, C, CT, TF]) Wait() {
<-t.close
t.wg.Wait()
close(t.resultCh)
}

// TaskID is to get the task id.
func (t *TaskController[T, U, C, CT, TF]) TaskID() uint64 {
return t.taskID
}

// Task is a task that can be executed.
type Task[T any] struct {
Task T
}
11 changes: 11 additions & 0 deletions util/gpool/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "gpool",
srcs = [
"gpool.go",
"spinlock.go",
],
importpath = "github.com/pingcap/tidb/util/gpool",
visibility = ["//visibility:public"],
)
69 changes: 69 additions & 0 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gpool

import (
"errors"
"sync/atomic"
"time"
)

const (
// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = 5 * time.Second

// OPENED represents that the pool is opened.
OPENED = iota

// CLOSED represents that the pool is closed.
CLOSED
)

var (
// ErrPoolClosed will be returned when submitting task to a closed pool.
ErrPoolClosed = errors.New("this pool has been closed")

// ErrPoolOverload will be returned when the pool is full and no workers available.
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")

// ErrProducerClosed will be returned when the producer is closed.
ErrProducerClosed = errors.New("this producer has been closed")
)

// BasePool is base class of pool
type BasePool struct {
name string
generator atomic.Uint64
}

// NewBasePool is to create a new BasePool.
func NewBasePool() BasePool {
return BasePool{}
}

// SetName is to set name.
func (p *BasePool) SetName(name string) {
p.name = name
}

// Name is to get name.
func (p *BasePool) Name() string {
return p.name
}

// NewTaskID is to get a new task ID.
func (p *BasePool) NewTaskID() uint64 {
return p.generator.Add(1)
}
47 changes: 47 additions & 0 deletions util/gpool/spinlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gpool

import (
"runtime"
"sync"
"sync/atomic"
)

type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
backoff := 1
for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
// Leverage the exponential backoff algorithm, see https://en.wikipedia.org/wiki/Exponential_backoff.
for i := 0; i < backoff; i++ {
runtime.Gosched()
}
if backoff < maxBackoff {
backoff <<= 1
}
}
}

func (sl *spinLock) Unlock() {
atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
return new(spinLock)
}
43 changes: 43 additions & 0 deletions util/gpool/spmc/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spmc",
srcs = [
"option.go",
"spmcpool.go",
"worker.go",
"worker_loop_queue.go",
],
importpath = "github.com/pingcap/tidb/util/gpool/spmc",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/pooltask",
"//util/gpool",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "spmc_test",
srcs = [
"main_test.go",
"spmcpool_benchmark_test.go",
"spmcpool_test.go",
"worker_loop_queue_test.go",
],
embed = [":spmc"],
race = "on",
deps = [
"//resourcemanager/pooltask",
"//testkit/testsetup",
"//util",
"//util/gpool",
"@com_github_stretchr_testify//require",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_goleak//:goleak",
],
)
27 changes: 27 additions & 0 deletions util/gpool/spmc/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spmc

import (
"testing"

"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
goleak.VerifyTestMain(m)
}
Loading

0 comments on commit 3e65e9b

Please sign in to comment.