Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add select for update #319

Merged
merged 8 commits into from
Oct 30, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/datasource/sql/datasource/datasource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type DataSourceManager interface {
RegisterResource(resource rm.Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource rm.Resource) error
// Get all resources managed by this manager
// GetManagedResources Get all resources managed by this manager
GetManagedResources() map[string]rm.Resource
// BranchRollback
BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error)
Expand Down
350 changes: 350 additions & 0 deletions pkg/datasource/sql/exec/select_for_update_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package exec

import (
"bytes"
"context"
"database/sql/driver"
"fmt"
"io"
"time"

"github.com/arana-db/parser/ast"
"github.com/arana-db/parser/format"
"github.com/arana-db/parser/model"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo/builder"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
seatabytes "github.com/seata/seata-go/pkg/util/bytes"
"github.com/seata/seata-go/pkg/util/log"
)

const (
// todo replace by config
retryTimes = 5
retryInterval = 20 * time.Millisecond
)

type SelectForUpdateExecutor struct {
builder.BasicUndoLogBuilder
}

func (s SelectForUpdateExecutor) interceptors(interceptors []SQLHook) {
}

func (s SelectForUpdateExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.ExecContext, f CallbackWithNamedValue) (types.ExecResult, error) {
if !execCtx.IsInGlobalTransaction && !execCtx.IsRequireGlobalLock {
return f(ctx, execCtx.Query, execCtx.NamedValues)
}

var (
tx driver.Tx
nowTs = time.Now().Unix()
result types.ExecResult
savepointName string
originalAutoCommit = execCtx.IsAutoCommit
)

table, err := execCtx.ParseContext.GteTableName()
if err != nil {
return nil, err
}
// build query primary key sql
selectPKSQL, err := s.buildSelectPKSQL(execCtx.ParseContext.SelectStmt, execCtx.MetaDataMap[table])
if err != nil {
return nil, err
}

i := 0
for ; i < retryTimes; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的 retry 可以用范哥封装的的backoff 吗?可以统一的话感觉会更好

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的 retry 可以用范哥封装的的backoff 吗?可以统一的话感觉会更好

后面提需求,统一来优化吧

if originalAutoCommit {
// In order to hold the local db lock during global lock checking
// set auto commit value to false first if original auto commit was true
tx, err = execCtx.Conn.Begin()
if err != nil {
return nil, err
}
execCtx.IsAutoCommit = false
} else if execCtx.IsSupportsSavepoints {
// In order to release the local db lock when global lock conflict
// create a save point if original auto commit was false, then use the save point here to release db
// lock during global lock checking if necessary
savepointName = fmt.Sprintf("savepoint %d;", nowTs)
stmt, err := execCtx.Conn.Prepare(savepointName)
if err != nil {
return nil, err
}
if _, err = stmt.Exec(nil); err != nil {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

按照stmt.go中调用规则应该是
ExecWithNamedValue 对应 stmt.ExecContext
ExecWithValue 对应stmt.Exec

请教下这种使用方式是保存当前快照吗?

Copy link
Contributor Author

@luky116 luky116 Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

按照stmt.go中调用规则应该是
ExecWithNamedValue 对应 stmt.ExecContext
ExecWithValue 对应stmt.Exec

这个应该没有对应关系吧,Exec和ExecContext都可以。

请教下这种使用方式是保存当前快照吗?

这里是设置save point断点

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的, 我一开始以为这里是因为不同的conn类型导致 需要调用不同的 Exec方法.

return nil, err
}
} else {
return nil, fmt.Errorf("not support savepoint. please check your db version")
}

// execute business SQL, try to get local lock
result, err = f(ctx, execCtx.Query, execCtx.NamedValues)
if err != nil {
return nil, err
}

// query primary key values
stmt, err := execCtx.Conn.Prepare(selectPKSQL)
if err != nil {
return nil, err
}
values := make([]driver.Value, 0, len(execCtx.NamedValues))
for _, val := range execCtx.NamedValues {
values = append(values, val.Value)
}
rows, err := stmt.Query(values)
if err != nil {
return nil, err
}

lockKey := s.buildLockKey(rows, execCtx.MetaDataMap[table])
if lockKey == "" {
break
}
// check global lock
lockable, err := datasource.GetDataSourceManager(branch.BranchTypeAT).LockQuery(ctx, message.GlobalLockQueryRequest{
BranchRegisterRequest: message.BranchRegisterRequest{
Xid: execCtx.TxCtx.XID,
BranchType: branch.BranchTypeAT,
ResourceId: execCtx.TxCtx.ResourceID,
LockKey: lockKey,
},
})

// if obtained global lock
if err == nil && lockable {
break
}

if savepointName != "" {
if stmt, err = execCtx.Conn.Prepare(fmt.Sprintf("rollback to %s;", savepointName)); err != nil {
return nil, err
}
if _, err = stmt.Exec(nil); err != nil {
return nil, err
}
} else {
if err = tx.Rollback(); err != nil {
return nil, err
}
}
time.Sleep(retryInterval)
}

if i >= retryTimes {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里漏掉了rollback

Copy link
Contributor Author

@luky116 luky116 Oct 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果 i >= retryTimes,说明重试了很多次失败了,rollback会在for循环最后做,这里可以不做rollback

return nil, fmt.Errorf("global lock wait timeout")
}

if originalAutoCommit {
if err = tx.Commit(); err != nil {
return nil, err
}
execCtx.IsAutoCommit = true
}
return result, nil
}

func (s SelectForUpdateExecutor) ExecWithValue(ctx context.Context, execCtx *types.ExecContext, f CallbackWithValue) (types.ExecResult, error) {
if !execCtx.IsInGlobalTransaction && !execCtx.IsRequireGlobalLock {
return f(ctx, execCtx.Query, execCtx.Values)
}

var (
tx driver.Tx
nowTs = time.Now().Unix()
result types.ExecResult
savepointName string
originalAutoCommit = execCtx.IsAutoCommit
)

table, err := execCtx.ParseContext.GteTableName()
if err != nil {
return nil, err
}
// build query primary key sql
selectPKSQL, err := s.buildSelectPKSQL(execCtx.ParseContext.SelectStmt, execCtx.MetaDataMap[table])
if err != nil {
return nil, err
}

i := 0
for ; i < retryTimes; i++ {
if originalAutoCommit {
// In order to hold the local db lock during global lock checking
// set auto commit value to false first if original auto commit was true
tx, err = execCtx.Conn.Begin()
if err != nil {
return nil, err
}
} else if execCtx.IsSupportsSavepoints {
// In order to release the local db lock when global lock conflict
// create a save point if original auto commit was false, then use the save point here to release db
// lock during global lock checking if necessary
savepointName = fmt.Sprintf("savepoint %d;", nowTs)
stmt, err := execCtx.Conn.Prepare(savepointName)
if err != nil {
return nil, err
}
if _, err = stmt.Exec(nil); err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("not support savepoint. please check your db version")
}

// execute business SQL, try to get local lock
result, err = f(ctx, execCtx.Query, execCtx.Values)
if err != nil {
return nil, err
}

// query primary key values
stmt, err := execCtx.Conn.Prepare(selectPKSQL)
if err != nil {
return nil, err
}
rows, err := stmt.Query(execCtx.Values)
if err != nil {
return nil, err
}

lockKey := s.buildLockKey(rows, execCtx.MetaDataMap[table])
if lockKey == "" {
break
}
// check global lock
lockable, err := datasource.GetDataSourceManager(branch.BranchTypeAT).LockQuery(ctx, message.GlobalLockQueryRequest{
BranchRegisterRequest: message.BranchRegisterRequest{
Xid: execCtx.TxCtx.XID,
BranchType: branch.BranchTypeAT,
ResourceId: execCtx.TxCtx.ResourceID,
LockKey: lockKey,
},
})

// has obtained global lock
if err == nil && lockable {
break
}

if savepointName != "" {
if stmt, err = execCtx.Conn.Prepare(fmt.Sprintf("rollback to %s;", savepointName)); err != nil {
return nil, err
}
if _, err = stmt.Exec(nil); err != nil {
return nil, err
}
} else {
if err = tx.Rollback(); err != nil {
return nil, err
}
}
time.Sleep(retryInterval)
}

if i >= retryTimes {
return nil, fmt.Errorf("global lock wait timeout")
}

if originalAutoCommit {
if err = tx.Commit(); err != nil {
return nil, err
}
execCtx.IsAutoCommit = true
}
return result, nil
}

// buildSelectSQLByUpdate build select sql from update sql
func (u *SelectForUpdateExecutor) buildSelectPKSQL(stmt *ast.SelectStmt, meta types.TableMeta) (string, error) {
pks := meta.GetPrimaryKeyOnlyName()
if len(pks) == 0 {
return "", fmt.Errorf("%s needs to contain the primary key.", meta.Schema)
}

fields := []*ast.SelectField{}
for _, column := range pks {
fields = append(fields, &ast.SelectField{
Expr: &ast.ColumnNameExpr{
Name: &ast.ColumnName{
Name: model.CIStr{
O: column,
L: column,
},
},
},
})
}

selStmt := ast.SelectStmt{
SelectStmtOpts: &ast.SelectStmtOpts{},
From: stmt.From,
Where: stmt.Where,
Fields: &ast.FieldList{Fields: fields},
OrderBy: stmt.OrderBy,
Limit: stmt.Limit,
TableHints: stmt.TableHints,
}

b := seatabytes.NewByteBuffer([]byte{})
selStmt.Restore(format.NewRestoreCtx(format.RestoreKeyWordUppercase, b))
sql := string(b.Bytes())
log.Infof("build select sql by update sourceQuery, sql {}", sql)

return sql, nil
}

// the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
func (s SelectForUpdateExecutor) buildLockKey(rows driver.Rows, meta types.TableMeta) string {
var (
lockKeys bytes.Buffer
filedSequence int
)
lockKeys.WriteString(meta.Schema)
lockKeys.WriteString(":")

ss := s.GetScanSlice(meta.GetPrimaryKeyOnlyName(), meta)
for {
err := rows.Next(ss)
if err == io.EOF {
break
}

if filedSequence > 0 {
lockKeys.WriteString(",")
}

pkSplitIndex := 0
for _, value := range ss {
if pkSplitIndex > 0 {
lockKeys.WriteString("_")
}
lockKeys.WriteString(fmt.Sprintf("%v", value))
pkSplitIndex++
}
filedSequence++
}
return lockKeys.String()
}
Loading