-
Notifications
You must be signed in to change notification settings - Fork 45
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Project import generated by Copybara. (#122)
GitOrigin-RevId: db1aeceab3849acc274df148925bc6b33016c470 Co-authored-by: A. Googler <cloud-spanner-emulator-bot@google.com>
- Loading branch information
1 parent
f9dcd54
commit 748544f
Showing
103 changed files
with
2,350 additions
and
790 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
adirastogi # KIR | ||
arawind #BLR |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
jinjj # CAM | ||
hengfeng # SYD |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
// | ||
// Copyright 2020 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
|
||
#include "backend/schema/backfills/change_stream_backfill.h" | ||
|
||
#include <cstdint> | ||
#include <cstdlib> | ||
#include <string> | ||
#include <vector> | ||
|
||
#include "zetasql/public/value.h" | ||
#include "absl/time/time.h" | ||
#include "backend/common/ids.h" | ||
#include "backend/common/rows.h" | ||
#include "backend/schema/catalog/column.h" | ||
#include "common/errors.h" | ||
#include "common/limits.h" | ||
#include "zetasql/base/status_macros.h" | ||
|
||
namespace google { | ||
namespace spanner { | ||
namespace emulator { | ||
namespace backend { | ||
|
||
absl::StatusOr<Key> ComputeChangeStreamPartitionTableKey( | ||
std::string partition_token_str, const ChangeStream* change_stream) { | ||
// Columns must be added to the key for each column in change stream partition | ||
// table primary key. | ||
Key key; | ||
key.AddColumn(zetasql::Value::String(partition_token_str), | ||
change_stream->change_stream_partition_table() | ||
->FindKeyColumn("partition_token") | ||
->is_descending()); | ||
const int64_t key_size = key.LogicalSizeInBytes(); | ||
if (key_size > limits::kMaxKeySizeBytes) { | ||
return error::KeyTooLarge( | ||
change_stream->change_stream_partition_table()->Name(), key_size, | ||
limits::kMaxKeySizeBytes); | ||
} | ||
return key; | ||
} | ||
|
||
std::vector<zetasql::Value> CreateInitialBackfillPartitions( | ||
std::vector<zetasql::Value> row_values, std::string partition_token_str, | ||
absl::Time start_time, absl::Time end_time) { | ||
// specify partition_token | ||
row_values.push_back(zetasql::Value::String(partition_token_str)); | ||
// Specify start_time | ||
row_values.push_back(zetasql::Value::Timestamp(start_time)); | ||
// Specify end_time | ||
row_values.push_back(zetasql::Value::Timestamp(end_time)); | ||
// Specify parents | ||
std::vector<zetasql::Value> parents_values; | ||
parents_values.push_back(zetasql::Value::NullString()); | ||
row_values.push_back(zetasql::Value::Array( | ||
zetasql::types::StringArrayType(), parents_values)); | ||
// Specify children | ||
std::vector<zetasql::Value> children_values; | ||
children_values.push_back(zetasql::Value::NullString()); | ||
row_values.push_back(zetasql::Value::Array( | ||
zetasql::types::StringArrayType(), children_values)); | ||
return row_values; | ||
} | ||
|
||
std::string gen_random(const int len) { | ||
static const char alphanum[] = | ||
"0123456789" | ||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" | ||
"abcdefghijklmnopqrstuvwxyz"; | ||
std::string tmp_s, token_string; | ||
tmp_s.reserve(len); | ||
|
||
for (int i = 0; i < len; ++i) { | ||
tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)]; | ||
} | ||
absl::WebSafeBase64Escape(tmp_s, &token_string); | ||
return token_string; | ||
} | ||
|
||
std::string CreatePartitionTokenString() { | ||
const int64_t min_partition_token_len = 130; | ||
const int64_t max_partition_token_len = 170; | ||
const int64_t partition_token_len = | ||
min_partition_token_len + | ||
rand() % (max_partition_token_len - min_partition_token_len) + 1; | ||
return gen_random(partition_token_len); | ||
} | ||
|
||
absl::Status BackfillChangeStreamPartition( | ||
const SchemaValidationContext* context, const ChangeStream* change_stream, | ||
absl::Span<const Column* const> change_stream_partition_table_columns, | ||
std::vector<ColumnID> change_stream_partition_table_column_ids) { | ||
std::string partition_token_str = CreatePartitionTokenString(); | ||
// Populate values for the row representing the first partition | ||
std::vector<zetasql::Value> initial_row_values; | ||
initial_row_values.reserve(change_stream_partition_table_columns.size()); | ||
initial_row_values = CreateInitialBackfillPartitions( | ||
initial_row_values, partition_token_str, absl::UnixEpoch(), | ||
absl::FromUnixMicros(zetasql::types::kTimestampMax)); | ||
// Create Key for the change stream partition table | ||
ZETASQL_ASSIGN_OR_RETURN( | ||
Key change_stream_partition_table_key, | ||
ComputeChangeStreamPartitionTableKey(partition_token_str, change_stream), | ||
_.SetErrorCode(absl::StatusCode::kFailedPrecondition)); | ||
// Insert the second new row in the change_stream_partition_table. | ||
ZETASQL_RETURN_IF_ERROR(context->storage()->Write( | ||
context->pending_commit_timestamp(), | ||
change_stream->change_stream_partition_table()->id(), | ||
change_stream_partition_table_key, | ||
change_stream_partition_table_column_ids, initial_row_values)); | ||
return absl::OkStatus(); | ||
} | ||
|
||
absl::Status BackfillChangeStream(const ChangeStream* change_stream, | ||
const SchemaValidationContext* context) { | ||
const Table* change_stream_partition_table = | ||
change_stream->change_stream_partition_table(); | ||
absl::Span<const Column* const> change_stream_partition_table_columns = | ||
change_stream_partition_table->columns(); | ||
std::vector<ColumnID> change_stream_partition_table_column_ids = | ||
GetColumnIDs(change_stream_partition_table_columns); | ||
// Backfill the first partition | ||
// Generate partition token | ||
const int64_t num_initial_partitions = 2; | ||
for (int i = 0; i < num_initial_partitions; ++i) { | ||
ZETASQL_RETURN_IF_ERROR(BackfillChangeStreamPartition( | ||
context, change_stream, change_stream_partition_table_columns, | ||
change_stream_partition_table_column_ids)); | ||
} | ||
return absl::OkStatus(); | ||
} | ||
|
||
} // namespace backend | ||
} // namespace emulator | ||
} // namespace spanner | ||
} // namespace google |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// | ||
// Copyright 2020 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
// | ||
|
||
#ifndef THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_SCHEMA_BACKFILLS_CHANGE_STREAM_BACKFILL_H_ | ||
#define THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_SCHEMA_BACKFILLS_CHANGE_STREAM_BACKFILL_H_ | ||
|
||
#include "backend/schema/catalog/change_stream.h" | ||
#include "backend/schema/updater/schema_validation_context.h" | ||
|
||
namespace google { | ||
namespace spanner { | ||
namespace emulator { | ||
namespace backend { | ||
|
||
// Handles backfilling of a newly created change stream partition table and | ||
// change stream data table. | ||
absl::Status BackfillChangeStream(const ChangeStream* change_stream, | ||
const SchemaValidationContext* context); | ||
|
||
} // namespace backend | ||
} // namespace emulator | ||
} // namespace spanner | ||
} // namespace google | ||
|
||
#endif // THIRD_PARTY_CLOUD_SPANNER_EMULATOR_BACKEND_SCHEMA_BACKFILLS_CHANGE_STREAM_BACKFILL_H_ |
Oops, something went wrong.