Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support migrating an instance to sharding #48795

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions lib/private/DB/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public function __construct(
$this->_config->setSQLLogger($debugStack);
}

/** @var array<string, array{shards: array[], mapper: ?string}> $shardConfig */
/** @var array<string, array{shards: array[], mapper: ?string, from_primary_key: ?int, from_shard_key: ?int}> $shardConfig */
$shardConfig = $this->params['sharding'] ?? [];
$shardNames = array_keys($shardConfig);
$this->shards = array_map(function (array $config, string $name) {
Expand All @@ -180,7 +180,9 @@ public function __construct(
self::SHARD_PRESETS[$name]['shard_key'],
$shardMapper,
self::SHARD_PRESETS[$name]['companion_tables'],
$config['shards']
$config['shards'],
$config['from_primary_key'] ?? 0,
$config['from_shard_key'] ?? 0,
);
}, $shardConfig, $shardNames);
$this->shards = array_combine($shardNames, $this->shards);
Expand All @@ -199,8 +201,10 @@ public function getShardConnections(): array {
if ($this->isShardingEnabled) {
foreach ($this->shards as $shardDefinition) {
foreach ($shardDefinition->getAllShards() as $shard) {
/** @var ConnectionAdapter $connection */
$connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
if ($shard !== ShardDefinition::MIGRATION_SHARD) {
/** @var ConnectionAdapter $connection */
$connections[] = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
}
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions lib/private/DB/QueryBuilder/Sharded/AutoIncrementHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private function getNextInner(ShardDefinition $shardDefinition): ?int {
}

// discard the encoded initial shard
$current = $this->getMaxFromDb($shardDefinition) >> 8;
$current = $this->getMaxFromDb($shardDefinition);
$next = max($current, self::MIN_VALID_KEY) + 1;
if ($cache->cas($shardDefinition->table, 'empty-placeholder', $next)) {
return $next;
Expand All @@ -131,19 +131,22 @@ private function getNextInner(ShardDefinition $shardDefinition): ?int {
}

/**
* Get the maximum primary key value from the shards
* Get the maximum primary key value from the shards, note that this has already stripped any embedded shard id
*/
private function getMaxFromDb(ShardDefinition $shardDefinition): int {
$max = 0;
$max = $shardDefinition->fromFileId;
$query = $this->shardConnectionManager->getConnection($shardDefinition, 0)->getQueryBuilder();
$query->select($shardDefinition->primaryKey)
->from($shardDefinition->table)
->orderBy($shardDefinition->primaryKey, 'DESC')
->setMaxResults(1);
foreach ($shardDefinition->getAllShards() as $shard) {
$connection = $this->shardConnectionManager->getConnection($shardDefinition, $shard);
$query = $connection->getQueryBuilder();
$query->select($shardDefinition->primaryKey)
->from($shardDefinition->table)
->orderBy($shardDefinition->primaryKey, 'DESC')
->setMaxResults(1);
$result = $query->executeQuery()->fetchOne();
$result = $query->executeQuery($connection)->fetchOne();
if ($result) {
if ($result > $shardDefinition->fromFileId) {
$result = $result >> 8;
}
$max = max($max, $result);
}
}
Expand Down
11 changes: 10 additions & 1 deletion lib/private/DB/QueryBuilder/Sharded/ShardConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ public function __construct(

public function getConnection(ShardDefinition $shardDefinition, int $shard): IDBConnection {
$connectionKey = $shardDefinition->table . '_' . $shard;
if (!isset($this->connections[$connectionKey])) {

if (isset($this->connections[$connectionKey])) {
return $this->connections[$connectionKey];
}

if ($shard === ShardDefinition::MIGRATION_SHARD) {
$this->connections[$connectionKey] = \OC::$server->get(IDBConnection::class);
} elseif (isset($shardDefinition->shards[$shard])) {
$this->connections[$connectionKey] = $this->createConnection($shardDefinition->shards[$shard]);
} else {
throw new \InvalidArgumentException("invalid shard key $shard only " . count($shardDefinition->shards) . ' configured');
}

return $this->connections[$connectionKey];
Expand Down
22 changes: 18 additions & 4 deletions lib/private/DB/QueryBuilder/Sharded/ShardDefinition.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
class ShardDefinition {
// we reserve the bottom byte of the primary key for the initial shard, so the total shard count is limited to what we can fit there
public const MAX_SHARDS = 256;
// additionally, shard id 255 is reserved for migration purposes
public const MAX_SHARDS = 255;
public const MIGRATION_SHARD = 255;

public const PRIMARY_KEY_MASK = 0x7F_FF_FF_FF_FF_FF_FF_00;
public const PRIMARY_KEY_SHARD_MASK = 0x00_00_00_00_00_00_00_FF;
Expand All @@ -37,8 +39,10 @@ public function __construct(
public array $companionKeys,
public string $shardKey,
public IShardMapper $shardMapper,
public array $companionTables = [],
public array $shards = [],
public array $companionTables,
public array $shards,
public int $fromFileId,
public int $fromStorageId,
) {
if (count($this->shards) >= self::MAX_SHARDS) {
throw new \Exception('Only allowed maximum of ' . self::MAX_SHARDS . ' shards allowed');
Expand All @@ -53,11 +57,21 @@ public function hasTable(string $table): bool {
}

public function getShardForKey(int $key): int {
if ($key < $this->fromStorageId) {
return self::MIGRATION_SHARD;
}
return $this->shardMapper->getShardForKey($key, count($this->shards));
}

/**
* @return list<int>
*/
public function getAllShards(): array {
return array_keys($this->shards);
if ($this->fromStorageId !== 0) {
return array_merge(array_keys($this->shards), [self::MIGRATION_SHARD]);
} else {
return array_keys($this->shards);
}
}

public function isKey(string $column): bool {
Expand Down
3 changes: 3 additions & 0 deletions lib/private/DB/QueryBuilder/Sharded/ShardQueryRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public function getShards(bool $allShards, array $shardKeys): ?array {
private function getLikelyShards(array $primaryKeys): array {
$shards = [];
foreach ($primaryKeys as $primaryKey) {
if ($primaryKey < $this->shardDefinition->fromFileId && !in_array(ShardDefinition::MIGRATION_SHARD, $shards)) {
$shards[] = ShardDefinition::MIGRATION_SHARD;
}
$encodedShard = $primaryKey & ShardDefinition::PRIMARY_KEY_SHARD_MASK;
if ($encodedShard < count($this->shardDefinition->shards) && !in_array($encodedShard, $shards)) {
$shards[] = $encodedShard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private function getQueryBuilder(string $table, string $shardColumn, string $pri
return new ShardedQueryBuilder(
$this->connection->getQueryBuilder(),
[
new ShardDefinition($table, $primaryColumn, [], $shardColumn, new RoundRobinShardMapper(), $companionTables, []),
new ShardDefinition($table, $primaryColumn, [], $shardColumn, new RoundRobinShardMapper(), $companionTables, [], 0, 0),
],
$this->createMock(ShardConnectionManager::class),
$this->autoIncrementHandler,
Expand Down
Loading