Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: refactor to reuse in load data part 5 #42856

Merged
merged 9 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
change
  • Loading branch information
D3Hunter committed Apr 7, 2023
commit 5d9a83ffecf919228ad67af5012f555728458b80
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ mock_s3iface:
mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage > br/pkg/mock/mocklocal/local.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
82 changes: 19 additions & 63 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -183,11 +182,6 @@ type AbstractBackend interface {
// (e.g. preparing to resolve a disk quota violation).
FlushAllEngines(ctx context.Context) error

// EngineFileSizes obtains the size occupied locally of all engines managed
// by this backend. This method is used to compute disk quota.
// It can return nil if the content are all stored remotely.
EngineFileSizes() []EngineFileSize

// ResetEngine clears all written KV pairs in this opened engine.
ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -227,13 +221,6 @@ type OpenedEngine struct {
// // cleanup deletes the imported data.
// cleanup(ctx context.Context) error

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to below where its methods are defined

engine
}

// LocalEngineWriter is a thread-local writer for writing rows into a single engine.
type LocalEngineWriter struct {
writer EngineWriter
Expand Down Expand Up @@ -265,56 +252,6 @@ func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
func (be Backend) CheckDiskQuota(quota int64) (
	largeEngines []uuid.UUID,
	inProgressLargeEngines int,
	totalDiskSize int64,
	totalMemSize int64,
) {
	engineSizes := be.abstract.EngineFileSizes()
	// Importing engines sort ahead of non-importing ones; within each group,
	// order ascending by total (disk + memory) footprint. This way the
	// engines that push the running total past the quota are the largest
	// non-importing ones.
	slices.SortFunc(engineSizes, func(a, b EngineFileSize) bool {
		if a.IsImporting != b.IsImporting {
			return a.IsImporting
		}
		return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize
	})
	for _, sz := range engineSizes {
		totalDiskSize += sz.DiskSize
		totalMemSize += sz.MemSize
		if totalDiskSize+totalMemSize <= quota {
			continue
		}
		if sz.IsImporting {
			inProgressLargeEngines++
		} else {
			largeEngines = append(largeEngines, sz.UUID)
		}
	}
	return
}

// UnsafeImportAndReset forces the backend to import the content of an engine
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
	// Deliberately NOT calling be.abstract.CloseEngine(): the engine must
	// remain writable after UnsafeImportAndReset() returns.
	logger := makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID)
	closed := ClosedEngine{
		engine: engine{
			backend: be.abstract,
			logger:  logger,
			uuid:    engineUUID,
		},
	}
	if err := closed.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
		return err
	}
	return be.abstract.ResetEngine(ctx, engineUUID)
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
Expand Down Expand Up @@ -445,6 +382,25 @@ func (en engine) GetID() int32 {
return en.id
}

// ClosedEngine represents a closed engine, allowing ingestion into the target.
// This type is goroutine safe: you can share an instance and execute any method
// anywhere.
type ClosedEngine struct {
	engine
}

// NewClosedEngine creates a new ClosedEngine.
func NewClosedEngine(backend AbstractBackend, logger log.Logger, uuid uuid.UUID, id int32) *ClosedEngine {
	inner := engine{
		backend: backend,
		logger:  logger,
		uuid:    uuid,
		id:      id,
	}
	return &ClosedEngine{engine: inner}
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error
Expand Down
74 changes: 0 additions & 74 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,77 +336,3 @@ func TestNewEncoder(t *testing.T) {
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}

func TestCheckDiskQuota(t *testing.T) {
	s := createBackendSuite(t)
	defer s.tearDownTest()

	uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111")
	uuid3 := uuid.MustParse("33333333-3333-3333-3333-333333333333")
	uuid5 := uuid.MustParse("55555555-5555-5555-5555-555555555555")
	uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777")
	uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999")

	fileSizes := []backend.EngineFileSize{
		{UUID: uuid1, DiskSize: 1000, MemSize: 0, IsImporting: false},
		{UUID: uuid3, DiskSize: 2000, MemSize: 1000, IsImporting: true},
		{UUID: uuid5, DiskSize: 1500, MemSize: 3500, IsImporting: false},
		{UUID: uuid7, DiskSize: 0, MemSize: 7000, IsImporting: true},
		{UUID: uuid9, DiskSize: 4500, MemSize: 4500, IsImporting: false},
	}

	s.mockBackend.EXPECT().EngineFileSizes().Return(fileSizes).Times(4)

	// Every case sees the same engines, so the accumulated disk/mem totals
	// are constant; only the reported large engines depend on the quota.
	cases := []struct {
		name           string
		quota          int64
		wantLarge      []uuid.UUID
		wantInProgress int
	}{
		{"no quota exceeded", 30000, nil, 0},
		{"largest engine is out", 20000, []uuid.UUID{uuid9}, 0},
		{"importing engine ranked least priority", 12000, []uuid.UUID{uuid5, uuid9}, 0},
		{"importing engines not visible", 5000, []uuid.UUID{uuid1, uuid5, uuid9}, 1},
	}
	for _, tc := range cases {
		gotLarge, gotInProgress, gotDisk, gotMem := s.backend.CheckDiskQuota(tc.quota)
		if len(tc.wantLarge) == 0 {
			require.Len(t, gotLarge, 0, tc.name)
		} else {
			require.Equal(t, tc.wantLarge, gotLarge, tc.name)
		}
		require.Equal(t, tc.wantInProgress, gotInProgress, tc.name)
		require.Equal(t, int64(9000), gotDisk, tc.name)
		require.Equal(t, int64(16000), gotMem, tc.name)
	}
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "local",
srcs = [
"compress.go",
"disk_quota.go",
"duplicate.go",
"engine.go",
"iterator.go",
Expand Down Expand Up @@ -91,6 +92,7 @@ go_test(
timeout = "short",
srcs = [
"compress_test.go",
"disk_quota_test.go",
"duplicate_test.go",
"engine_test.go",
"iterator_test.go",
Expand Down
59 changes: 59 additions & 0 deletions br/pkg/lightning/backend/local/disk_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"golang.org/x/exp/slices"
)

// DiskUsage is an interface to obtain the size occupied locally of all engines
// managed by a backend.
type DiskUsage interface {
	// EngineFileSizes obtains the size occupied locally of all engines managed
	// by this backend. This method is used to compute disk quota.
	// It can return nil if the contents are all stored remotely.
	EngineFileSizes() []backend.EngineFileSize
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we use inProgressLargeEngines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

didn't look in detail, just copied out

func CheckDiskQuota(mgr DiskUsage, quota int64) (
	largeEngines []uuid.UUID,
	inProgressLargeEngines int,
	totalDiskSize int64,
	totalMemSize int64,
) {
	engineSizes := mgr.EngineFileSizes()
	// Importing engines sort ahead of non-importing ones; within each group,
	// order ascending by total (disk + memory) footprint. This way the
	// engines that push the running total past the quota are the largest
	// non-importing ones.
	slices.SortFunc(engineSizes, func(a, b backend.EngineFileSize) bool {
		if a.IsImporting != b.IsImporting {
			return a.IsImporting
		}
		return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize
	})
	for _, sz := range engineSizes {
		totalDiskSize += sz.DiskSize
		totalMemSize += sz.MemSize
		if totalDiskSize+totalMemSize <= quota {
			continue
		}
		if sz.IsImporting {
			inProgressLargeEngines++
		} else {
			largeEngines = append(largeEngines, sz.UUID)
		}
	}
	return
}
99 changes: 99 additions & 0 deletions br/pkg/lightning/backend/local/disk_quota_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/mock/mocklocal"
"github.com/stretchr/testify/require"
)

func TestCheckDiskQuota(t *testing.T) {
	controller := gomock.NewController(t)
	mockDiskUsage := mocklocal.NewMockDiskUsage(controller)

	uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111")
	uuid3 := uuid.MustParse("33333333-3333-3333-3333-333333333333")
	uuid5 := uuid.MustParse("55555555-5555-5555-5555-555555555555")
	uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777")
	uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999")

	fileSizes := []backend.EngineFileSize{
		{UUID: uuid1, DiskSize: 1000, MemSize: 0, IsImporting: false},
		{UUID: uuid3, DiskSize: 2000, MemSize: 1000, IsImporting: true},
		{UUID: uuid5, DiskSize: 1500, MemSize: 3500, IsImporting: false},
		{UUID: uuid7, DiskSize: 0, MemSize: 7000, IsImporting: true},
		{UUID: uuid9, DiskSize: 4500, MemSize: 4500, IsImporting: false},
	}

	mockDiskUsage.EXPECT().EngineFileSizes().Return(fileSizes).Times(4)

	// Every case sees the same engines, so the accumulated disk/mem totals
	// are constant; only the reported large engines depend on the quota.
	cases := []struct {
		name           string
		quota          int64
		wantLarge      []uuid.UUID
		wantInProgress int
	}{
		{"no quota exceeded", 30000, nil, 0},
		{"largest engine is out", 20000, []uuid.UUID{uuid9}, 0},
		{"importing engine ranked least priority", 12000, []uuid.UUID{uuid5, uuid9}, 0},
		{"importing engines not visible", 5000, []uuid.UUID{uuid1, uuid5, uuid9}, 1},
	}
	for _, tc := range cases {
		gotLarge, gotInProgress, gotDisk, gotMem := CheckDiskQuota(mockDiskUsage, tc.quota)
		if len(tc.wantLarge) == 0 {
			require.Len(t, gotLarge, 0, tc.name)
		} else {
			require.Equal(t, tc.wantLarge, gotLarge, tc.name)
		}
		require.Equal(t, tc.wantInProgress, gotInProgress, tc.name)
		require.Equal(t, int64(9000), gotDisk, tc.name)
		require.Equal(t, int64(16000), gotMem, tc.name)
	}
}
Loading