Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: refactor to reuse in load data part 5 #42856

Merged
merged 9 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,9 @@ mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage > br/pkg/mock/mocklocal/local.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_golang_x_exp//slices",
"@org_uber_go_zap//:zap",
],
)
Expand Down
229 changes: 53 additions & 176 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,12 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
importMaxRetryTimes = 3 // tikv-importer has done retry internally. so we don't retry many times.
)

/*

Usual workflow:

1. Create a `Backend` for the whole process.

2. For each table,

i. Split into multiple "batches" consisting of data files with roughly equal total size.

ii. For each batch,

a. Create an `OpenedEngine` via `backend.OpenEngine()`

b. For each chunk, deliver data into the engine via `engine.WriteRows()`

c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`

d. Import data via `engine.Import()`

e. Cleanup via `engine.Cleanup()`

3. Close the connection via `backend.Close()`

*/

// makeTag builds the tag for an engine, formatted as
// "<tableName>:<engineID>".
func makeTag(tableName string, engineID int32) string {
	const tagFormat = "%s:%d"
	tag := fmt.Sprintf(tagFormat, tableName, engineID)
	return tag
}
Expand Down Expand Up @@ -99,7 +72,10 @@ type EngineFileSize struct {
// LocalWriterConfig defines the configuration used to open a LocalWriter.
type LocalWriterConfig struct {
// IsKVSorted indicates whether the chunk KV pairs written to this
// LocalWriter are sent in order.
// Only needed for the local backend; can be omitted for the tidb backend.
IsKVSorted bool
// TableName is only needed for the tidb backend; can be omitted for the
// local backend.
TableName string
}

// EngineConfig defines configuration used for open engine
Expand Down Expand Up @@ -145,10 +121,21 @@ type TargetInfoGetter interface {
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}

// AbstractBackend is the abstract interface behind Backend.
// Backend defines the interface for a backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
// Usual workflow:
// 1. Create a `Backend` for the whole process.
// 2. For each table,
// i. Split into multiple "batches" consisting of data files with roughly equal total size.
// ii. For each batch,
// a. Create an `OpenedEngine` via `backend.OpenEngine()`
// b. For each chunk, deliver data into the engine via `engine.WriteRows()`
// c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`
// d. Import data via `engine.Import()`
// e. Cleanup via `engine.Cleanup()`
// 3. Close the connection via `backend.Close()`
type Backend interface {
// Close the connection to the backend.
Close()

Expand Down Expand Up @@ -183,28 +170,22 @@ type AbstractBackend interface {
// (e.g. preparing to resolve a disk quota violation).
FlushAllEngines(ctx context.Context) error

// EngineFileSizes obtains the size occupied locally of all engines managed
// by this backend. This method is used to compute disk quota.
// It can return nil if the content are all stored remotely.
EngineFileSizes() []EngineFileSize

// ResetEngine clears all written KV pairs in this opened engine.
ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)

// TotalMemoryConsume counts total memory usage. This is only used for local backend.
TotalMemoryConsume() int64
}

// Backend is the delivery target for Lightning
type Backend struct {
abstract AbstractBackend
// EngineManager is the manager of engines.
// It is a wrapper around Backend that provides some common methods for managing engines.
// It is stateless, so it can be created on demand.
type EngineManager struct {
backend Backend
}

type engine struct {
backend AbstractBackend
backend Backend
logger log.Logger
uuid uuid.UUID
// id of the engine, used to generate uuid and stored in checkpoint
Expand All @@ -221,106 +202,17 @@ type OpenedEngine struct {
config *EngineConfig
}

// // import_ the data written to the engine into the target.
// import_(ctx context.Context) error

// // cleanup deletes the imported data.
// cleanup(ctx context.Context) error

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to below where its methods are defined

engine
}

// LocalEngineWriter is a thread-local writer for writing rows into a single engine.
type LocalEngineWriter struct {
writer EngineWriter
tableName string
}

// MakeBackend creates a new Backend from an AbstractBackend.
func MakeBackend(ab AbstractBackend) Backend {
return Backend{abstract: ab}
}

// Close the connection to the backend.
func (be Backend) Close() {
be.abstract.Close()
}

// ShouldPostProcess returns whether KV-specific post-processing should be
func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

// FlushAll flushes all opened engines.
func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

// TotalMemoryConsume returns the total memory consumed by the backend.
func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
func (be Backend) CheckDiskQuota(quota int64) (
largeEngines []uuid.UUID,
inProgressLargeEngines int,
totalDiskSize int64,
totalMemSize int64,
) {
sizes := be.abstract.EngineFileSizes()
slices.SortFunc(sizes, func(i, j EngineFileSize) bool {
if i.IsImporting != j.IsImporting {
return i.IsImporting
}
return i.DiskSize+i.MemSize < j.DiskSize+j.MemSize
})
for _, size := range sizes {
totalDiskSize += size.DiskSize
totalMemSize += size.MemSize
if totalDiskSize+totalMemSize > quota {
if size.IsImporting {
inProgressLargeEngines++
} else {
largeEngines = append(largeEngines, size.UUID)
}
}
}
return
}

// UnsafeImportAndReset forces the backend to import the content of an engine
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
engine: engine{
backend: be.abstract,
logger: makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID),
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
// MakeEngineManager creates a new EngineManager from a Backend.
func MakeEngineManager(ab Backend) EngineManager {
return EngineManager{backend: ab}
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.abstract.OpenEngine(ctx, config, engineUUID); err != nil {
if err := be.backend.OpenEngine(ctx, config, engineUUID); err != nil {
return nil, err
}

Expand All @@ -346,7 +238,7 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam

return &OpenedEngine{
engine: engine{
backend: be.abstract,
backend: be.backend,
logger: logger,
uuid: engineUUID,
id: engineID,
Expand All @@ -356,11 +248,6 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

// Inner returns the underlying abstract backend.
func (be Backend) Inner() AbstractBackend {
return be.abstract
}

// Close the opened engine to prepare it for importing.
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
closedEngine, err := engine.unsafeClose(ctx, engine.config)
Expand All @@ -378,40 +265,16 @@ func (engine *OpenedEngine) Flush(ctx context.Context) error {
}

// LocalWriter returns a writer that writes to the local backend.
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) {
w, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid)
if err != nil {
return nil, err
}
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

// TotalMemoryConsume returns the total memory consumed by the engine.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
return engine.engine.backend.TotalMemoryConsume()
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows encode.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

// Close closes the engine and returns the status of the engine.
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

// IsSynced returns whether the engine is synced.
func (w *LocalEngineWriter) IsSynced() bool {
return w.writer.IsSynced()
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error) {
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
return be.UnsafeCloseEngineWithUUID(ctx, cfg, tag, engineUUID, engineID)
}
Expand All @@ -421,9 +284,9 @@ func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tabl
// (Open -> Write -> Close -> Import). This method should only be used when one
// knows via other ways that the engine has already been opened, e.g. when
// resuming from a checkpoint.
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error) {
return engine{
backend: be.abstract,
backend: be.backend,
logger: makeLogger(log.FromContext(ctx), tag, engineUUID),
uuid: engineUUID,
id: id,
Expand All @@ -445,6 +308,25 @@ func (en engine) GetID() int32 {
return en.id
}

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// It embeds engine, so the fields and methods of engine are promoted to it.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
engine
}

// NewClosedEngine wraps the given backend, logger, engine UUID and engine id
// into a ClosedEngine and returns a pointer to it. Only these four fields of
// the embedded engine are set.
func NewClosedEngine(backend Backend, logger log.Logger, uuid uuid.UUID, id int32) *ClosedEngine {
	inner := engine{
		backend: backend,
		logger:  logger,
		uuid:    uuid,
		id:      id,
	}
	return &ClosedEngine{engine: inner}
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error
Expand Down Expand Up @@ -483,12 +365,7 @@ type ChunkFlushStatus interface {

// EngineWriter is the interface for writing data to an engine.
type EngineWriter interface {
AppendRows(
ctx context.Context,
tableName string,
columnNames []string,
rows encode.Rows,
) error
AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
}
Expand Down
Loading