Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// - 2025-07-01: since TODO: add when enables sequence storage v1
/// 👥 client: new sequence API v1: depends on `FetchAddU64`.
///
/// - 2025-07-03: since TODO: add when merged
/// - 2025-07-03: since 1.2.770
/// 🖥 server: adaptive `expire_at` support both seconds and milliseconds.
///
/// - 2025-07-04: since TODO: add when merged
/// - 2025-07-04: since 1.2.770
/// 🖥 server: add `PutSequential`.
///
/// Server feature set:
Expand Down
35 changes: 35 additions & 0 deletions src/meta/service/src/meta_service/raft_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,41 @@ impl RaftService for RaftServiceImpl {
.await
}

async fn vote_v001(
&self,
request: Request<pb::VoteRequest>,
) -> Result<Response<pb::VoteResponse>, Status> {
let root = databend_common_tracing::start_trace_for_remote_request(func_path!(), &request);
let remote_addr = remote_addr(&request);

async {
let v_req_pb = request.into_inner();

let v_req: VoteRequest = v_req_pb.into();

let v_req_summary = v_req.summary();

info!(
"RaftServiceImpl::vote_v001: from:{remote_addr} start: {}",
v_req_summary
);

let raft = &self.meta_node.raft;

let resp = raft.vote(v_req).await.map_err(GrpcHelper::internal_err)?;

info!(
"RaftServiceImpl::vote_v001: from:{remote_addr} done: {}",
v_req_summary
);

let resp_pb = pb::VoteResponse::from(resp);
Ok(Response::new(resp_pb))
}
.in_span(root)
.await
}

async fn transfer_leader(
&self,
request: Request<pb::TransferLeaderRequest>,
Expand Down
41 changes: 35 additions & 6 deletions src/meta/service/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,18 +601,47 @@ impl RaftNetworkV2<TypeConfig> for Network {
) -> Result<VoteResponse, RPCError> {
info!(id = self.id, target = self.target, rpc = rpc.summary(); "send_vote");

let raft_req = GrpcHelper::encode_raft_request(&rpc).map_err(|e| Unreachable::new(&e))?;
let mut client = self
.take_client()
.log_elapsed_debug("Raft NetworkConnection vote take_client()")
.await?;

// First, try VoteV001 with native protobuf types
let vote_req_pb = pb::VoteRequest::from(rpc.clone());
let req_v001 = GrpcHelper::traced_req(vote_req_pb);

let grpc_res_v001 = client.vote_v001(req_v001).await;
info!(
"vote_v001: resp from target={} {:?}",
self.target, grpc_res_v001
);

match grpc_res_v001 {
Ok(response) => {
// VoteV001 succeeded, parse the VoteResponse directly
self.client.lock().await.replace(client);
let vote_response = response.into_inner();
let vote_resp: VoteResponse = vote_response.into();
return Ok(vote_resp);
}
Err(e) => {
// Only fall back for specific status codes indicating method not implemented
if matches!(e.code(), tonic::Code::Unimplemented | tonic::Code::NotFound) {
warn!(target = self.target, rpc = rpc.summary(); "vote_v001 not implemented, falling back to vote: {}", e);
} else {
// For other errors, don't fall back - return the error
return Err(RPCError::Unreachable(self.status_to_unreachable(e.clone())));
}
}
}

// Fallback to old Vote RPC using RaftRequest
let raft_req = GrpcHelper::encode_raft_request(&rpc).map_err(|e| Unreachable::new(&e))?;
let req = GrpcHelper::traced_req(raft_req);

let bytes = req.get_ref().data.len() as u64;
raft_metrics::network::incr_sendto_bytes(&self.target, bytes);

let mut client = self
.take_client()
.log_elapsed_debug("Raft NetworkConnection vote take_client()")
.await?;

let grpc_res = client.vote(req).await;
info!("vote: resp from target={} {:?}", self.target, grpc_res);

Expand Down
1 change: 1 addition & 0 deletions src/meta/service/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub(crate) mod raft {
del_provide(("install_snapshot", 2), "2024-07-02", (1, 2, 552)),
add_provide(("install_snapshot", 3), "2024-07-02", (1, 2, 552)),
del_provide(("install_snapshot", 1), "2025-07-02", (1, 2, 769)),
add_provide(("vote", 1), "2025-07-20", (1, 0, 0)), // TODO: fix the version
];

/// The client features that raft server depends on.
Expand Down
1 change: 1 addition & 0 deletions src/meta/types/proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ service RaftService {
// Added in 1.2.547, 2024-06-27
rpc InstallSnapshotV003(stream SnapshotChunkRequestV003) returns (SnapshotResponseV003);
rpc Vote(RaftRequest) returns (RaftReply);
rpc VoteV001(VoteRequest) returns (VoteResponse);
rpc TransferLeader(TransferLeaderRequest) returns (Empty);
}

Expand Down
11 changes: 11 additions & 0 deletions src/meta/types/proto/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,15 @@ message LogId {
uint64 term = 1;
uint64 node_id = 2;
uint64 index = 3;
}

message VoteRequest {
Vote vote = 1;
LogId last_log_id = 2;
}

message VoteResponse {
Vote vote = 1;
bool vote_granted = 2;
LogId last_log_id = 3;
}
2 changes: 2 additions & 0 deletions src/meta/types/src/proto_ext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ mod txn_delete_request_ext;
mod txn_delete_response_ext;
mod txn_put_request_ext;
mod txn_put_response_ext;
mod vote_request_ext;
mod vote_response_ext;
33 changes: 33 additions & 0 deletions src/meta/types/src/proto_ext/vote_request_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::protobuf as pb;
use crate::raft_types;

impl From<raft_types::VoteRequest> for pb::VoteRequest {
fn from(req: raft_types::VoteRequest) -> Self {
pb::VoteRequest {
vote: Some(req.vote.into()),
last_log_id: req.last_log_id.map(|log_id| log_id.into()),
}
}
}

impl From<pb::VoteRequest> for raft_types::VoteRequest {
fn from(req: pb::VoteRequest) -> Self {
let vote: raft_types::Vote = req.vote.unwrap_or_default().into();
let last_log_id = req.last_log_id.map(|log_id| log_id.into());
raft_types::VoteRequest::new(vote, last_log_id)
}
}
34 changes: 34 additions & 0 deletions src/meta/types/src/proto_ext/vote_response_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::protobuf as pb;
use crate::raft_types;

impl From<raft_types::VoteResponse> for pb::VoteResponse {
fn from(resp: raft_types::VoteResponse) -> Self {
pb::VoteResponse {
vote: Some(resp.vote.into()),
vote_granted: resp.vote_granted,
last_log_id: resp.last_log_id.map(|log_id| log_id.into()),
}
}
}

impl From<pb::VoteResponse> for raft_types::VoteResponse {
fn from(resp: pb::VoteResponse) -> Self {
let vote: raft_types::Vote = resp.vote.unwrap_or_default().into();
let last_log_id = resp.last_log_id.map(|log_id| log_id.into());
raft_types::VoteResponse::new(vote, last_log_id, resp.vote_granted)
}
}
Loading