Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion api/openapi-rest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,21 @@ paths:
schema:
$ref: '#/components/schemas/ClusterNodesResponse'

/v1/cluster/recreate:
put:
tags:
- recreate
summary: Recreates the cluster
description: Recreates the cluster discovering the nodes
operationId: RecreateCluster
responses:
'201':
description: Cluster recreation has been launched
content:
application/json:
schema:
$ref: '#/components/schemas/ClusterRecreateResponse'

components:
schemas:
RedKeyClusterStatusResponse:
Expand Down Expand Up @@ -421,4 +436,14 @@ components:
linkStatus:
type: string
description: RedKey Cluster node link status
example: connected
example: connected
ClusterRecreateResponse:
type: object
properties:
status:
type: string
description: RedKey Cluster recreation status
example: InProgress
enum:
- InProgress
- Error
5 changes: 5 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ The following endpoints are provided:
- `PUT`: performs the node reset and returns the result.
- `/v1/cluster/nodes`: endpoint that returns the information that Robin has about the nodes. Methods provided:
- `GET`: returns a JSON with a list of object, each containing `id` (string), `name` (string), `flags` (string), `slots` (array of objects with `start` (integer) and `end` (integer)), `ip` (string), `masterId` (string), `failures` (integer), `sent` (integer), `recv` (integer) and `linkStatus` (string).
- `/v1/cluster/recreate`: endpoint to force cluster recreations, rediscovering nodes. Methods provided:
- `PUT`: performs the recreate operation.

This specification can also be observed in `api/openapi-rest.yml` file

Expand All @@ -42,12 +44,15 @@ Some operations described above are done asynchronously since they are time cons
- `PUT` to `/v1/cluster/replicas`: Robin will change the replicas, launch a separate goroutine to perform the associated operations and returns.
- `PUT` to `/v1/cluster/move`: Robin will launch a separate goroutine to do the resharding and returns.
- `PUT` to `/v1/cluster/fix`: Robin will launch a separate goroutine to perform the fix operations and returns.
- `PUT` to `/v1/cluster/recreate`: Robin will launch a separate goroutine to perform the recreate operation and return.


In order to have traceability, know what Robin is doing and not launching the same operation twice, Robin will maintain in its internal state a reference to the running operations. This way, the same operation will only be launched once even if it is invoked more than once. Callers can know this by analyzing the HTTP status code returned by these endpoints:

- 200 OK: the request is already done. This response will depend on the called endpoint:
- `/v1/cluster/replicas`: the RedKey Cluster has already the requested replicas.
- `/v1/cluster/move`: the node received at `from` is already empty, `from` node is a replica or `from` node has replicas (and one of them has been promoted to master).
- `/v1/cluster/fix`: this endpoint will not returns this status code.
- `/v1/cluster/recreate`: this endpoint will not returns this status code.
- 201 Created: the request triggers the action, that is, a new goroutine to perform the action has been launched.
- 202 Accepted: the requested action is already running, therefore, the request does not trigger a new goroutine. This status code is only used by `/v1/cluster/move`.
3 changes: 3 additions & 0 deletions internal/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
CheckingIntegrityError = "CheckingIntegrityError"
Resetting = "Resetting"
ResettingError = "ResettingError"
NoReconciling = "NoReconciling"
RedKeyClusterTotalSlots = 16384
RedisNodesUnbalancedThreshold = 2
)
Expand Down Expand Up @@ -118,6 +119,8 @@ type Cluster interface {
Upgrade(force bool) error
// ResetNode resets a node of the cluster.
ResetNode(node *redis.RedisNode) error
// Clears stored nodes.
ClearNodes() error
}

// NewCluster creates a new cluster. It returns a standalone or RedKey Cluster based on the cluster type.
Expand Down
6 changes: 6 additions & 0 deletions internal/cluster/redkey.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ func (rc *RedKeyCluster) ResetNode(node *redis.RedisNode) error {
return rc.launchOperation(NewRedisOperationResetNode(rc.ctx, rc, node), Upgrading, false)
}

// Clears the stored nodes creating a new map.
func (rc *RedKeyCluster) ClearNodes() error {
rc.nodes = make(map[string]*redis.RedisNode)
return nil
}

// ----------------------------------------------------------------------------------------------------
// ---------------------------------------------- ASKERS ----------------------------------------------
// ----------------------------------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions internal/cluster/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (rc *RedKeyStandalone) ResetNode(node *redis.RedisNode) error {
return nil
}

// Clears the stored nodes.
func (rc *RedKeyStandalone) ClearNodes() error {
return nil
}

// ----------------------------------------------------------------------------------------------------
// --------------------------------------------- PRIVATE ----------------------------------------------
// ----------------------------------------------------------------------------------------------------
Expand Down
26 changes: 26 additions & 0 deletions internal/httpserver/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,32 @@ func (s *Server) FixCluster(w http.ResponseWriter, r *http.Request) {
s.sendResponse(w, http.StatusCreated, response)
}

// FixCluster handles the PUT /v1/cluster/recreate endpoint. It recreates the RedKey cluster.
func (s *Server) RecreateCluster(w http.ResponseWriter, r *http.Request) {
s.logger.Info("Recreate cluster")

s.cluster.ClearNodes()
err := s.cluster.Init()
if err == nil {
err = s.cluster.CheckIntegrity(true, false)
}

// Send the response
response := ClusterRecreateResponse{
Status: "In progress",
}
if err != nil {
if _, ok := err.(*cluster.OperationInProgressError); ok {
s.sendResponse(w, http.StatusAccepted, response)
return
}

s.sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error recreating cluster: %v", err))
return
}
s.sendResponse(w, http.StatusCreated, response)
}

// ResetNode handles the POST /v1/cluster/reset/{nodeIndex} endpoint. It resets a Redis node.
func (s *Server) ResetNode(w http.ResponseWriter, r *http.Request) {
s.logger.Info("Reset node")
Expand Down
29 changes: 29 additions & 0 deletions internal/httpserver/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,32 @@ func TestGetNodes(t *testing.T) {
})
}
}

func TestRecreateCluster(t *testing.T) {
tests := []struct {
name string
request string
expectedBody ResponseInterface
expectedStatusCode int
}{
{
name: "good request",
expectedBody: ClusterRecreateResponse{
Status: "In progress",
},
expectedStatusCode: http.StatusCreated,
},
{
name: "fix in progress",
expectedBody: ClusterRecreateResponse{
Status: "In progress",
},
expectedStatusCode: http.StatusAccepted,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testRequest(t, "PUT", "/cluster/fix", tt.request, "", nil, server.FixCluster, tt.expectedStatusCode, tt.expectedBody)
})
}
}
1 change: 1 addition & 0 deletions internal/httpserver/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (s *Server) Init(opts *util.Options) error {
mux.HandleFunc("PUT /v1/cluster/fix", s.FixCluster)
mux.HandleFunc("PUT /v1/cluster/reset/{nodeIndex}", s.ResetNode)
mux.HandleFunc("GET /v1/cluster/nodes", s.GetNodes)
mux.HandleFunc("PUT /v1/cluster/recreate", s.RecreateCluster)
}

// Create the server
Expand Down
2 changes: 1 addition & 1 deletion internal/httpserver/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

var (
ValidRedKeyClusterStatus = []string{cluster.Initializing, cluster.Configuring, cluster.Ready, cluster.Error, cluster.Upgrading, cluster.ScalingDown, cluster.ScalingUp, cluster.Maintenance, cluster.Unknown}
ValidRedKeyClusterStatus = []string{cluster.Initializing, cluster.Configuring, cluster.Ready, cluster.Error, cluster.Upgrading, cluster.ScalingDown, cluster.ScalingUp, cluster.Maintenance, cluster.Unknown, cluster.NoReconciling}
)

type RequestInterface interface {
Expand Down
8 changes: 8 additions & 0 deletions internal/httpserver/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,11 @@ type ClusterNodesResponse struct {
func (r ClusterNodesResponse) GetKeys() []string {
return []string{"nodes"}
}

type ClusterRecreateResponse struct {
Status string `json:"status"`
}

func (r ClusterRecreateResponse) GetKeys() []string {
return []string{"status"}
}
Loading