Skip to content

feature: query shard on multi table and multi db #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ var (
}
executors[executorConf.Name] = executor
}
if executorConf.Mode == config.SHD {
executor, err := executor.NewShardingExecutor(executorConf)
if err != nil {
panic(err)
}
executors[executorConf.Name] = executor
}
}

if conf.DistributedTransaction != nil {
Expand Down
54 changes: 54 additions & 0 deletions docker/conf/config_shd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
listeners:
- protocol_type: mysql
socket_address:
address: 0.0.0.0
port: 13306
config:
users:
dksl: "123456"
server_version: "8.0.27"
executor: redirect

executors:
- name: redirect
mode: shd
config:
db_groups:
- name: drug_0
load_balance_algorithm: RandomWeight
data_sources:
- name: drug_0
weight: r10w10
- name: drug_1
load_balance_algorithm: RandomWeight
data_sources:
- name: drug_1
weight: r10w10
logic_tables:
- db_name: drug
table_name: drug_resource
allow_full_scan: true
sharding_rule:
column: id
sharding_algorithm: NumberMod
topology:
"0": 0-4
"1": 5-9

data_source_cluster:
- name: drug_0
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql1:3306)/drug_0?timeout=10s&readTimeout=10s&writeTimeout=10s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
filters:
- MysqlDistributedTransaction
- name: drug_1
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql2:3306)/drug_1?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3
4 changes: 2 additions & 2 deletions docker/docker-compose-rws.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
environment:
MYSQL_ROOT_PASSWORD: "123456"
volumes:
- ./scripts/:/docker-entrypoint-initdb.d/:rw
- ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql:rw
command: ['mysqld', '--character-set-server=utf8mb4', '--collation-server=utf8mb4_unicode_ci']
mysql2:
image: mysql:8.0
Expand All @@ -39,7 +39,7 @@ services:
environment:
MYSQL_ROOT_PASSWORD: "123456"
volumes:
- ./scripts/:/docker-entrypoint-initdb.d/:rw
- ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql:rw
command: [ 'mysqld', '--character-set-server=utf8mb4', '--collation-server=utf8mb4_unicode_ci' ]
dbpack:
image: dbpack:latest
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-sdb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
environment:
MYSQL_ROOT_PASSWORD: "123456"
volumes:
- ./scripts/:/docker-entrypoint-initdb.d/:rw
- ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql:rw
command: ['mysqld', '--character-set-server=utf8mb4', '--collation-server=utf8mb4_unicode_ci']
dbpack:
image: dbpack:latest
Expand Down
42 changes: 42 additions & 0 deletions docker/docker-compose-shd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: "2.3"
services:
mysql1:
image: mysql:8.0
container_name: dbpack-mysql1
networks:
- local
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: "123456"
volumes:
- ./scripts/drug_0.sql:/docker-entrypoint-initdb.d/drug_0.sql:rw
command: ['mysqld', '--character-set-server=utf8mb4', '--collation-server=utf8mb4_unicode_ci']
mysql2:
image: mysql:8.0
container_name: dbpack-mysql2
networks:
- local
ports:
- "3307:3306"
environment:
MYSQL_ROOT_PASSWORD: "123456"
volumes:
- ./scripts/drug_1.sql:/docker-entrypoint-initdb.d/drug_1.sql:rw
command: [ 'mysqld', '--character-set-server=utf8mb4', '--collation-server=utf8mb4_unicode_ci' ]
dbpack:
image: dbpack:latest
container_name: dbpack
command: sh -c "sleep 60 && ./dbpack start -c config.yaml"
networks:
- local
ports:
- "13306:13306"
volumes:
- ./conf/config_shd.yaml:/app/config.yaml
depends_on:
- mysql1
- mysql2
networks:
local:
name: dbpack
3,583 changes: 3,583 additions & 0 deletions docker/scripts/drug_0.sql

Large diffs are not rendered by default.

3,581 changes: 3,581 additions & 0 deletions docker/scripts/drug_1.sql

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ unit-test: ## run unit test
go test ./pkg/... -coverprofile=coverage.txt -covermode=atomic

########################################################
build: ## build dbpack cli, and put in dist dir
build-local: ## build dbpack cli, and put in dist dir
@mkdir -p dist
${GO_BUILD_ENVVARS} go build -o ./dist/dbpack ./cmd

########################################################
build: ## build dbpack cli, and put in dist dir
@mkdir -p dist
GOOS="linux" GOARCH="amd64" CGO_ENABLED=0 go build -o ./dist/dbpack ./cmd

########################################################
docker-build: build ## build docker image
docker build -f docker/Dockerfile -t dbpack:latest .
Expand All @@ -76,6 +81,7 @@ docker-build: build ## build docker image
integration-test: build docker-build
sh test/cmd/test_single_db.sh
sh test/cmd/test_read_write_splitting.sh
sh test/cmd/test_sharding.sh

########################################################
clean: ## clean temporary build dir
Expand Down
37 changes: 37 additions & 0 deletions pkg/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"time"

"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/lb"
)

type (
Expand All @@ -44,6 +46,41 @@ type (
PingTimesForChangeStatus int `yaml:"ping_times_for_change_status" json:"ping_times_for_change_status"`
Filters []string `yaml:"filters" json:"filters"`
}

DataSourceRef struct {
Name string `yaml:"name" json:"name"`
Weight string `yaml:"weight,omitempty" json:"weight,omitempty"`
}

ReadWriteSplittingConfig struct {
LoadBalanceAlgorithm lb.LoadBalanceAlgorithm `yaml:"load_balance_algorithm" json:"load_balance_algorithm"`
DataSources []*DataSourceRef `yaml:"data_sources" json:"data_sources"`
}

DataSourceRefGroup struct {
Name string `yaml:"name" json:"name"`
LBAlgorithm lb.LoadBalanceAlgorithm `yaml:"load_balance_algorithm" json:"load_balance_algorithm"`
DataSources []*DataSourceRef `yaml:"data_sources" json:"data_sources"`
}

ShardingRule struct {
Column string `yaml:"column" json:"column"`
ShardingAlgorithm string `yaml:"sharding_algorithm" json:"sharding_algorithm"`
Config Parameters `yaml:"config,omitempty" json:"config,omitempty"`
}

LogicTable struct {
DBName string `yaml:"db_name" json:"db_name"`
TableName string `yaml:"table_name" json:"table_name"`
AllowFullScan bool `yaml:"allow_full_scan" json:"allow_full_scan"`
ShardingRule *ShardingRule `yaml:"sharding_rule" json:"sharding_rule"`
Topology map[int]string `yaml:"topology" json:"topology"`
}

ShardingConfig struct {
DBGroups []*DataSourceRefGroup `yaml:"db_groups" json:"db_groups"`
LogicTables []*LogicTable `yaml:"logic_tables" json:"logic_tables"`
}
)

const (
Expand Down
65 changes: 17 additions & 48 deletions pkg/executor/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,55 +23,23 @@ import (

"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/lb"
"github.com/cectc/dbpack/pkg/config"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
)

type (
DataSourceRef struct {
Name string `yaml:"name" json:"name"`
Weight string `yaml:"weight,omitempty" json:"weight,omitempty"`
}

DataSourceBrief struct {
Name string `yaml:"name" json:"name"`
WriteWeight int `yaml:"write_weight" json:"write_weight"`
ReadWeight int `yaml:"read_weight" json:"read_weight"`
IsMaster bool `yaml:"is_master" json:"is_master"`
DB proto.DB
}

ReadWriteSplittingConfig struct {
LoadBalanceAlgorithm lb.LoadBalanceAlgorithm `yaml:"load_balance_algorithm" json:"load_balance_algorithm"`
DataSources []*DataSourceRef `yaml:"data_sources" json:"data_sources"`
}

DataSourceRefGroup struct {
Name string `yaml:"name" json:"name"`
Group []*DataSourceRef `yaml:"group" json:"group"`
}

ShardingRule struct {
Column string `yaml:"column" json:"column"`
Expr string `yaml:"expr" json:"expr"`
}

VirtualTableRule struct {
Name string `yaml:"name" json:"name"`
AllowFullTableScan bool `yaml:"allow_full_table_scan" json:"allow_full_table_scan"`
ShardingDB []*ShardingRule `yaml:"sharding_db" json:"sharding_db"`
ShardingTable []*ShardingRule `yaml:"sharding_table" json:"sharding_table"`
Topology *Topology `yaml:"topology" json:"topology"`
ShadowTopology *Topology `yaml:"shadow_topology" json:"shadow_topology"`
}

Topology struct {
DBNamePattern string `yaml:"db_name_pattern" json:"db_name_pattern"`
TableNamePattern string `yaml:"table_name_pattern" json:"table_name_pattern"`
}
const (
weightRegex = `^r([\d]+)w([\d]+)$`
)

type DataSourceBrief struct {
Name string `yaml:"name" json:"name"`
WriteWeight int `yaml:"write_weight" json:"write_weight"`
ReadWeight int `yaml:"read_weight" json:"read_weight"`
IsMaster bool `yaml:"is_master" json:"is_master"`
DB proto.DB
}

func (brief *DataSourceBrief) Counting() bool {
return brief.DB.Status() == proto.Running
}
Expand All @@ -83,8 +51,8 @@ func (brief *DataSourceBrief) Weight(ctx context.Context) int {
return brief.ReadWeight
}

func (ref *DataSourceRef) castToDataSourceBrief() (*DataSourceBrief, error) {
weightRegexp := regexp.MustCompile(`^r([\d]+)w([\d]+)$`)
func castToDataSourceBrief(ref *config.DataSourceRef) (*DataSourceBrief, error) {
weightRegexp := regexp.MustCompile(weightRegex)
params := weightRegexp.FindStringSubmatch(ref.Weight)
if len(params) != 3 {
return nil, errors.Errorf("datasource reference '%s' weight invalid: %s", ref.Name, ref.Weight)
Expand All @@ -110,19 +78,20 @@ func (ref *DataSourceRef) castToDataSourceBrief() (*DataSourceBrief, error) {
}

// groupDataSourceRefs cast DataSourceRef to DataSourceBrief, then group them.
func groupDataSourceRefs(dataSources []*DataSourceRef) (masters []*DataSourceBrief, reads []*DataSourceBrief, err error) {
func groupDataSourceRefs(dataSources []*config.DataSourceRef) (all, masters, reads []*DataSourceBrief, err error) {
for i := 0; i < len(dataSources); i++ {
ds := dataSources[i]
brief, err := ds.castToDataSourceBrief()
brief, err := castToDataSourceBrief(ds)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if brief.IsMaster {
masters = append(masters, brief)
}
if brief.ReadWeight > 0 {
reads = append(reads, brief)
}
all = append(all, brief)
}
return
}
Loading