Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make zfp buffer self-contained and backward-compatible #2885

Merged
merged 4 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
make zfp buffer self-contained and backward-compatible
  • Loading branch information
JasonRuonanWang committed Sep 27, 2021
commit 80abbc913efc6aa04d14096cdd8c5d740d74a3af
42 changes: 42 additions & 0 deletions source/adios2/core/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/// \cond EXCLUDE_FROM_DOXYGEN
#include <cstring>
#include <functional>
#include <iostream>
#include <string>
#include <vector>
/// \endcond
Expand Down Expand Up @@ -125,6 +126,47 @@ class Operator
return ret;
}

template <typename U>
void PutParameters(char *buffer, U &pos, const Params &parameters)
{
uint8_t size = static_cast<uint8_t>(parameters.size());
PutParameter(buffer, pos, size);
for (const auto &p : parameters)
{
size = static_cast<uint8_t>(p.first.size());
PutParameter(buffer, pos, size);

std::memcpy(buffer + pos, p.first.data(), size);
pos += size;

size = static_cast<uint8_t>(p.second.size());
PutParameter(buffer, pos, size);

std::memcpy(buffer + pos, p.second.data(), size);
pos += size;
}
}

template <typename U>
Params GetParameters(const char *buffer, U &pos)
{
Params ret;
uint8_t params = GetParameter<uint8_t>(buffer, pos);
for (uint8_t i = 0; i < params; ++i)
{
uint8_t size = GetParameter<uint8_t>(buffer, pos);
std::string key =
std::string(reinterpret_cast<const char *>(buffer + pos), size);
pos += size;
size = GetParameter<uint8_t>(buffer, pos);
std::string value =
std::string(reinterpret_cast<const char *>(buffer + pos), size);
pos += size;
ret[key] = value;
}
return ret;
}

private:
void CheckCallbackType(const std::string type) const;
};
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/operator/compress/CompressBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ size_t CompressBZIP2::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 4; // skip the first four bytes

size_t sizeOut = GetParameter<size_t>(bufferIn, bufferInOffset);
size_t batches = GetParameter<size_t>(bufferIn, bufferInOffset);
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
size_t batches = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);

int small = 0;
int verbosity = 0;
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/compress/CompressBlosc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ size_t CompressBlosc::DecompressV1(const char *bufferIn, const size_t sizeIn,
// DecompressV2 and keep this function for decompressing lagacy data.

size_t bufferInOffset = 0;
size_t sizeOut = GetParameter<size_t>(bufferIn, bufferInOffset);
size_t sizeOut = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
if (sizeIn - bufferInOffset < sizeof(DataHeader))
{
throw("corrupted blosc buffer header");
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ size_t CompressMGARD::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;

const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset);
const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);

Expand Down
4 changes: 2 additions & 2 deletions source/adios2/operator/compress/CompressSZ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,11 @@ size_t CompressSZ::DecompressV1(const char *bufferIn, const size_t sizeIn,

size_t bufferInOffset = 0;

const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset);
const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);

Expand Down
6 changes: 3 additions & 3 deletions source/adios2/operator/compress/CompressSirius.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ size_t CompressSirius::DecompressV1(const char *bufferIn, const size_t sizeIn,
// DecompressV2 and keep this function for decompressing lagacy data.

size_t bufferInOffset = 0;
const size_t ndims = GetParameter<size_t>(bufferIn, bufferInOffset);
const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockStart(ndims);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockStart[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockStart[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t>(bufferIn, bufferInOffset);
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);

Expand Down
85 changes: 75 additions & 10 deletions source/adios2/operator/compress/CompressZFP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,33 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart,
Params &info)
{

Dims convertedDims = ConvertDims(blockCount, type, 3);
const uint8_t bufferVersion = 1;
size_t bufferOutOffset = 0;

// Universal operator metadata
PutParameter(bufferOut, bufferOutOffset, OperatorType::Sz);
PutParameter(bufferOut, bufferOutOffset, bufferVersion);
bufferOutOffset += 2;
// Universal operator metadata end

const size_t ndims = blockCount.size();

// zfp V1 metadata
PutParameter(bufferOut, bufferOutOffset, ndims);
for (const auto &d : blockCount)
{
PutParameter(bufferOut, bufferOutOffset, d);
}
PutParameter(bufferOut, bufferOutOffset, type);
PutParameters(bufferOut, bufferOutOffset, parameters);
// zfp V1 metadata end

Dims convertedDims = ConvertDims(blockCount, type, 3);
zfp_field *field = GetZFPField(dataIn, convertedDims, type);
zfp_stream *stream = GetZFPStream(convertedDims, type, parameters);
size_t maxSize = zfp_stream_maximum_size(stream, field);
// associate bitstream
bitstream *bitstream = stream_open(bufferOut, maxSize);
bitstream *bitstream = stream_open(bufferOut + bufferOutOffset, maxSize);
zfp_stream_set_bit_stream(stream, bitstream);
zfp_stream_rewind(stream);

Expand All @@ -46,24 +66,41 @@ size_t CompressZFP::Compress(const char *dataIn, const Dims &blockStart,
"size is 0, in call to Compress");
}

bufferOutOffset += sizeOut;

zfp_field_free(field);
zfp_stream_close(stream);
stream_close(bitstream);
return sizeOut;
return bufferOutOffset;
}

size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn,
char *dataOut, const DataType type,
const Dims &blockStart, const Dims &blockCount,
const Params &parameters, Params &info)
size_t CompressZFP::DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut)
{
// Do NOT remove even if the buffer version is updated. Data might be still
// in lagacy formats. This function must be kept for backward compatibility.
// If a newer buffer format is implemented, create another function, e.g.
// DecompressV2 and keep this function for decompressing lagacy data.

size_t bufferInOffset = 0;

const size_t ndims = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
Dims blockCount(ndims);
for (size_t i = 0; i < ndims; ++i)
{
blockCount[i] = GetParameter<size_t, size_t>(bufferIn, bufferInOffset);
}
const DataType type = GetParameter<DataType>(bufferIn, bufferInOffset);
const Params parameters = GetParameters(bufferIn, bufferInOffset);

Dims convertedDims = ConvertDims(blockCount, type, 3);

zfp_field *field = GetZFPField(dataOut, convertedDims, type);
zfp_stream *stream = GetZFPStream(convertedDims, type, parameters);

// associate bitstream
bitstream *bitstream = stream_open(const_cast<char *>(bufferIn), sizeIn);
bitstream *bitstream = stream_open(
const_cast<char *>(bufferIn + bufferInOffset), sizeIn - bufferInOffset);
zfp_stream_set_bit_stream(stream, bitstream);
zfp_stream_rewind(stream);

Expand All @@ -80,13 +117,41 @@ size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn,
zfp_stream_close(stream);
stream_close(bitstream);

const size_t typeSizeBytes = helper::GetDataTypeSize(type);
const size_t dataSizeBytes =
helper::GetTotalSize(convertedDims) * typeSizeBytes;
helper::GetTotalSize(convertedDims, helper::GetDataTypeSize(type));

return dataSizeBytes;
}

size_t CompressZFP::Decompress(const char *bufferIn, const size_t sizeIn,
char *dataOut, const DataType /*type*/,
const Dims & /*blockStart*/,
const Dims & /*blockCount*/,
const Params & /*parameters*/, Params &info)
{
size_t bufferInOffset = 1; // skip operator type
const uint8_t bufferVersion =
GetParameter<uint8_t>(bufferIn, bufferInOffset);
bufferInOffset += 2; // skip two reserved bytes

if (bufferVersion == 1)
{
return DecompressV1(bufferIn + bufferInOffset, sizeIn - bufferInOffset,
dataOut);
}
else if (bufferVersion == 2)
{
// TODO: if a Version 2 zfp buffer is being implemented, put it here
// and keep the DecompressV1 routine for backward compatibility
}
else
{
throw("unknown zfp buffer version");
}

return 0;
}

bool CompressZFP::IsDataTypeValid(const DataType type) const
{
#define declare_type(T) \
Expand Down
12 changes: 12 additions & 0 deletions source/adios2/operator/compress/CompressZFP.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class CompressZFP : public Operator
* @param hint extra exception information
*/
void CheckStatus(const int status, const std::string hint) const;

/**
* Decompress function for V1 buffer. Do NOT remove even if the buffer
* version is updated. Data might be still in lagacy formats. This function
* must be kept for backward compatibility
* @param bufferIn : compressed data buffer (V1 only)
* @param sizeIn : number of bytes in bufferIn
* @param dataOut : decompressed data buffer
* @return : number of bytes in dataOut
*/
size_t DecompressV1(const char *bufferIn, const size_t sizeIn,
char *dataOut);
};

} // end namespace compress
Expand Down