66 changes: 66 additions & 0 deletions .github/workflows/build.yml
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: Build and Lint

on:
push:
branches:
- unstable
pull_request:
branches:
- unstable

jobs:
  build-and-lint:
    name: Build and Lint
    runs-on: ubuntu-latest
    # The secrets context is not available inside step-level `if` expressions,
    # so expose the presence of credentials through the job env and test that
    # instead.
    env:
      HAS_DOCKER_CREDS: ${{ secrets.DOCKER_USERNAME != '' && secrets.DOCKER_PASSWORD != '' }}
    steps:
- name: Checkout Code Base
uses: actions/checkout@v4
with:
          fetch-depth: 0 # 0 fetches the full history rather than a shallow clone

- name: Install Go
uses: actions/setup-go@v5
with:
go-version: '1.23'

      - name: Docker Login
        if: ${{ env.HAS_DOCKER_CREDS == 'true' }}
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}

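      # restore-keys lets a run fall back to the most recent cache for the
      # same OS and Go version when go.sum has changed.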
- name: Restore Go Module Cache
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-1.23-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-1.23-

- name: Build Project
run: go build -v ./...

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.63.4
args: --timeout=5m
77 changes: 77 additions & 0 deletions .github/workflows/nightly-image.yaml
@@ -0,0 +1,77 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
name: Nightly Docker Image

on:
push:
branches:
- unstable
tags:
- v*
pull_request:
branches:
- unstable
schedule:
    - cron: '0 2 * * *' # 02:00 UTC, daily
workflow_dispatch:

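# Restrict the workflow's default GITHUB_TOKEN to read-only repository access.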
permissions:
contents: read

jobs:
  build-and-push:
    name: Build and push Docker image
    runs-on: ubuntu-latest
    # As in build.yml, the secrets context cannot be referenced in step-level
    # `if` expressions, so surface the presence of credentials via the job env.
    env:
      HAS_DOCKERHUB_CREDS: ${{ secrets.DOCKERHUB_USERNAME != '' && secrets.DOCKERHUB_TOKEN != '' }}
    steps:
- name: Checkout
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

      - name: Login to DockerHub
        if: ${{ github.event_name != 'pull_request' && env.HAS_DOCKERHUB_CREDS == 'true' }}
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

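      # metadata-action derives the image tags from the trigger: "nightly" for
      # scheduled runs, branch/PR refs, semver tags for release tags, and the
      # commit SHA.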
- name: Docker Meta
id: meta
uses: docker/metadata-action@v5
with:
images: apache/kvrocks-controller
tags: |
type=schedule
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha

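      # Multi-arch build via Buildx and QEMU; the image is only pushed for
      # non-pull-request events when registry credentials are available.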
- name: Build and push
uses: docker/build-push-action@v6
with:
context: .
platforms: linux/amd64,linux/arm64
          push: ${{ github.event_name != 'pull_request' && env.HAS_DOCKERHUB_CREDS == 'true' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
2 changes: 1 addition & 1 deletion cmd/server/main.go
@@ -43,7 +43,7 @@ func init() {

func registerSignal(closeFn func()) {
c := make(chan os.Signal, 1)
-	signal.Notify(c, []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1}...)
+	signal.Notify(c, []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM}...)
go func() {
for sig := range c {
if handleSignals(sig) {
98 changes: 98 additions & 0 deletions controller/cluster.go
@@ -402,6 +402,103 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu
}
}

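// processMigrationQueue advances the cluster's migration task queue. It is a
// no-op while any shard is still migrating; otherwise it promotes the head
// task from Pending to Migrating, starts the next pending slot range, and
// retires the task once all of its ranges have been migrated.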
func (c *ClusterChecker) processMigrationQueue(ctx context.Context, clonedCluster *store.Cluster) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
zap.String("cluster", c.clusterName))

// Check if any shard is currently migrating
for _, shard := range clonedCluster.Shards {
if shard.IsMigrating() {
return // Wait for current migration to finish
}
}

// Process pending tasks
if len(clonedCluster.MigrationTasks) == 0 {
return
}

task := clonedCluster.MigrationTasks[0]
if task.Status == store.MigrationTaskPending {
// Update task status to Migrating
task.Status = store.MigrationTaskMigrating
task.StartTime = time.Now().Unix()
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update migration task status", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
}

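	// A task promoted above falls through and is processed in this same pass.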
if task.Status == store.MigrationTaskMigrating {
if len(task.PendingSlotRanges) == 0 {
// All slots migrated, mark task as Success
task.Status = store.MigrationTaskSuccess
task.FinishTime = time.Now().Unix()
// Remove task from queue
			clonedCluster.MigrationTasks = clonedCluster.MigrationTasks[1:]
if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to update migration task status to success", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
return
}

// Pick next slot range to migrate
slotRange := task.PendingSlotRanges[0]
err := clonedCluster.MigrateSlot(ctx, slotRange, task.TargetShardIdx, task.SlotOnly)
if err != nil {
log.Error("Failed to start migration for slot range",
zap.String("slot", slotRange.String()),
zap.Error(err))

task.Retries++
if task.MaxRetries > 0 && task.Retries > task.MaxRetries {
switch strings.ToLower(task.FailurePolicy) {
case "skip":
log.Warn("Skip failed slot range after max retries", zap.String("slot", slotRange.String()))
task.PendingSlotRanges = task.PendingSlotRanges[1:]
task.Retries = 0
case "abort":
log.Error("Abort migration task after max retries", zap.String("task_id", task.TaskID))
task.Status = store.MigrationTaskFailed
task.FinishTime = time.Now().Unix()
task.Error = err.Error()
default:
// keep retrying
}
}

if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to persist migration retry state", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
return
}

task.MigratingSlot = slotRange
		task.PendingSlotRanges = task.PendingSlotRanges[1:]
task.Retries = 0

if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
log.Error("Failed to persist migration start", zap.Error(err))
return
}
c.updateCluster(clonedCluster)
}
}

func (c *ClusterChecker) migrationLoop() {
defer c.wg.Done()

@@ -423,6 +520,7 @@ func (c *ClusterChecker) migrationLoop() {
continue
}
c.tryUpdateMigrationStatus(c.ctx, clonedCluster)
c.processMigrationQueue(c.ctx, clonedCluster)
}
}
}
87 changes: 87 additions & 0 deletions controller/migration_test.go
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package controller

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/apache/kvrocks-controller/store"
"github.com/apache/kvrocks-controller/store/engine"
)

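// TestMigrationQueue drives processMigrationQueue directly against a mock
// store, covering a successful multi-range task and the abort failure policy.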
func TestMigrationQueue(t *testing.T) {
ns := "test-ns"
clusterName := "test-cluster"
s := store.NewClusterStore(engine.NewMock())
ctx := context.Background()

cluster, err := store.NewCluster(clusterName, []string{"127.0.0.1:1111", "127.0.0.1:2222"}, 1)
require.NoError(t, err)
require.NoError(t, s.CreateNamespace(ctx, ns))
require.NoError(t, s.CreateCluster(ctx, ns, cluster))

checker := NewClusterChecker(s, ns, clusterName)

t.Run("Queue and Process Multiple Ranges", func(t *testing.T) {
slots := []store.SlotRange{{Start: 10, Stop: 12}, {Start: 20, Stop: 22}}
// Use slotOnly=true for easier unit testing (no node calls)
err := cluster.MigrateSlots(ctx, slots, 1, true, "append", "retry", 3)
require.NoError(t, err)
require.NoError(t, s.UpdateCluster(ctx, ns, cluster))

// First loop: Start task and immediately process first range
checker.processMigrationQueue(ctx, cluster)
require.Equal(t, store.MigrationTaskMigrating, cluster.MigrationTasks[0].Status)
require.Equal(t, store.SlotRange{Start: 10, Stop: 12}, cluster.MigrationTasks[0].MigratingSlot)
require.Len(t, cluster.MigrationTasks[0].PendingSlotRanges, 1)

// With slotOnly=true, the slot is moved immediately in MigrateSlot
// So we don't expect MigratingSlot to be set on the shard, and SlotRanges should be updated.
require.Nil(t, cluster.Shards[0].MigratingSlot)

// Second loop: Process second range
checker.processMigrationQueue(ctx, cluster)
require.Equal(t, store.SlotRange{Start: 20, Stop: 22}, cluster.MigrationTasks[0].MigratingSlot)
require.Len(t, cluster.MigrationTasks[0].PendingSlotRanges, 0)

// Third loop: Task success and removed
checker.processMigrationQueue(ctx, cluster)
require.Len(t, cluster.MigrationTasks, 0)
})

t.Run("Failure with Abort", func(t *testing.T) {
cluster.MigrationTasks = nil
slots := []store.SlotRange{{Start: 30, Stop: 32}}
// Migrate to non-existent shard to trigger index out of range error in MigrateSlot
err := cluster.MigrateSlots(ctx, slots, 99, true, "append", "abort", 1)
require.NoError(t, err)
require.NoError(t, s.UpdateCluster(ctx, ns, cluster))

checker.processMigrationQueue(ctx, cluster) // Pending -> Migrating and starts range (fails because targetShardIdx 99 is invalid)
require.Equal(t, 1, cluster.MigrationTasks[0].Retries)

checker.processMigrationQueue(ctx, cluster) // Attempt 2 (fail -> abort)
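		// The second failure pushes Retries (now 2) past MaxRetries (1), so
		// the abort policy marks the task Failed.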

require.Equal(t, store.MigrationTaskFailed, cluster.MigrationTasks[0].Status)
})
}