Skip to content

Commit

Permalink
Merge "CVE-2024-32498: Check for external qcow2 data file"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and openstack-gerrit committed Jul 3, 2024
2 parents 0f649dc + d6a1869 commit 78f85c1
Show file tree
Hide file tree
Showing 12 changed files with 2,009 additions and 58 deletions.
938 changes: 938 additions & 0 deletions cinder/image/format_inspector.py

Large diffs are not rendered by default.

86 changes: 81 additions & 5 deletions cinder/image/image_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from cinder.i18n import _
from cinder.image import accelerator
from cinder.image import glance
import cinder.privsep.format_inspector
from cinder import utils
from cinder.volume import throttling
from cinder.volume import volume_utils
Expand Down Expand Up @@ -158,11 +159,26 @@ def from_qemu_img_disk_format(disk_format: str) -> str:
return QEMU_IMG_FORMAT_MAP_INV.get(disk_format, disk_format)


def qemu_img_info(path: str,
run_as_root: bool = True,
force_share: bool = False) -> imageutils.QemuImgInfo:
def qemu_img_info(
path: str,
run_as_root: bool = True,
force_share: bool = False,
allow_qcow2_backing_file: bool = False) -> imageutils.QemuImgInfo:
"""Return an object containing the parsed output from qemu-img info."""
cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info', '--output=json']

format_name = cinder.privsep.format_inspector.get_format_if_safe(
path=path,
allow_qcow2_backing_file=allow_qcow2_backing_file)
if format_name is None:
LOG.warning('Image/Volume %s failed safety check', path)
# NOTE(danms): This is the same exception as would be raised
# by qemu_img_info() if the disk format was unreadable or
# otherwise unsuitable.
raise exception.Invalid(
reason=_('Image/Volume failed safety check'))

cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info',
'-f', format_name, '--output=json']
if force_share:
cmd.append('--force-share')
cmd.append(path)
Expand All @@ -173,8 +189,32 @@ def qemu_img_info(path: str,
prlimit=QEMU_IMG_LIMITS)
info = imageutils.QemuImgInfo(out, format='json')

# FIXME: figure out a more elegant way to do this
if info.file_format == 'raw':
# The format_inspector will detect a luks image as 'raw', and then when
# we call qemu-img info -f raw above, we don't get any of the luks
# format-specific info (some of which is used in the create_volume
# flow). So we need to check if this is really a luks container.
# (We didn't have to do this in the past because we called
# qemu-img info without -f.)
cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info',
'-f', 'luks', '--output=json']
if force_share:
cmd.append('--force-share')
cmd.append(path)
if os.name == 'nt':
cmd = cmd[2:]
try:
out, _err = utils.execute(*cmd, run_as_root=run_as_root,
prlimit=QEMU_IMG_LIMITS)
info = imageutils.QemuImgInfo(out, format='json')
except processutils.ProcessExecutionError:
# we'll just use the info object we already got earlier
pass

# From Cinder's point of view, any 'luks' formatted images
# should be treated as 'raw'.
# should be treated as 'raw'. (This changes the file_format, but
# not any of the format-specific information.)
if info.file_format == 'luks':
info.file_format = 'raw'

Expand Down Expand Up @@ -681,6 +721,35 @@ def get_qemu_data(image_id: str,
return data


def check_qcow2_image(image_id: str, data: imageutils.QemuImgInfo) -> None:
"""Check some rules about qcow2 images.
Does not check for a backing_file, because cinder has some legitimate
use cases for qcow2 backing files.
Makes sure the image:
- does not have a data_file
:param image_id: the image id
:param data: an imageutils.QemuImgInfo object
:raises ImageUnacceptable: when the image fails the check
"""
try:
data_file = data.format_specific['data'].get('data-file')
except (KeyError, TypeError):
LOG.debug('Unexpected response from qemu-img info when processing '
'image %s: missing format-specific info for a qcow2 image',
image_id)
msg = _('Cannot determine format-specific information')
raise exception.ImageUnacceptable(image_id=image_id, reason=msg)
if data_file:
LOG.warning("Refusing to process qcow2 file with data-file '%s'",
data_file)
msg = _('A qcow2 format image is not allowed to have a data file')
raise exception.ImageUnacceptable(image_id=image_id, reason=msg)


def check_vmdk_image(image_id: str, data: imageutils.QemuImgInfo) -> None:
"""Check some rules about VMDK images.
Expand Down Expand Up @@ -771,6 +840,8 @@ def check_image_format(source: str,

if data.file_format == 'vmdk':
check_vmdk_image(image_id, data)
if data.file_format == 'qcow2':
check_qcow2_image(image_id, data)


def fetch_verify_image(context: context.RequestContext,
Expand Down Expand Up @@ -813,6 +884,11 @@ def fetch_verify_image(context: context.RequestContext,
if fmt == 'vmdk':
check_vmdk_image(image_id, data)

# Bug #2059809: a qcow2 can have a data file that's similar
# to a backing file and is also unacceptable
if fmt == 'qcow2':
check_qcow2_image(image_id, data)


def fetch_to_vhd(context: context.RequestContext,
image_service: glance.GlanceImageService,
Expand Down
38 changes: 38 additions & 0 deletions cinder/privsep/format_inspector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2024 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Helpers for the image format_inspector.
"""


from cinder.image import format_inspector
import cinder.privsep


@cinder.privsep.sys_admin_pctxt.entrypoint
def get_format_if_safe(path, allow_qcow2_backing_file):
"""Returns a str format name if the format is safe, otherwise None"""
return _get_format_if_safe(path, allow_qcow2_backing_file)


def _get_format_if_safe(path, allow_qcow2_backing_file):
"""Returns a str format name if the format is safe, otherwise None"""
inspector = format_inspector.detect_file_format(path)
format_name = str(inspector)
safe = inspector.safety_check()
if not safe and format_name == 'qcow2' and allow_qcow2_backing_file:
safe = inspector.safety_check_allow_backing_file()
if safe:
return format_name
Loading

0 comments on commit 78f85c1

Please sign in to comment.