
[NPU] Add ZeRO-Infinity feature for NPU (#4809)
Add the ZeRO-Infinity feature for NPU devices.
This adds a new `async_io.py` builder under `op_builder/npu` and an NPU-specific compile-time guard in `deepspeed_aio_thread.cpp`, so the NPU code path is isolated from other devices such as the GPU and neither affects the other.
See what we have already done in #4567.

---------

Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
misstek and tjruwase authored Jan 5, 2024
1 parent af03383 commit b596963
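
For context, ZeRO-Infinity drives this `async_io` op to stream tensors between accelerator memory and NVMe storage. Below is a minimal sketch of how the feature would typically be enabled from a DeepSpeed config once the NPU build is in place; the config keys are the standard ZeRO-Infinity/`aio` options, while the placeholder model, the `/local_nvme` path, and the assumption that `torch_npu` and this NPU `async_io` op are installed are illustrative only.

```python
# Hypothetical end-to-end sketch: enabling ZeRO-Infinity NVMe offload on an NPU host.
# Assumes torch_npu is installed and DeepSpeed was built with this NPU async_io op;
# the model and the /local_nvme path are placeholders.
import torch
import deepspeed

ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "optimizer": {"type": "Adam", "params": {"lr": 1e-4}},
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "nvme", "nvme_path": "/local_nvme"},
        "offload_param": {"device": "nvme", "nvme_path": "/local_nvme"},
    },
    # Tuning knobs consumed by the async_io op added in this PR.
    "aio": {
        "block_size": 1048576,
        "queue_depth": 8,
        "single_submit": False,
        "overlap_events": True,
        "thread_count": 1,
    },
}

model = torch.nn.Linear(1024, 1024)  # placeholder model
engine, optimizer, _, _ = deepspeed.initialize(model=model,
                                               model_parameters=model.parameters(),
                                               config=ds_config)
```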
Showing 3 changed files with 122 additions and 2 deletions.
20 changes: 18 additions & 2 deletions csrc/aio/py_lib/deepspeed_aio_thread.cpp
@@ -9,6 +9,11 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
 
 #include "deepspeed_aio_thread.h"
 
+#if defined(__ENABLE_CANN__)
+#include "torch_npu/csrc/framework/utils/OpAdapter.h"
+#include "torch_npu/csrc/framework/utils/UtilForOpAdapter.h"
+#endif
+
 using namespace std;
 
 io_op_desc_t::io_op_desc_t(const bool read_op,
@@ -24,8 +29,13 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
       _num_bytes(num_bytes),
       _validate(validate)
 {
-    _cpu_buffer = (_buffer.is_cuda() || _buffer.is_xpu()) ? _buffer.to(torch::kCPU).pin_memory()
-                                                          : _buffer;
+    _cpu_buffer = (_buffer.is_cuda() || _buffer.is_xpu()
+#if defined(__ENABLE_CANN__)
+                   || torch_npu::utils::is_npu(_buffer)
+#endif
+                       )
+                      ? _buffer.to(torch::kCPU).pin_memory()
+                      : _buffer;
     _contiguous_buffer = _cpu_buffer.contiguous();
 }

@@ -35,6 +45,12 @@ void io_op_desc_t::fini()
 {
     if (_read_op && _buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
     if (_read_op && _buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); }
+#if defined(__ENABLE_CANN__)
+    if (_read_op && torch_npu::utils::is_npu(_buffer)) {
+        auto device = at::Device("npu:0");
+        _buffer.copy_(_cpu_buffer.to(device));
+    }
+#endif
 }
 
 deepspeed_aio_thread_t::deepspeed_aio_thread_t(const int tid, deepspeed_aio_config_t& aio_config)
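
The C++ change above stages accelerator tensors through a pinned host buffer before the AIO threads touch them, and copies the host buffer back to the device once a read completes. A rough PyTorch rendering of that staging pattern is sketched below for illustration; it assumes `torch_npu` is installed so the `npu` device type is registered, and it is not part of the commit.

```python
# Illustrative PyTorch sketch of the staging pattern in deepspeed_aio_thread.cpp
# (not part of the commit). Assumes torch_npu is installed and an NPU is visible.
import torch
import torch_npu  # noqa: F401  (registers the 'npu' device type with PyTorch)


def stage_for_aio(buffer: torch.Tensor) -> torch.Tensor:
    """Return a contiguous, pinned CPU copy when `buffer` lives on an accelerator."""
    if buffer.device.type in ("cuda", "xpu", "npu"):
        cpu_buffer = buffer.to("cpu").pin_memory()
    else:
        cpu_buffer = buffer
    return cpu_buffer.contiguous()


def finish_read(buffer: torch.Tensor, cpu_buffer: torch.Tensor) -> None:
    """After an AIO read completes, copy staged data back to the device tensor."""
    if buffer.device.type in ("cuda", "xpu"):
        buffer.copy_(cpu_buffer.to(buffer.device))
    elif buffer.device.type == "npu":
        buffer.copy_(cpu_buffer.to("npu:0"))  # mirrors the hard-coded "npu:0" in fini()
```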
1 change: 1 addition & 0 deletions op_builder/npu/__init__.py
@@ -5,6 +5,7 @@
 '''Copyright The Microsoft DeepSpeed Team'''
 
 from .fused_adam import FusedAdamBuilder
+from .async_io import AsyncIOBuilder
 from .no_impl import NotImplementedBuilder
 from .cpu_adam import CPUAdamBuilder
 from .cpu_adagrad import CPUAdagradBuilder
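
Registering `AsyncIOBuilder` in `op_builder/npu/__init__.py` is what lets the NPU accelerator resolve the op by name. The snippet below sketches how that lookup is usually exercised; the accelerator methods come from DeepSpeed's accelerator interface, but the exact resolution path on NPU is stated here as an assumption, not verified behavior.

```python
# Hypothetical lookup through the accelerator abstraction (not part of this PR).
from deepspeed.accelerator import get_accelerator

accel = get_accelerator()  # expected to return the NPU accelerator when torch_npu is present
builder_cls = accel.get_op_builder("AsyncIOBuilder")  # resolved via op_builder/npu/__init__.py
print(builder_cls().absolute_name())  # 'deepspeed.ops.aio.async_io_op'
```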
103 changes: 103 additions & 0 deletions op_builder/npu/async_io.py
@@ -0,0 +1,103 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

import distutils.spawn
import subprocess

from .builder import NPUOpBuilder


class AsyncIOBuilder(NPUOpBuilder):
    BUILD_VAR = "DS_BUILD_AIO"
    NAME = "async_io"

    def __init__(self):
        super().__init__(name=self.NAME)

    def absolute_name(self):
        return f'deepspeed.ops.aio.{self.NAME}_op'

    def sources(self):
        return [
            'csrc/aio/py_lib/deepspeed_py_copy.cpp', 'csrc/aio/py_lib/py_ds_aio.cpp',
            'csrc/aio/py_lib/deepspeed_py_aio.cpp', 'csrc/aio/py_lib/deepspeed_py_aio_handle.cpp',
            'csrc/aio/py_lib/deepspeed_aio_thread.cpp', 'csrc/aio/common/deepspeed_aio_utils.cpp',
            'csrc/aio/common/deepspeed_aio_common.cpp', 'csrc/aio/common/deepspeed_aio_types.cpp',
            'csrc/aio/py_lib/deepspeed_pin_tensor.cpp'
        ]

    def include_paths(self):
        args = super().include_paths()
        args += ['csrc/aio/py_lib', 'csrc/aio/common']
        return args

    def cxx_args(self):
        args = super().cxx_args()
        # -O0 for improved debugging, since performance is bound by I/O
        CPU_ARCH = self.cpu_arch()
        SIMD_WIDTH = self.simd_width()
        import torch  # Keep this import here to avoid errors when building DeepSpeed wheel without torch installed
        TORCH_MAJOR, TORCH_MINOR = map(int, torch.__version__.split('.')[0:2])
        if TORCH_MAJOR >= 2 and TORCH_MINOR >= 1:
            CPP_STD = '-std=c++17'
        else:
            CPP_STD = '-std=c++14'
        return args + [
            '-g',
            '-Wall',
            '-O0',
            CPP_STD,
            '-shared',
            '-fPIC',
            '-Wno-reorder',
            CPU_ARCH,
            '-fopenmp',
            SIMD_WIDTH,
            '-laio',
        ]

    def extra_ldflags(self):
        args = super().extra_ldflags()
        return args + ['-laio']

    def check_for_libaio_pkg(self):
        libs = dict(
            dpkg=["-l", "libaio-dev", "apt"],
            pacman=["-Q", "libaio", "pacman"],
            rpm=["-q", "libaio-devel", "yum"],
        )

        found = False
        for pkgmgr, data in libs.items():
            flag, lib, tool = data
            path = distutils.spawn.find_executable(pkgmgr)
            if path is not None:
                cmd = f"{pkgmgr} {flag} {lib}"
                result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
                if result.wait() == 0:
                    found = True
                else:
                    self.warning(f"{self.NAME}: please install the {lib} package with {tool}")
                break
        return found

    def is_compatible(self, verbose=True):
        # Check for the existence of libaio by using distutils
        # to compile and link a test program that calls io_submit,
        # which is a function provided by libaio that is used in the async_io op.
        # If needed, one can define -I and -L entries in CFLAGS and LDFLAGS
        # respectively to specify the directories for libaio.h and libaio.so.
        aio_compatible = self.has_function('io_pgetevents', ('aio', ))
        if verbose and not aio_compatible:
            self.warning(f"{self.NAME} requires the dev libaio .so object and headers but these were not found.")

        # Check for the libaio package via known package managers
        # to print suggestions on which package to install.
        self.check_for_libaio_pkg()

        self.warning(
            "If libaio is already installed (perhaps from source), try setting the CFLAGS and LDFLAGS environment variables to where it can be found."
        )
        return super().is_compatible(verbose) and aio_compatible
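
As a quick sanity check of the new builder, something like the following could be run on an NPU machine. The import path mirrors where the file lands in the installed package, but treat the exact invocation as a hypothetical sketch rather than a documented entry point.

```python
# Hypothetical smoke test for the NPU AsyncIOBuilder (not part of this PR).
# Assumes DeepSpeed is installed on a host where torch_npu provides the NPU backend.
from deepspeed.ops.op_builder.npu import AsyncIOBuilder

builder = AsyncIOBuilder()

# is_compatible() probes for libaio (io_pgetevents) and prints install hints if it is missing.
if builder.is_compatible(verbose=True):
    async_io_op = builder.load()  # JIT-builds csrc/aio/* with the flags defined above
    print("Loaded:", builder.absolute_name())
else:
    print("libaio not found; install libaio-dev / libaio-devel first.")
```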
