Skip to content

Commit

Permalink
grpc: allow get_subsystem concurrent to other grpc requests
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
  • Loading branch information
Alexander Indenbaum committed Aug 6, 2024
1 parent 9ff0b03 commit ecee380
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ SPDK_CENTOS_REPO_VER="9.0-21.el9"

# Ceph Cluster
CEPH_CLUSTER_VERSION="${CEPH_VERSION}"
CEPH_BRANCH=wip-baum-20240725-00-centos9-only
CEPH_SHA=0ec90b1e61a7489b13d6d8432156a0417f35db7f
CEPH_BRANCH=wip-baum-main-202258fa-20240805-nvmeof-enabled
CEPH_SHA=2ef08fc34dbdd6cc5a59f45582bfe959b1940161

CEPH_DEVEL_MGR_PATH=../ceph

Expand Down
30 changes: 21 additions & 9 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
import errno
import contextlib
import threading
import time
from typing import Callable
from collections import defaultdict
Expand Down Expand Up @@ -67,9 +68,14 @@ class GatewayService(pb2_grpc.GatewayServicer):
gateway_name: Gateway identifier
gateway_state: Methods for target state persistence
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_subsystems_client: Client of SPDK RPC server for get_subsystems
spdk_rpc_subsystems_lock: Mutex to hold while using get subsystems SPDK client
shared_state_lock: guard mutex for subsystem_nsid_bdev and cluster_nonce
subsystem_nsid_bdev: map of nsid to bdev
cluster_nonce: cluster context nonce map
"""

def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, ceph_utils: CephUtils) -> None:
def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rpc_lock, omap_lock: OmapLock, group_id: int, spdk_rpc_client, spdk_rpc_subsystems_client, ceph_utils: CephUtils) -> None:
"""Constructor"""
self.gw_logger_object = GatewayLogger(config)
self.logger = self.gw_logger_object.logger
Expand Down Expand Up @@ -121,6 +127,9 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.omap_lock = omap_lock
self.group_id = group_id
self.spdk_rpc_client = spdk_rpc_client
self.spdk_rpc_subsystems_client = spdk_rpc_subsystems_client
self.spdk_rpc_subsystems_lock = threading.Lock()
self.shared_state_lock = threading.Lock()
self.gateway_name = self.config.get("gateway", "name")
if not self.gateway_name:
self.gateway_name = socket.gethostname()
Expand Down Expand Up @@ -245,8 +254,9 @@ def _alloc_cluster(self, anagrp: int) -> str:
user = self.rados_id,
core_mask = self.librbd_core_mask,
)
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
with self.shared_state_lock:
self.logger.info(f"Allocated cluster {name=} {nonce=} {anagrp=}")
self.cluster_nonce[name] = nonce
return name

def _grpc_function_with_lock(self, func, request, context):
Expand Down Expand Up @@ -788,8 +798,9 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, conte
anagrpid=anagrpid,
uuid=uuid,
)
self.subsystem_nsid_bdev[subsystem_nqn][nsid] = bdev_name
self.logger.debug(f"subsystem_add_ns: {nsid}")
with self.shared_state_lock:
self.subsystem_nsid_bdev[subsystem_nqn][nsid] = bdev_name
self.logger.debug(f"subsystem_add_ns: {nsid}")
except Exception as ex:
self.logger.exception(add_namespace_error_prefix)
errmsg = f"{add_namespace_error_prefix}:\n{ex}"
Expand Down Expand Up @@ -2440,7 +2451,7 @@ def get_subsystems_safe(self, request, context):
self.logger.debug(f"Received request to get subsystems, context: {context}{peer_msg}")
subsystems = []
try:
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_client)
ret = rpc_nvmf.nvmf_get_subsystems(self.spdk_rpc_subsystems_client)
except Exception as ex:
self.logger.exception(f"get_subsystems failed")
context.set_code(grpc.StatusCode.INTERNAL)
Expand All @@ -2454,8 +2465,9 @@ def get_subsystems_safe(self, request, context):
for n in s[ns_key]:
nqn = s["nqn"]
nsid = n["nsid"]
bdev = self.subsystem_nsid_bdev[nqn][nsid]
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
with self.shared_state_lock:
bdev = self.subsystem_nsid_bdev[nqn][nsid]
nonce = self.cluster_nonce[self.bdev_cluster[bdev]]
n["nonce"] = nonce
# Parse the JSON dictionary into the protobuf message
subsystem = pb2.subsystem()
Expand All @@ -2468,7 +2480,7 @@ def get_subsystems_safe(self, request, context):
return pb2.subsystems_info(subsystems=subsystems)

def get_subsystems(self, request, context):
with self.rpc_lock:
with self.spdk_rpc_subsystems_lock:
return self.get_subsystems_safe(request, context)

def list_subsystems(self, request, context=None):
Expand Down
10 changes: 9 additions & 1 deletion control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class GatewayServer:
server: gRPC server instance to receive gateway client requests
spdk_rpc_client: Client of SPDK RPC server
spdk_rpc_ping_client: Ping client of SPDK RPC server
spdk_rpc_subsystems_client: subsystems client of SPDK RPC server
spdk_process: Subprocess running SPDK NVMEoF target application
discovery_pid: Subprocess running Ceph nvmeof discovery service
"""
Expand Down Expand Up @@ -171,7 +172,7 @@ def serve(self):
# Register service implementation with server
gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller, f"gateway-{self.name}")
omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.spdk_rpc_subsystems_client, self.ceph_utils)
self.server = self._grpc_server(self._gateway_address())
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)

Expand Down Expand Up @@ -400,6 +401,13 @@ def _start_spdk(self, omap_state):
log_level=protocol_log_level,
conn_retries=conn_retries,
)
self.spdk_rpc_subsystems_client = rpc_client.JSONRPCClient(
self.spdk_rpc_socket_path,
None,
timeout,
log_level=protocol_log_level,
conn_retries=conn_retries,
)
except Exception:
self.logger.exception(f"Unable to initialize SPDK")
raise
Expand Down

0 comments on commit ecee380

Please sign in to comment.