Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve Blosc compression operator #2592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/adios2/common/ADIOSTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ constexpr char compressor[] = "compressor";
constexpr char clevel[] = "clevel";
constexpr char doshuffle[] = "doshuffle";
constexpr char blocksize[] = "blocksize";
constexpr char threshold[] = "threshold";
}

namespace value
Expand Down
236 changes: 217 additions & 19 deletions source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*
* Created on: Jun 18, 2019
* Author: William F Godoy godoywf@ornl.gov
* Rene Widera r.widera@hzdr.de
*/

#include "CompressBlosc.h"
Expand All @@ -16,6 +17,10 @@ extern "C" {

#include "adios2/helper/adiosFunctions.h"

#include <algorithm>
#include <cassert>
#include <cstring>

namespace adios2
{
namespace core
Expand All @@ -36,6 +41,25 @@ CompressBlosc::CompressBlosc(const Params &parameters)
{
}

size_t CompressBlosc::BufferMaxSize(const size_t sizeIn) const
{
const size_t maxInputPerChunk = BLOSC_MAX_BUFFERSIZE;
const size_t numFullChunks = sizeIn / maxInputPerChunk;
const size_t sizeLastChunk = sizeIn % maxInputPerChunk;

const size_t maxOutputPerChunk = maxInputPerChunk + BLOSC_MAX_OVERHEAD;
const size_t maxOutputLastChunk = sizeLastChunk + BLOSC_MAX_OVERHEAD;

/* DataHeader is used to detect of old format which can only handle
* BLOSC_MAX_BUFFERSIZE (<2GiB) or the new adios2 chunked blosc format is
* used.
*/
const size_t maxRquiredDataMem = maxOutputPerChunk * numFullChunks +
maxOutputLastChunk + sizeof(DataHeader);

return maxRquiredDataMem;
}

size_t CompressBlosc::Compress(const void *dataIn, const Dims &dimensions,
const size_t elementSize, DataType type,
void *bufferOut, const Params &parameters,
Expand All @@ -44,6 +68,10 @@ size_t CompressBlosc::Compress(const void *dataIn, const Dims &dimensions,
const size_t sizeIn =
static_cast<size_t>(helper::GetTotalSize(dimensions) * elementSize);

bool useMemcpy = false;
/* input size under this bound would not compressed */
size_t thresholdSize = 128;

blosc_init();

size_t threads = 1; // defaults
Expand Down Expand Up @@ -95,48 +123,218 @@ size_t CompressBlosc::Compress(const void *dataIn, const Dims &dimensions,
{
throw std::invalid_argument(
"ERROR: invalid compressor " + compressor +
" valid values: blosclz (default), lz4, lz4hc, snappy, "
"zlib, or ztsd, in call to ADIOS2 Blosc Compression\n");
" valid values: blosclz (default), lz4, lz4hc, "
"snappy, "
"zlib, or zstd, in call to ADIOS2 Blosc Compression\n");
}
}
else if (key == "blocksize")
{
blockSize = static_cast<size_t>(helper::StringTo<uint64_t>(
value, "when setting Blosc blocksize parameter\n"));
}
else if (key == "threshold")
{
thresholdSize = static_cast<size_t>(helper::StringTo<uint64_t>(
value, "when setting Blosc threshold parameter\n"));
if (thresholdSize < 128u)
thresholdSize = 128u;
}
}

const int result = blosc_set_compressor(compressor.c_str());
if (result == -1)
// write header to detect new compression format (set first 8 byte to zero)
DataHeader *headerPtr = reinterpret_cast<DataHeader *>(bufferOut);

// set default header
*headerPtr = DataHeader{};

const uint8_t *inputDataBuff = reinterpret_cast<const uint8_t *>(dataIn);

int32_t typesize = elementSize;
if (typesize > BLOSC_MAX_TYPESIZE)
typesize = 1;

uint8_t *outputBuff = reinterpret_cast<uint8_t *>(bufferOut);
outputBuff += sizeof(DataHeader);

size_t currentOutputSize = 0u;
size_t inputOffset = 0u;

if (sizeIn < thresholdSize)
{
throw std::invalid_argument("ERROR: invalid compressor " + compressor +
" check if supported by blosc build, in "
"call to ADIOS2 Blosc Compression\n");
/* disable compression */
useMemcpy = true;
}

blosc_set_nthreads(threads);
blosc_set_blocksize(blockSize);
if (!useMemcpy)
{
const int result = blosc_set_compressor(compressor.c_str());
if (result == -1)
{
throw std::invalid_argument(
"ERROR: invalid compressor " + compressor +
" check if supported by blosc build, in "
"call to ADIOS2 Blosc Compression\n");
}
blosc_set_nthreads(threads);
blosc_set_blocksize(blockSize);

uint32_t chunk = 0;
for (; inputOffset < sizeIn; ++chunk)
{
size_t inputChunkSize =
std::min(sizeIn - inputOffset,
static_cast<size_t>(BLOSC_MAX_BUFFERSIZE));
bloscSize_t maxIntputSize =
static_cast<bloscSize_t>(inputChunkSize);

bloscSize_t maxChunkSize = maxIntputSize + BLOSC_MAX_OVERHEAD;

const uint8_t *in_ptr = inputDataBuff + inputOffset;
uint8_t *out_ptr = outputBuff + currentOutputSize;

const int compressedSize =
blosc_compress(compressionLevel, doShuffle, elementSize, sizeIn, dataIn,
bufferOut, sizeIn);
bloscSize_t compressedChunkSize =
blosc_compress(compressionLevel, doShuffle, typesize,
maxIntputSize, in_ptr, out_ptr, maxChunkSize);

if (compressedChunkSize > 0)
currentOutputSize += static_cast<size_t>(compressedChunkSize);
else
{
// something went wrong with the compression switch to memcopy
useMemcpy = true;
break;
}
/* add size to written output data */
inputOffset += static_cast<size_t>(maxIntputSize);
}

if (!useMemcpy)
{
// validate that all bytes are compressed
assert(inputOffset == sizeIn);
headerPtr->SetNumChunks(chunk);
}
}

if (compressedSize <= 0)
if (useMemcpy)
{
throw std::invalid_argument(
"ERROR: from blosc_compress return size: " +
std::to_string(compressedSize) +
", check operator parameters, "
" compression failed in ADIOS2 Blosc Compression\n");
std::memcpy(outputBuff, inputDataBuff, sizeIn);
currentOutputSize = sizeIn;
headerPtr->SetNumChunks(0u);
}

blosc_destroy();
return static_cast<size_t>(compressedSize);
return currentOutputSize + sizeof(DataHeader);
}

size_t CompressBlosc::Decompress(const void *bufferIn, const size_t sizeIn,
void *dataOut, const size_t sizeOut,
Params &info) const
{
assert(sizeIn >= sizeof(DataHeader));
const bool isChunked =
reinterpret_cast<const DataHeader *>(bufferIn)->IsChunked();

size_t decompressedSize = 0u;
if (isChunked)
decompressedSize =
DecompressChunkedFormat(bufferIn, sizeIn, dataOut, sizeOut, info);
else
decompressedSize =
DecompressOldFormat(bufferIn, sizeIn, dataOut, sizeOut, info);

return decompressedSize;
}

size_t CompressBlosc::DecompressChunkedFormat(const void *bufferIn,
const size_t sizeIn,
void *dataOut,
const size_t sizeOut,
Params &info) const
{
const DataHeader *dataPtr = reinterpret_cast<const DataHeader *>(bufferIn);
uint32_t num_chunks = dataPtr->GetNumChunks();
size_t inputDataSize = sizeIn - sizeof(DataHeader);

bool isCompressed = true;
if (num_chunks == 0)
isCompressed = false;

size_t inputOffset = 0u;
size_t currentOutputSize = 0u;

const uint8_t *inputDataBuff =
reinterpret_cast<const uint8_t *>(bufferIn) + sizeof(DataHeader);

size_t uncompressedSize = sizeOut;

if (isCompressed)
{
blosc_init();
uint8_t *outputBuff = reinterpret_cast<uint8_t *>(dataOut);

while (inputOffset < inputDataSize)
{
/* move over the size of the compressed data */
const uint8_t *in_ptr = inputDataBuff + inputOffset;

/** read the size of the compress block from the blosc meta data
*
* blosc meta data format (all little endian):
* - 1 byte blosc format version
* - 1 byte blosclz format version
* - 1 byte flags
* - 1 byte typesize
* - 4 byte uncompressed data size
* - 4 byte block size
* - 4 byte compressed data size
*
* we need only the compressed size ( source address + 12 byte)
*/
bloscSize_t max_inputDataSize =
*reinterpret_cast<const bloscSize_t *>(in_ptr + 12u);

uint8_t *out_ptr = outputBuff + currentOutputSize;

size_t outputChunkSize =
std::min(uncompressedSize - currentOutputSize,
static_cast<size_t>(BLOSC_MAX_BUFFERSIZE));
bloscSize_t max_output_size =
static_cast<bloscSize_t>(outputChunkSize);

bloscSize_t decompressdSize =
blosc_decompress(in_ptr, out_ptr, max_output_size);

if (decompressdSize > 0)
currentOutputSize += static_cast<size_t>(decompressdSize);
else
{
throw std::runtime_error(
"ERROR: ADIOS2 Blosc Decompress failed. Decompressed chunk "
"results in zero decompressed bytes.\n");
}
inputOffset += static_cast<size_t>(max_inputDataSize);
}
blosc_destroy();
}
else
{
std::memcpy(dataOut, inputDataBuff, inputDataSize);
currentOutputSize = inputDataSize;
inputOffset += inputDataSize;
}

assert(currentOutputSize == uncompressedSize);
assert(inputOffset == inputDataSize);

return currentOutputSize;
}

size_t CompressBlosc::DecompressOldFormat(const void *bufferIn,
const size_t sizeIn, void *dataOut,
const size_t sizeOut,
Params &info) const
{
blosc_init();
const int decompressedSize = blosc_decompress(bufferIn, dataOut, sizeOut);
Expand Down
50 changes: 49 additions & 1 deletion source/adios2/operator/compress/CompressBlosc.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*
* Created on: Jun 18, 2019
* Author: William F Godoy godoywf@ornl.gov
* Rene Widera r.widera@hzdr.de
*/

#ifndef ADIOS2_OPERATOR_COMPRESS_COMPRESSBLOSC_H_
Expand Down Expand Up @@ -34,12 +35,15 @@ class CompressBlosc : public Operator

~CompressBlosc() = default;

size_t BufferMaxSize(const size_t sizeIn) const final;

/**
* Compression signature for legacy libraries that use void*
* @param dataIn
* @param dimensions
* @param type
* @param bufferOut
* @param bufferOut format will be: 'DataHeader ; (BloscCompressedChunk |
* UncompressedData), [ BloscCompressedChunk, ...]'
* @param parameters
* @return size of compressed buffer in bytes
*/
Expand All @@ -60,6 +64,50 @@ class CompressBlosc : public Operator
const size_t sizeOut, Params &info) const final;

private:
using bloscSize_t = int32_t;

/** Decompress chunked data */
size_t DecompressChunkedFormat(const void *bufferIn, const size_t sizeIn,
void *dataOut, const size_t sizeOut,
Params &info) const;

/** Decompress data written before ADIOS2 supported large variables larger
* 2GiB. */
size_t DecompressOldFormat(const void *bufferIn, const size_t sizeIn,
void *dataOut, const size_t sizeOut,
Params &info) const;

class __attribute__((packed)) DataHeader
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pnorbert I fixed hopefully the ubsan issue in the CI by disallowing the compiler to add padding.
The disadvantage is that there is no standard attribute in C++ :-(
Is this way ok? If not, do you have a suggestion on how I can fix this issue instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it helps, there is an internal helper function to add padding in a portable way.

Copy link
Contributor Author

@psychocoderHPC psychocoderHPC Jan 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this helps. The problem is that maybe the ptr for the output is not aligned during the compression. If I use the helper and shift my data a few bytes and later during decompress I get the data within an array with a different base address. If so it would be not possible to find the data because the required padding is different.

If I understand the ubsan error for the old code correctly and what I found in the net the packed attribute is required to guarantee that the struct has the same layout even if you compile blosc on a different system. This attributes forbids the compiler to transform the struct which is IMO correct.

I can also remove the header class und work with two pure integers but I found it better readable to use the header class because it allows the code to shift with sizeof(Header) instead of using a magic number 8byte.

I am not sure which solution guarantees portability.

Copy link
Contributor Author

@psychocoderHPC psychocoderHPC Jan 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is enough and better if I align the struct to 4 byte.
Is ADIOS guaranteeing that my pointers given to compress operators are 4 byte aligned?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that maybe the ptr for the output is not aligned during the compression.

The function I shared can verify that returning the required padding in bytes (if 0 no padding is required).

Is ADIOS guaranteeing that my pointers given to compress operators are 4 byte aligned?

That's a good question. It depends on the serialization buffer consuming this interface. The BP buffer is char so it shouldn't be a problem. I'd check with @pnorbert for other existing strategies.

I am not sure which solution guarantees portability.

Yup, that's the challenge.

Copy link
Contributor

@ax3l ax3l Jun 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like Blosc is not covered by CI and thus breaking on MSVC: #2745

I found this on SO, we could add an extra handling for MSVC and assume other compilers support the currently GCC-like syntax for now: https://stackoverflow.com/a/3312896/2719194

{
/** compatible to the first 4 byte of blosc header
*
* blosc meta data format (all little endian):
* - 1 byte blosc format version
* - 1 byte blosclz format version
* - 1 byte flags
* - 1 byte typesize
*
* If zero we writing the new adios blosc format which can handle more
* than 2GiB data chunks.
*/
uint32_t format = 0u;
/** number of blosc chunks within the data blob
*
* If zero the data is not compressed and must be decompressed by using
* 'memcpy'
*/
uint32_t numberOfChunks = 0u;

public:
void SetNumChunks(const uint32_t numChunks)
{
numberOfChunks = numChunks;
}
uint32_t GetNumChunks() const { return numberOfChunks; }

bool IsChunked() const { return format == 0; }
};

static const std::map<std::string, uint32_t> m_Shuffles;
static const std::set<std::string> m_Compressors;
};
Expand Down
Loading