Skip to content

Commit eb3115f

Browse files
committed
feat(make_backup): add root_path for backup
1 parent 90dad61 commit eb3115f

File tree

23 files changed

+441
-187
lines changed

23 files changed

+441
-187
lines changed

.github/workflows/unit-test.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,20 @@ jobs:
9393
- name: docker compose down
9494
run: |
9595
docker compose down
96+
- name: docker compose up
97+
if: ${{ matrix.enable_new_paths_format }}
98+
run: |
99+
docker compose up -d
100+
- name: run new_paths_format tests
101+
if: ${{ matrix.enable_new_paths_format }}
102+
run: |
103+
while [ "$(docker inspect -f {{.State.Health.Status}} local-ydbcp)" != "healthy" ]; do
104+
echo "Waiting for container to become healthy..."
105+
sleep 1
106+
done
107+
echo "Starting new_paths_format tests!"
108+
docker exec local-ydbcp sh -c './test_new_paths_format'
109+
- name: docker compose down
110+
if: ${{ matrix.enable_new_paths_format }}
111+
run: |
112+
docker compose down

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ RUN go build -o . ./cmd/ydbcp/main.go
2020
RUN go build -o ./make_backup ./cmd/integration/make_backup/main.go
2121
RUN go build -o ./list_entities ./cmd/integration/list_entities/main.go
2222
RUN go build -o ./orm ./cmd/integration/orm/main.go
23+
RUN go build -o ./test_new_paths_format ./cmd/integration/new_paths_format/main.go
2324

2425
# Command to run the executable
2526
CMD ["./main", "--config=local_config.yaml"]
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
"ydbcp/cmd/integration/common"
8+
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
9+
10+
"google.golang.org/grpc"
11+
)
12+
13+
const (
14+
containerID = "abcde"
15+
databaseName = "/local"
16+
ydbcpEndpoint = "0.0.0.0:50051"
17+
databaseEndpoint = "grpcs://local-ydb:2135"
18+
)
19+
20+
type backupScenario struct {
21+
name string
22+
request *pb.MakeBackupRequest
23+
expectedRootPath string
24+
expectedSourcePaths []string
25+
}
26+
27+
func newMakeBackupRequest(rootPath string, sourcePaths []string) *pb.MakeBackupRequest {
28+
return &pb.MakeBackupRequest{
29+
ContainerId: containerID,
30+
DatabaseName: databaseName,
31+
DatabaseEndpoint: databaseEndpoint,
32+
RootPath: rootPath,
33+
SourcePaths: sourcePaths,
34+
}
35+
}
36+
37+
func runBackupScenario(ctx context.Context, backupClient pb.BackupServiceClient, opClient pb.OperationServiceClient, scenario backupScenario) {
38+
tbwr, err := backupClient.MakeBackup(ctx, scenario.request)
39+
if err != nil {
40+
log.Panicf("scenario %s: failed to make backup: %v", scenario.name, err)
41+
}
42+
op, err := opClient.GetOperation(
43+
ctx, &pb.GetOperationRequest{
44+
Id: tbwr.Id,
45+
},
46+
)
47+
if err != nil {
48+
log.Panicf("scenario %s: failed to get operation: %v", scenario.name, err)
49+
}
50+
51+
if op.GetRootPath() != scenario.expectedRootPath {
52+
log.Panicf("scenario %s: expected root path %q, got %q", scenario.name, scenario.expectedRootPath, op.GetRootPath())
53+
}
54+
55+
if !equalStringSlices(op.GetSourcePaths(), scenario.expectedSourcePaths) {
56+
log.Panicf("scenario %s: expected source paths %v, got %v", scenario.name, scenario.expectedSourcePaths, op.GetSourcePaths())
57+
}
58+
59+
log.Printf("scenario %s: passed", scenario.name)
60+
}
61+
62+
func equalStringSlices(a, b []string) bool {
63+
if len(a) != len(b) {
64+
return false
65+
}
66+
for i := range a {
67+
if a[i] != b[i] {
68+
return false
69+
}
70+
}
71+
return true
72+
}
73+
74+
func main() {
75+
conn := common.CreateGRPCClient(ydbcpEndpoint)
76+
defer func(conn *grpc.ClientConn) {
77+
err := conn.Close()
78+
if err != nil {
79+
log.Panicln("failed to close connection")
80+
}
81+
}(conn)
82+
backupClient := pb.NewBackupServiceClient(conn)
83+
opClient := pb.NewOperationServiceClient(conn)
84+
85+
ctx := context.Background()
86+
87+
scenarios := []backupScenario{
88+
{
89+
name: "full backup",
90+
request: newMakeBackupRequest("", nil),
91+
expectedRootPath: "",
92+
expectedSourcePaths: []string{},
93+
},
94+
{
95+
name: "full backup with specified root path",
96+
request: newMakeBackupRequest("stocks", nil),
97+
expectedRootPath: "stocks",
98+
expectedSourcePaths: []string{},
99+
},
100+
{
101+
name: "partial backup",
102+
request: newMakeBackupRequest("", []string{"kv_test"}),
103+
expectedRootPath: "",
104+
expectedSourcePaths: []string{"kv_test"},
105+
},
106+
{
107+
name: "partial backup with specified root path",
108+
request: newMakeBackupRequest("stocks", []string{"orders", "orderLines"}),
109+
expectedRootPath: "stocks",
110+
expectedSourcePaths: []string{"orders", "orderLines"},
111+
},
112+
}
113+
114+
for _, scenario := range scenarios {
115+
runBackupScenario(ctx, backupClient, opClient, scenario)
116+
time.Sleep(2 * time.Second)
117+
}
118+
}

init_db/create_tables.sh

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,16 @@
33

44
# create and fill user table kv_test
55
./ydb -e ${YDB_ENDPOINT} -d /local workload kv init
6-
./ydb -e ${YDB_ENDPOINT} -d /local workload kv run upsert --rows 100
6+
./ydb -e ${YDB_ENDPOINT} -d /local workload kv run upsert --rows 100
7+
8+
# create and fill user tables: stock, orders, orderLines
9+
./ydb -e ${YDB_ENDPOINT} -d /local workload stock init -p 10 -q 10 -o 10
10+
11+
# create directory for user tables
12+
./ydb -e ${YDB_ENDPOINT} -d /local scheme mkdir stocks
13+
14+
# move user tables (stock, orders, orderLines) to separate directory
15+
./ydb -e ${YDB_ENDPOINT} -d /local tools rename \
16+
--item source=/local/stock,destination=/local/stocks/stock \
17+
--item source=/local/orders,destination=/local/stocks/orders \
18+
--item source=/local/orderLines,destination=/local/stocks/orderLines

internal/backup_operations/make_backup.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type MakeBackupInternalRequest struct {
3030
ContainerID string
3131
DatabaseEndpoint string
3232
DatabaseName string
33+
RootPath string
3334
SourcePaths []string
3435
SourcePathsToExclude []string
3536
ScheduleID *string
@@ -42,6 +43,7 @@ func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalReques
4243
ContainerID: schedule.ContainerID,
4344
DatabaseEndpoint: schedule.DatabaseEndpoint,
4445
DatabaseName: schedule.DatabaseName,
46+
RootPath: schedule.RootPath,
4547
SourcePaths: schedule.SourcePaths,
4648
SourcePathsToExclude: schedule.SourcePathsToExclude,
4749
ScheduleID: &schedule.ID,
@@ -57,6 +59,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
5759
ContainerID: tbwr.ContainerID,
5860
DatabaseEndpoint: tbwr.YdbConnectionParams.Endpoint,
5961
DatabaseName: tbwr.YdbConnectionParams.DatabaseName,
62+
RootPath: tbwr.RootPath,
6063
SourcePaths: tbwr.SourcePaths,
6164
SourcePathsToExclude: tbwr.SourcePathsToExclude,
6265
ScheduleID: tbwr.ScheduleID,
@@ -141,9 +144,14 @@ func ValidateSourcePaths(
141144
if req.ScheduleID != nil {
142145
ctx = xlog.With(ctx, zap.String("ScheduleID", *req.ScheduleID))
143146
}
147+
basePath, ok := SafePathJoin(req.DatabaseName, req.RootPath)
148+
if !ok {
149+
xlog.Error(ctx, "incorrect root path", zap.String("path", req.RootPath))
150+
return nil, status.Errorf(codes.InvalidArgument, "incorrect root path %s", req.RootPath)
151+
}
144152
sourcePaths := make([]string, 0, len(req.SourcePaths))
145153
for _, p := range req.SourcePaths {
146-
fullPath, ok := SafePathJoin(req.DatabaseName, p)
154+
fullPath, ok := SafePathJoin(basePath, p)
147155
if !ok {
148156
xlog.Error(ctx, "incorrect source path", zap.String("path", p))
149157
return nil, status.Errorf(codes.InvalidArgument, "incorrect source path %s", p)
@@ -345,6 +353,7 @@ func MakeBackup(
345353
SecretKey: secretKey,
346354
Description: "ydbcp backup", // TODO: the description shoud be better
347355
NumberOfRetries: 10, // TODO: get it from configuration
356+
RootPath: req.RootPath,
348357
SourcePaths: pathsForExport,
349358
DestinationPrefix: destinationPrefix,
350359
S3ForcePathStyle: s3.S3ForcePathStyle,
@@ -393,6 +402,7 @@ func MakeBackup(
393402
Endpoint: req.DatabaseEndpoint,
394403
DatabaseName: req.DatabaseName,
395404
},
405+
RootPath: req.RootPath,
396406
SourcePaths: req.SourcePaths,
397407
SourcePathsToExclude: req.SourcePathsToExclude,
398408
Audit: &pb.AuditInfo{

internal/connectors/client/connector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (d *ClientYdbConnector) ExportToS3(
268268
}
269269

270270
if featureFlags.EnableNewPathsFormat {
271-
exportRequest.Settings.SourcePath = clientDb.Name()
271+
exportRequest.Settings.SourcePath = path.Join(clientDb.Name(), s3Settings.RootPath)
272272
exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix
273273
}
274274

internal/connectors/db/process_result_set.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
143143
ydbOperationId *string
144144
operationStateBuf *string
145145
message *string
146+
rootPath *string
146147
sourcePaths *string
147148
sourcePathsToExclude *string
148149
creator *string
@@ -168,6 +169,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
168169
query.Named("operation_id", &ydbOperationId),
169170
query.Named("status", &operationStateBuf),
170171
query.Named("message", &message),
172+
query.Named("root_path", &rootPath),
171173
query.Named("paths", &sourcePaths),
172174
query.Named("paths_to_exclude", &sourcePathsToExclude),
173175

@@ -223,6 +225,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
223225
Endpoint: databaseEndpoint,
224226
DatabaseName: databaseName,
225227
},
228+
RootPath: StringOrEmpty(rootPath),
226229
SourcePaths: sourcePathsSlice,
227230
SourcePathsToExclude: sourcePathsToExcludeSlice,
228231
YdbOperationId: StringOrEmpty(ydbOperationId),
@@ -301,6 +304,7 @@ func ReadOperationFromResultSet(res query.Row) (types.Operation, error) {
301304
Endpoint: databaseEndpoint,
302305
DatabaseName: databaseName,
303306
},
307+
RootPath: StringOrEmpty(rootPath),
304308
SourcePaths: sourcePathsSlice,
305309
SourcePathsToExclude: sourcePathsToExcludeSlice,
306310
Audit: auditFromDb(creator, createdAt, completedAt),
@@ -330,6 +334,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba
330334
createdAt *time.Time
331335
name *string
332336
ttl *time.Duration
337+
rootPath *string
333338
sourcePaths *string
334339
sourcePathsToExclude *string
335340
recoveryPointObjective *time.Duration
@@ -351,6 +356,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba
351356
query.Named("created_at", &createdAt),
352357
query.Named("name", &name),
353358
query.Named("ttl", &ttl),
359+
query.Named("root_path", &rootPath),
354360
query.Named("paths", &sourcePaths),
355361
query.Named("paths_to_exclude", &sourcePathsToExclude),
356362
query.Named("recovery_point_objective", &recoveryPointObjective),
@@ -400,6 +406,7 @@ func ReadBackupScheduleFromResultSet(res query.Row, withRPOInfo bool) (*types.Ba
400406
ContainerID: containerID,
401407
DatabaseName: databaseName,
402408
DatabaseEndpoint: databaseEndpoint,
409+
RootPath: StringOrEmpty(rootPath),
403410
SourcePaths: sourcePathsSlice,
404411
SourcePathsToExclude: sourcePathsToExcludeSlice,
405412
Audit: auditFromDb(initiated, createdAt, nil),

internal/connectors/db/yql/queries/write.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
117117
"$operation_id",
118118
table_types.StringValueFromString(tb.YdbOperationId),
119119
)
120+
if len(tb.RootPath) > 0 {
121+
d.AddValueParam("$root_path", table_types.StringValueFromString(tb.RootPath))
122+
}
120123
if len(tb.SourcePaths) > 0 {
121124
d.AddValueParam("$paths", table_types.StringValueFromString(types.SerializeSourcePaths(tb.SourcePaths)))
122125
}
@@ -146,6 +149,9 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
146149
"$endpoint",
147150
table_types.StringValueFromString(tbwr.YdbConnectionParams.Endpoint),
148151
)
152+
if len(tbwr.RootPath) > 0 {
153+
d.AddValueParam("$root_path", table_types.StringValueFromString(tbwr.RootPath))
154+
}
149155
if len(tbwr.SourcePaths) > 0 {
150156
d.AddValueParam("$paths", table_types.StringValueFromString(types.SerializeSourcePaths(tbwr.SourcePaths)))
151157
}
@@ -357,6 +363,9 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
357363
if schedule.ScheduleSettings.Ttl != nil {
358364
d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(schedule.ScheduleSettings.Ttl.AsDuration()))
359365
}
366+
if len(schedule.RootPath) > 0 {
367+
d.AddValueParam("$root_path", table_types.StringValueFromString(schedule.RootPath))
368+
}
360369
if len(schedule.SourcePaths) > 0 {
361370
d.AddValueParam("$paths", table_types.StringValueFromString(types.SerializeSourcePaths(schedule.SourcePaths)))
362371
}

internal/connectors/db/yql/schema/create_tables.yql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ CREATE TABLE Operations (
5151
status String,
5252
message String,
5353

54+
root_path String,
5455
paths String,
5556
paths_to_exclude String,
5657
operation_id String,
@@ -79,6 +80,7 @@ CREATE TABLE BackupSchedules (
7980

8081
crontab String NOT NULL,
8182
ttl Interval,
83+
root_path String,
8284
paths String,
8385
paths_to_exclude String,
8486

internal/handlers/schedule_backup.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func BackupScheduleHandler(
6767
Endpoint: schedule.DatabaseEndpoint,
6868
DatabaseName: schedule.DatabaseName,
6969
},
70+
RootPath: schedule.RootPath,
7071
SourcePaths: schedule.SourcePaths,
7172
SourcePathsToExclude: schedule.SourcePathsToExclude,
7273
Audit: &pb.AuditInfo{

0 commit comments

Comments
 (0)