Commit

mover: list chunks by calling the rawx instead of rdir
IrakozeFD committed Aug 20, 2024
1 parent ca6f60f commit c7d5d0f
Showing 9 changed files with 446 additions and 79 deletions.
75 changes: 74 additions & 1 deletion oio/blob/client.py
@@ -15,13 +15,14 @@
# License along with this library.


import json
from oio.common.green import GreenPile

import random
from email.utils import parsedate
from functools import wraps
from urllib.parse import unquote
from time import mktime
from time import mktime, sleep

from oio.common.kafka import GetTopicMixin, KafkaProducerMixin
from oio.common.logger import get_logger
@@ -35,6 +36,7 @@
CHUNK_HEADERS,
CHUNK_XATTR_KEYS_OPTIONAL,
FETCHXATTR_HEADER,
HTTP_CONTENT_TYPE_JSON,
REQID_HEADER,
CHECKHASH_HEADER,
)
@@ -396,6 +398,77 @@ def chunk_link(
raise exc.from_response(resp)
return resp, link

@ensure_request_id
def chunk_list(
self,
volume,
min_to_return=1000,
max_attempts=3,
start_after=None,
shuffle=False,
full_urls=False,
**kwargs
):
"""Fetch the list of chunks belonging to the specified volume.
:param volume: rawx volume to list the chunks
:type volume: str
:param min_to_return: minimum items returned, defaults to 1000
:type min_to_return: int, optional
:param max_attempts: max attemps while retrieving chunks, defaults to 3
:type max_attempts: int, optional
:param start_after: list chunks after this marker, defaults to None
:type start_after: str, optional
"""
req_body = {"min_to_return": min_to_return}
if start_after:
req_body["start_after"] = start_after
url = self._make_list_uri(volume, **kwargs)
resp_body = ""
while True:
for i in range(max_attempts):
try:
headers = {}
headers["Content-Type"] = HTTP_CONTENT_TYPE_JSON
data = json.dumps(req_body, separators=(",", ":"))
kwargs["body"] = data
kwargs["headers"] = headers
resp = self._request("GET", url, **kwargs)
if resp.data:
resp_body = json.loads(resp.data)
break
except exc.OioNetworkException:
# Linear backoff (wait 0s, 1s, 2s, ...)
if i < max_attempts - 1:
sleep(i * 1.0)
continue
# Too many attempts
raise
if not resp_body:
break
truncated = resp_body["is_truncated"]
req_body["start_after"] = resp_body["marker"]
chunks = resp_body["chunks"]
if not chunks:
break
if shuffle:
random.shuffle(chunks)
for chunk_data in chunks:
chunk = chunk_data["chunk_id"]
if full_urls:
chunk = "http://%s/%s" % (volume, chunk)
yield chunk

if not truncated:
break

def _make_list_uri(self, volume, **kwargs):
"""Returns the rawx list chunk url"""
url = self.conscience_client.resolve_service_id(
"rawx", volume, check_format=False, **kwargs
)
return url + "/list"

def _request(
self, method, url, connection_timeout=None, read_timeout=None, **kwargs
):
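
The new chunk_list generator pages through a rawx volume's chunks by calling the service directly. A minimal usage sketch, assuming an already-configured BlobClient (the conf dict, logger, watchdog, and service ID below are placeholders, not part of this commit):

from oio.blob.client import BlobClient

# Placeholder wiring: real callers pass namespace configuration here.
blob_client = BlobClient(conf, logger=logger, watchdog=watchdog)

# Iterate every chunk stored on the given rawx service, one page at a time.
# full_urls=True yields "http://<volume>/<chunk_id>" instead of bare IDs.
for chunk in blob_client.chunk_list(
    "rawx-1",            # placeholder rawx service ID
    min_to_return=1000,  # page size requested from the rawx
    start_after=None,    # or a marker, to resume an interrupted listing
    full_urls=True,
):
    print(chunk)

Each page is fetched with a JSON body carrying min_to_return and start_after; the response's marker and is_truncated fields drive the next iteration, and transient network errors are retried up to max_attempts times with a linearly increasing delay.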
46 changes: 28 additions & 18 deletions oio/cli/admin/xcute/__init__.py
@@ -1,5 +1,5 @@
# Copyright (C) 2019-2020 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
# Copyright (C) 2021-2024 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -27,15 +27,15 @@ def xcute(self):
return self.app.client_manager.xcute_client


class XcuteRdirCommand(XcuteCommand, ShowOne):
class XcuteCommonCommand(XcuteCommand, ShowOne):
"""
Class holding rdir-related parameters.
Class holding xcute command common parameters.
"""

JOB_CLASS = None

def get_parser(self, prog_name):
parser = super(XcuteRdirCommand, self).get_parser(prog_name)
parser = super(XcuteCommonCommand, self).get_parser(prog_name)

parser.add_argument(
"--put-on-hold-if-locked",
@@ -46,6 +46,30 @@ def get_parser(self, prog_name):
""",
action="store_true",
)
return parser

def get_job_config(self, parsed_args):
raise NotImplementedError()

def take_action(self, parsed_args):
self.logger.debug("take_action(%s)", parsed_args)

job_config = self.get_job_config(parsed_args)
job_info = self.xcute.job_create(
self.JOB_CLASS.JOB_TYPE,
job_config=job_config,
put_on_hold_if_locked=parsed_args.put_on_hold_if_locked,
)
return zip(*sorted(flat_dict_from_dict(parsed_args, job_info).items()))


class XcuteRdirCommand(XcuteCommonCommand):
"""
Class holding rdir-related parameters.
"""

def get_parser(self, prog_name):
parser = super(XcuteRdirCommand, self).get_parser(prog_name)

parser.add_argument(
"--rdir-fetch-limit",
@@ -63,17 +87,3 @@
)

return parser

def get_job_config(self, parsed_args):
raise NotImplementedError()

def take_action(self, parsed_args):
self.logger.debug("take_action(%s)", parsed_args)

job_config = self.get_job_config(parsed_args)
job_info = self.xcute.job_create(
self.JOB_CLASS.JOB_TYPE,
job_config=job_config,
put_on_hold_if_locked=parsed_args.put_on_hold_if_locked,
)
return zip(*sorted(flat_dict_from_dict(parsed_args, job_info).items()))
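
The effect of this refactor: job creation (take_action) and the --put-on-hold-if-locked flag now live in XcuteCommonCommand, so commands whose jobs no longer walk rdir, such as the reworked rawx decommission, inherit them without the rdir flags. A sketch of a command under the new split (class names are illustrative, not from this commit):

class ExampleDecommission(XcuteCommonCommand):
    """Illustrative command: subclasses only supply the job config."""

    JOB_CLASS = ExampleDecommissionJob  # hypothetical job class exposing JOB_TYPE

    def get_job_config(self, parsed_args):
        # The inherited take_action() hands this dict to xcute.job_create().
        return {
            "tasks_per_second": parsed_args.tasks_per_second,
            "params": {"service_id": parsed_args.service},
        }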
15 changes: 11 additions & 4 deletions oio/cli/admin/xcute/rawx.py
@@ -15,7 +15,7 @@
# License along with this library.

from oio.cli.admin.common import SingleServiceCommandMixin
from oio.cli.admin.xcute import XcuteRdirCommand
from oio.cli.admin.xcute import XcuteCommonCommand, XcuteRdirCommand
from oio.common.easy_value import boolean_value
from oio.xcute.jobs.blob_mover import RawxDecommissionJob
from oio.xcute.jobs.blob_rebuilder import RawxRebuildJob
@@ -107,7 +107,7 @@ def get_job_config(self, parsed_args):
return {"tasks_per_second": parsed_args.chunks_per_second, "params": job_params}


class RawxDecommission(SingleServiceCommandMixin, XcuteRdirCommand):
class RawxDecommission(SingleServiceCommandMixin, XcuteCommonCommand):
"""
Decommission the specified service.
All chunks matching the size constraints
@@ -134,6 +134,14 @@ def get_parser(self, prog_name):
help="Timeout for rawx operations, in seconds. (default=%f)"
% self.JOB_CLASS.DEFAULT_RAWX_TIMEOUT,
)
parser.add_argument(
"--rawx-list-limit",
type=int,
help=(
"Maximum number of entries returned in each rawx response. (default=%d)"
)
% self.JOB_CLASS.DEFAULT_RAWX_LIST_LIMIT,
)
parser.add_argument(
"--min-chunk-size",
type=int,
@@ -202,8 +210,7 @@ def get_parser(self, prog_name):
def get_job_config(self, parsed_args):
job_params = {
"service_id": parsed_args.service,
"rdir_fetch_limit": parsed_args.rdir_fetch_limit,
"rdir_timeout": parsed_args.rdir_timeout,
"rawx_list_limit": parsed_args.rawx_list_limit,
"rawx_timeout": parsed_args.rawx_timeout,
"min_chunk_size": parsed_args.min_chunk_size,
"max_chunk_size": parsed_args.max_chunk_size,
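
With the rdir knobs dropped, the parameters submitted for a decommission job change shape. A sketch of the config dict get_job_config() now assembles (values illustrative, fields outside this diff elided):

# Illustrative job_config after this change: rdir_fetch_limit and
# rdir_timeout are gone, rawx_list_limit is new.
job_config = {
    "tasks_per_second": 10,        # placeholder rate
    "params": {
        "service_id": "rawx-1",    # placeholder service ID
        "rawx_list_limit": 1000,   # page size for the rawx list requests
        "rawx_timeout": 60.0,      # timeout for rawx operations, in seconds
        "min_chunk_size": 0,
        "max_chunk_size": 0,
    },
}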
77 changes: 38 additions & 39 deletions oio/xcute/jobs/blob_mover.py
@@ -26,14 +26,13 @@
NotFound,
OrphanChunk,
)
from oio.common.green import time
from oio.common.green import get_watchdog, time
from oio.common.kafka import DEFAULT_REBUILD_TOPIC
from oio.conscience.client import ConscienceClient
from oio.content.factory import ContentFactory
from oio.event.evob import EventTypes
from oio.rdir.client import RdirClient
from oio.xcute.common.job import XcuteTask
from oio.xcute.jobs.common import XcuteRdirJob
from oio.xcute.jobs.common import XcuteUsageTargetJob


class RawxDecommissionTask(XcuteTask):
@@ -78,12 +77,7 @@ def _generate_fake_excluded_chunks(self, excluded_rawx):
return fake_excluded_chunks

def process(self, task_id, task_payload, reqid=None):
container_id = task_payload["container_id"]
content_id = task_payload["content_id"]
path = task_payload["path"]
version = task_payload["version"]
chunk_id = task_payload["chunk_id"]

chunk_url = "http://{}/{}".format(self.service_id, chunk_id)
try:
meta = self.blob_client.chunk_head(
@@ -94,14 +88,19 @@
# but the chunk no longer exists in the rawx.
# We ignore it because there is nothing to move.
return {"skipped_chunks_no_longer_exist": 1}
if container_id != meta["container_id"]:
raise ValueError(
"Mismatch container ID: %s != %s", container_id, meta["container_id"]
)
if content_id != meta["content_id"]:
raise ValueError(
"Mismatch content ID: %s != %s", content_id, meta["content_id"]
)
container_id = meta["container_id"]
content_id = meta["content_id"]
path = meta["content_path"]
version = meta["content_version"]

for id_type, payload_key, value in (
("container", "container_id", container_id),
("content", "content_id", content_id),
):
if payload_key in task_payload and value != task_payload[payload_key]:
raise ValueError(
f"Mismatch {id_type} ID: {task_payload[payload_key]} != {value}"
)
chunk_size = int(meta["chunk_size"])

# Maybe skip the chunk because it doesn't match the size constraint
@@ -159,11 +158,12 @@ def process(self, task_id, task_payload, reqid=None):
return {"moved_chunks": 1, "moved_bytes": chunk_size}


class RawxDecommissionJob(XcuteRdirJob):
class RawxDecommissionJob(XcuteUsageTargetJob):
JOB_TYPE = "rawx-decommission"
TASK_CLASS = RawxDecommissionTask

DEFAULT_RAWX_TIMEOUT = 60.0
DEFAULT_RAWX_LIST_LIMIT = 1000
DEFAULT_MIN_CHUNK_SIZE = 0
DEFAULT_MAX_CHUNK_SIZE = 0
DEFAULT_USAGE_CHECK_INTERVAL = 60.0
@@ -187,6 +187,10 @@ def sanitize_params(cls, job_params):
job_params.get("rawx_timeout"), cls.DEFAULT_RAWX_TIMEOUT
)

sanitized_job_params["rawx_list_limit"] = int_value(
job_params.get("rawx_list_limit"), cls.DEFAULT_RAWX_LIST_LIMIT
)

sanitized_job_params["min_chunk_size"] = int_value(
job_params.get("min_chunk_size"), cls.DEFAULT_MIN_CHUNK_SIZE
)
@@ -226,9 +230,9 @@ def sanitize_params(cls, job_params):
def __init__(self, conf, logger=None, **kwargs):
super(RawxDecommissionJob, self).__init__(conf, logger=logger, **kwargs)
self.rdir_client = RdirClient(self.conf, logger=self.logger)
self.conscience_client = ConscienceClient(
self.conf, logger=self.logger, pool_manager=self.rdir_client.pool_manager
)
watchdog = kwargs.get("watchdog", get_watchdog())
self.blob_client = BlobClient(self.conf, logger=self.logger, watchdog=watchdog)
self.conscience_client = self.blob_client.conscience_client
self.must_auto_exclude_rawx = False

def auto_exclude_rawx(self, job_params, services):
@@ -299,14 +303,10 @@ def get_tasks(self, job_params, marker=None, reqid=None):
return
last_usage_check = now

chunk_info = self.get_chunk_info(job_params, marker=marker, reqid=reqid)
for container_id, chunk_id, descr in chunk_info:
task_id = "|".join((container_id, chunk_id))
chunk_info = self.get_chunk_list(job_params, marker=marker, reqid=reqid)
for chunk_id in chunk_info:
task_id = chunk_id
yield task_id, {
"container_id": container_id,
"content_id": descr["content_id"],
"path": descr["path"],
"version": descr["version"],
"chunk_id": chunk_id,
}

@@ -326,32 +326,31 @@ def get_total_tasks(self, job_params, marker=None, reqid=None):
return

kept_chunks_ratio = 1 - (usage_target / float(current_usage))
chunk_info = self.get_chunk_info(job_params, marker=marker, reqid=reqid)
chunk_info = self.get_chunk_list(job_params, marker=marker, reqid=reqid)
i = 0
for i, (container_id, chunk_id, _) in enumerate(chunk_info, 1):
for i, chunk_id in enumerate(chunk_info, 1):
if i % 1000 == 0:
yield (
"|".join((container_id, chunk_id)),
chunk_id,
int(math.ceil(1000 * kept_chunks_ratio)),
)

remaining = int(math.ceil(i % 1000 * kept_chunks_ratio))
if remaining > 0:
yield ("|".join((container_id, chunk_id)), remaining)
yield (chunk_id, remaining)

def get_chunk_info(self, job_params, marker=None, reqid=None):
def get_chunk_list(self, job_params, marker=None, reqid=None):
"""Request rawx to gather list of chunks (chunk id)"""
service_id = job_params["service_id"]
rdir_fetch_limit = job_params["rdir_fetch_limit"]
rdir_timeout = job_params["rdir_timeout"]

chunk_info = self.rdir_client.chunk_fetch(
rawx_list_limit = job_params["rawx_list_limit"]
rawx_timeout = job_params["rawx_timeout"]
chunk_info = self.blob_client.chunk_list(
service_id,
timeout=rdir_timeout,
limit=rdir_fetch_limit,
start_after=marker,
min_to_return=rawx_list_limit,
reqid=reqid,
timeout=rawx_timeout,
)

return chunk_info

def set_topic_suffix(self, job_params, reqid=None):
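
Task generation shrinks accordingly: rdir used to supply full chunk records, while the rawx listing yields bare chunk IDs, so process() now recovers the container/content coordinates from chunk_head() metadata and only cross-checks IDs still present in old payloads. A before/after sketch of the task shape (IDs illustrative):

# Before: task identity and payload came from the rdir record.
old_task_id = "0123ABCD|chunk0001"  # "<container_id>|<chunk_id>"
old_payload = {
    "container_id": "0123ABCD",
    "content_id": "CAFE0001",
    "path": "obj-name",
    "version": 1,
    "chunk_id": "chunk0001",
}

# After: the rawx listing only knows chunk IDs; everything else is
# read back from the chunk's extended attributes at processing time.
new_task_id = "chunk0001"
new_payload = {"chunk_id": "chunk0001"}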