2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
@@ -24,7 +24,7 @@ jobs:
sudo apt-get install libpcre3 libpcre3-dev libzstd-dev ninja-build
- name: Install cppcheck
run: |
git clone https://github.com/danmar/cppcheck.git --branch 2.14.1
git clone https://github.com/danmar/cppcheck.git --branch 2.17.1
cd cppcheck
cmake -S. -B build \
-DCMAKE_BUILD_TYPE=Release \
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## 0.34.2 - 2025-05-06

#### Bug fixes
- Fixed potential for unaligned records in live and historical streaming requests

## 0.34.1 - 2025-04-29

### Enhancements
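The changelog entry is terse, so a note on the underlying issue: the decoder hands out records by reinterpreting raw buffer bytes as a `RecordHeader`, which is only safe when the record starts on an 8-byte boundary. A minimal sketch of that constraint, with an illustrative helper name (`IsAlignedForRecord` is not part of the library):

```cpp
#include <cstddef>
#include <cstdint>

// DBN records are 8-byte aligned; the decoder reinterprets buffer bytes
// as a RecordHeader, so the read position must respect that alignment.
constexpr std::size_t kRecordAlignment = 8;

inline bool IsAlignedForRecord(const std::byte* p) {
  return reinterpret_cast<std::uintptr_t>(p) % kRecordAlignment == 0;
}
```

The rest of this PR exists to make that invariant hold: the read buffer itself is allocated 8-byte aligned, and the decoders re-align it after consuming metadata.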
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.0)

project(
databento
VERSION 0.34.1
VERSION 0.34.2
LANGUAGES CXX
DESCRIPTION "Official Databento client library"
)
5 changes: 2 additions & 3 deletions include/databento/dbn_decoder.hpp
@@ -6,6 +6,7 @@
#include <string>

#include "databento/dbn.hpp"
#include "databento/detail/buffer.hpp"
#include "databento/enums.hpp" // Upgrade Policy
#include "databento/file_stream.hpp"
#include "databento/ireadable.hpp"
@@ -54,16 +55,14 @@ class DbnDecoder {
const std::byte* buffer_end);
bool DetectCompression();
std::size_t FillBuffer();
std::size_t GetReadBufferSize() const;
RecordHeader* BufferRecordHeader();

ILogReceiver* log_receiver_;
std::uint8_t version_{};
VersionUpgradePolicy upgrade_policy_;
bool ts_out_{};
std::unique_ptr<IReadable> input_;
std::vector<std::byte> read_buffer_;
std::size_t buffer_idx_{};
detail::Buffer buffer_{};
// Must be 8-byte aligned for records
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
Record current_record_{nullptr};
25 changes: 21 additions & 4 deletions include/databento/detail/buffer.hpp
@@ -2,6 +2,7 @@

#include <cstddef>
#include <memory>
#include <new>

#include "databento/ireadable.hpp"
#include "databento/iwritable.hpp"
@@ -11,7 +12,7 @@ class Buffer : public IReadable, public IWritable {
public:
Buffer() : Buffer(64 * std::size_t{1 << 10}) {}
explicit Buffer(std::size_t init_capacity)
: buf_{std::make_unique<std::byte[]>(init_capacity)},
: buf_{AlignedNew(init_capacity), AlignedDelete},
end_{buf_.get() + init_capacity},
read_pos_{buf_.get()},
write_pos_{buf_.get()} {}
@@ -22,7 +23,9 @@
void WriteAll(const std::byte* data, std::size_t length) override;

std::byte*& WriteBegin() { return write_pos_; }
std::byte* WriteEnd() const { return end_; }
std::byte* WriteEnd() { return end_; }
const std::byte* WriteBegin() const { return write_pos_; }
const std::byte* WriteEnd() const { return end_; }
std::size_t WriteCapacity() const {
return static_cast<std::size_t>(end_ - write_pos_);
}
@@ -32,7 +35,9 @@
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;

std::byte*& ReadBegin() { return read_pos_; }
std::byte* ReadEnd() const { return write_pos_; }
std::byte* ReadEnd() { return write_pos_; }
const std::byte* ReadBegin() const { return read_pos_; }
const std::byte* ReadEnd() const { return write_pos_; }
std::size_t ReadCapacity() const {
return static_cast<std::size_t>(write_pos_ - read_pos_);
}
@@ -48,7 +53,19 @@
void Shift();

private:
std::unique_ptr<std::byte[]> buf_;
static constexpr std::align_val_t kAlignment{8};

using UniqueBufPtr = std::unique_ptr<std::byte[], void (*)(std::byte*)>;

std::byte* AlignedNew(std::size_t capacity) {
// Can't use `new` expression due to MSVC bug
// See
// https://developercommunity.visualstudio.com/t/using-c17-new-stdalign-val-tn-syntax-results-in-er/528320
return static_cast<std::byte*>(operator new[](capacity, kAlignment));
}
static void AlignedDelete(std::byte* p) { operator delete[](p, kAlignment); }

UniqueBufPtr buf_;
std::byte* end_;
std::byte* read_pos_{};
std::byte* write_pos_{};
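The new backing store is allocated with the aligned forms of `operator new[]`/`operator delete[]` instead of a plain `new std::byte[n]`, with a custom deleter so the alignment argument matches on both sides; per the comment above, the `new (std::align_val_t) T[n]` expression syntax is avoided because of an MSVC bug. A self-contained sketch of the same pattern, assuming C++17 aligned allocation (names here are illustrative, not the library's):

```cpp
#include <cstddef>
#include <memory>
#include <new>

// Allocate and free with the aligned operator new[]/delete[] overloads,
// keeping the alignment argument identical on both calls.
constexpr std::align_val_t kAlign{8};

inline std::byte* AllocAligned(std::size_t n) {
  return static_cast<std::byte*>(operator new[](n, kAlign));
}
inline void FreeAligned(std::byte* p) { operator delete[](p, kAlign); }

using AlignedBytes = std::unique_ptr<std::byte[], void (*)(std::byte*)>;

inline AlignedBytes MakeAlignedBytes(std::size_t n) {
  return AlignedBytes{AllocAligned(n), &FreeAligned};
}
```

Because the deleter is part of the `unique_ptr` type, the default `std::default_delete<std::byte[]>` (which would call the unaligned `delete[]`) can never be invoked by mistake.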
12 changes: 3 additions & 9 deletions include/databento/detail/dbn_buffer_decoder.hpp
@@ -6,7 +6,6 @@

#include "databento/detail/buffer.hpp"
#include "databento/detail/zstd_stream.hpp"
#include "databento/ireadable.hpp"
#include "databento/record.hpp"
#include "databento/timeseries.hpp"

@@ -18,7 +17,8 @@
const RecordCallback& record_callback)
: metadata_callback_{metadata_callback},
record_callback_{record_callback},
zstd_stream_{InitZstdBuffer()} {}
zstd_stream_{std::make_unique<Buffer>()},
zstd_buffer_{static_cast<Buffer*>(zstd_stream_.Input())} {}

KeepGoing Process(const char* data, std::size_t length);

@@ -29,12 +29,6 @@
Records,
};

std::unique_ptr<IReadable> InitZstdBuffer() {
auto zstd_buffer = std::make_unique<Buffer>();
zstd_buffer_ = zstd_buffer.get();
return zstd_buffer;
}

const MetadataCallback& metadata_callback_;
const RecordCallback& record_callback_;
ZstdDecodeStream zstd_stream_;
@@ -43,7 +37,7 @@
std::size_t bytes_needed_{};
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
std::uint8_t input_version_{};
bool ts_out_;
bool ts_out_{};
DecoderState state_{DecoderState::Init};
};
} // namespace databento::detail
6 changes: 4 additions & 2 deletions include/databento/detail/zstd_stream.hpp
@@ -6,6 +6,7 @@
#include <memory> // unique_ptr
#include <vector>

#include "databento/detail/buffer.hpp"
#include "databento/ireadable.hpp"
#include "databento/iwritable.hpp"
#include "databento/log.hpp"
@@ -14,15 +15,16 @@
class ZstdDecodeStream : public IReadable {
public:
explicit ZstdDecodeStream(std::unique_ptr<IReadable> input);
ZstdDecodeStream(std::unique_ptr<IReadable> input,
std::vector<std::byte>&& in_buffer);
ZstdDecodeStream(std::unique_ptr<IReadable> input, detail::Buffer& in_buffer);

// Read exactly `length` bytes into `buffer`.
void ReadExact(std::byte* buffer, std::size_t length) override;
// Read at most `length` bytes. Returns the number of bytes read. Will only
// return 0 if the end of the stream is reached.
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;

IReadable* Input() const { return input_.get(); }

private:
std::unique_ptr<IReadable> input_;
std::unique_ptr<ZSTD_DStream, std::size_t (*)(ZSTD_DStream*)> z_dstream_;
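The second constructor plus the new `Input()` accessor let a caller hand ownership of a `Buffer` to the stream while keeping a non-owning pointer to it for appending compressed bytes, which is how `DbnBufferDecoder` is now wired in its initializer list. A sketch of that wiring using the headers shown in this PR (`DecoderWiring` is an illustrative name, not a library type):

```cpp
#include <memory>

#include "databento/detail/buffer.hpp"
#include "databento/detail/zstd_stream.hpp"

// The stream owns the Buffer it decompresses from; the caller keeps a raw
// pointer to the same Buffer so it can write incoming bytes into it.
struct DecoderWiring {
  databento::detail::ZstdDecodeStream zstd_stream{
      std::make_unique<databento::detail::Buffer>()};
  databento::detail::Buffer* zstd_buffer{
      static_cast<databento::detail::Buffer*>(zstd_stream.Input())};
};
```

This replaces the old `InitZstdBuffer()` helper, which stashed the raw pointer as a side effect before passing ownership to the stream; declaring the stream before the pointer keeps the `static_cast` well-defined in the sketch above.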
2 changes: 1 addition & 1 deletion pkg/PKGBUILD
@@ -1,7 +1,7 @@
# Maintainer: Databento <support@databento.com>
_pkgname=databento-cpp
pkgname=databento-cpp-git
pkgver=0.34.1
pkgver=0.34.2
pkgrel=1
pkgdesc="Official C++ client for Databento"
arch=('any')
1 change: 0 additions & 1 deletion src/dbn_constants.hpp
@@ -13,7 +13,6 @@ constexpr std::size_t kFixedMetadataLen = 100;
constexpr std::size_t kDatasetCstrLen = 16;
constexpr std::size_t kMetadataReservedLen = 53;
constexpr std::size_t kMetadataReservedLenV1 = 47;
constexpr std::size_t kBufferCapacity = 8UL * 1024;
constexpr std::uint16_t kNullSchema = std::numeric_limits<std::uint16_t>::max();
constexpr std::uint8_t kNullSType = std::numeric_limits<std::uint8_t>::max();
constexpr std::uint64_t kNullRecordCount =
88 changes: 41 additions & 47 deletions src/dbn_decoder.cpp
@@ -9,6 +9,7 @@
#include "databento/compat.hpp"
#include "databento/constants.hpp"
#include "databento/datetime.hpp"
#include "databento/detail/buffer.hpp"
#include "databento/detail/zstd_stream.hpp"
#include "databento/enums.hpp"
#include "databento/exceptions.hpp"
@@ -21,7 +22,7 @@ using databento::DbnDecoder;
namespace {
template <typename T>
T Consume(const std::byte*& buf) {
const auto res = *reinterpret_cast<const T*>(&*buf);
const auto res = *reinterpret_cast<const T*>(buf);
buf += sizeof(T);
return res;
}
Expand All @@ -34,7 +35,7 @@ std::uint8_t Consume(const std::byte*& buf) {
}

const char* Consume(const std::byte*& buf, const std::ptrdiff_t num_bytes) {
const auto* pos = &*buf;
const auto* pos = buf;
buf += num_bytes;
return reinterpret_cast<const char*>(pos);
}
@@ -76,16 +77,12 @@ DbnDecoder::DbnDecoder(ILogReceiver* log_receiver,
: log_receiver_{log_receiver},
upgrade_policy_{upgrade_policy},
input_{std::move(input)} {
read_buffer_.reserve(kBufferCapacity);
if (DetectCompression()) {
input_ = std::make_unique<detail::ZstdDecodeStream>(
std::move(input_), std::move(read_buffer_));
// Reinitialize buffer and get it into the same state as uncompressed input
read_buffer_ = std::vector<std::byte>();
read_buffer_.reserve(kBufferCapacity);
read_buffer_.resize(kMagicSize);
input_->ReadExact(read_buffer_.data(), kMagicSize);
const auto* buf_ptr = read_buffer_.data();
input_ =
std::make_unique<detail::ZstdDecodeStream>(std::move(input_), buffer_);
input_->ReadExact(buffer_.WriteBegin(), kMagicSize);
buffer_.WriteBegin() += kMagicSize;
const auto* buf_ptr = buffer_.ReadBegin();
if (std::strncmp(Consume(buf_ptr, 3), kDbnPrefix, 3) != 0) {
throw DbnResponseError{"Found Zstd input, but not DBN prefix"};
}
@@ -181,16 +178,22 @@ databento::Metadata DbnDecoder::DecodeMetadataFields(

databento::Metadata DbnDecoder::DecodeMetadata() {
// already read first 4 bytes detecting compression
read_buffer_.resize(kMetadataPreludeSize);
input_->ReadExact(&read_buffer_[4], 4);
const auto read_size = kMetadataPreludeSize - kMagicSize;
input_->ReadExact(buffer_.WriteBegin(), read_size);
buffer_.WriteBegin() += read_size;
const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize(
read_buffer_.data(), kMetadataPreludeSize);
buffer_.ReadBegin(), kMetadataPreludeSize);
buffer_.ReadBegin() += kMetadataPreludeSize;
version_ = version;
read_buffer_.resize(size);
input_->ReadExact(read_buffer_.data(), read_buffer_.size());
buffer_idx_ = read_buffer_.size();
buffer_.Reserve(size);
input_->ReadExact(buffer_.WriteBegin(), size);
buffer_.WriteBegin() += size;
auto metadata = DbnDecoder::DecodeMetadataFields(
version_, read_buffer_.data(), read_buffer_.data() + read_buffer_.size());
version_, buffer_.ReadBegin(), buffer_.ReadEnd());
buffer_.ReadBegin() += size;
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
// alignment
buffer_.Shift();
ts_out_ = metadata.ts_out;
metadata.Upgrade(upgrade_policy_);
return metadata;
@@ -239,60 +242,53 @@ databento::Record DbnDecoder::DecodeRecordCompat(
// assumes DecodeMetadata has been called
const databento::Record* DbnDecoder::DecodeRecord() {
// need some unread bytes
if (GetReadBufferSize() == 0) {
if (buffer_.ReadCapacity() == 0) {
if (FillBuffer() == 0) {
return nullptr;
}
}
// check length
while (GetReadBufferSize() < BufferRecordHeader()->Size()) {
while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) {
if (FillBuffer() == 0) {
if (GetReadBufferSize() > 0) {
if (buffer_.ReadCapacity() > 0) {
log_receiver_->Receive(
LogLevel::Warning,
"Unexpected partial record remaining in stream: " +
std::to_string(GetReadBufferSize()) + " bytes");
std::to_string(buffer_.ReadCapacity()) + " bytes");
}
return nullptr;
}
}
current_record_ = Record{BufferRecordHeader()};
buffer_idx_ += current_record_.Size();
buffer_.ReadBegin() += current_record_.Size();
current_record_ = DbnDecoder::DecodeRecordCompat(
version_, upgrade_policy_, ts_out_, &compat_buffer_, current_record_);
return &current_record_;
}

size_t DbnDecoder::FillBuffer() {
// Shift data forward
std::copy(read_buffer_.cbegin() + static_cast<std::ptrdiff_t>(buffer_idx_),
read_buffer_.cend(), read_buffer_.begin());
const auto unread_size = read_buffer_.size() - buffer_idx_;
buffer_idx_ = 0;
read_buffer_.resize(kBufferCapacity);
const auto fill_size = input_->ReadSome(&read_buffer_[unread_size],
kBufferCapacity - unread_size);
read_buffer_.resize(unread_size + fill_size);
if (buffer_.WriteCapacity() < kMaxRecordLen) {
buffer_.Shift();
}
const auto fill_size =
input_->ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity());
buffer_.WriteBegin() += fill_size;
return fill_size;
}

std::size_t DbnDecoder::GetReadBufferSize() const {
return read_buffer_.size() - buffer_idx_;
}

databento::RecordHeader* DbnDecoder::BufferRecordHeader() {
return reinterpret_cast<RecordHeader*>(&read_buffer_[buffer_idx_]);
return reinterpret_cast<RecordHeader*>(buffer_.ReadBegin());
}

bool DbnDecoder::DetectCompression() {
read_buffer_.resize(kMagicSize);
input_->ReadExact(read_buffer_.data(), kMagicSize);
const auto* read_buffer_it = read_buffer_.data();
if (std::strncmp(Consume(read_buffer_it, 3), kDbnPrefix, 3) == 0) {
input_->ReadExact(buffer_.WriteBegin(), kMagicSize);
buffer_.WriteBegin() += kMagicSize;
const auto* buffer_it = buffer_.ReadBegin();
if (std::strncmp(Consume(buffer_it, 3), kDbnPrefix, 3) == 0) {
return false;
}
read_buffer_it = read_buffer_.data();
auto x = Consume<std::uint32_t>(read_buffer_it);
buffer_it = buffer_.ReadBegin();
auto x = Consume<std::uint32_t>(buffer_it);
if (x == kZstdMagicNumber) {
return true;
}
@@ -302,12 +298,10 @@
if ((x & kZstdSkippableFrame) == kZstdSkippableFrame) {
throw DbnResponseError{
"Legacy DBZ encoding is not supported. Please use the dbn CLI tool "
"to "
"convert it to DBN."};
"to convert it to DBN."};
}
throw DbnResponseError{
"Couldn't detect input type. It doesn't appear to be Zstd or "
"DBN."};
"Couldn't detect input type. It doesn't appear to be Zstd or DBN."};
}

std::string DbnDecoder::DecodeSymbol(std::size_t symbol_cstr_len,
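The rewritten `FillBuffer` spells out the refill discipline: compact with `Shift()` only when the writable tail can no longer hold a maximum-sized record, then read into `WriteBegin()` and advance the cursor by the bytes actually read. A free-function sketch of the same step against the `Buffer`/`IReadable` interfaces shown in this PR (`RefillOnce` is illustrative; it assumes `kMaxRecordLen` is visible via `databento/record.hpp`, as in the decoder headers):

```cpp
#include <cstddef>

#include "databento/detail/buffer.hpp"
#include "databento/ireadable.hpp"
#include "databento/record.hpp"  // kMaxRecordLen

// One refill pass: compact only when a maximum-sized record no longer
// fits in the writable region, then append whatever the input yields.
inline std::size_t RefillOnce(databento::IReadable& input,
                              databento::detail::Buffer& buffer) {
  if (buffer.WriteCapacity() < databento::kMaxRecordLen) {
    buffer.Shift();  // unread bytes move back to the aligned base
  }
  const std::size_t n =
      input.ReadSome(buffer.WriteBegin(), buffer.WriteCapacity());
  buffer.WriteBegin() += n;
  return n;
}
```

Compared with the old `std::vector` + `buffer_idx_` bookkeeping, the cursor arithmetic lives in one place and the copy-forward only happens when writable space actually runs low.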
2 changes: 1 addition & 1 deletion src/detail/buffer.cpp
@@ -54,7 +54,7 @@ void Buffer::Reserve(std::size_t capacity) {
if (capacity <= Capacity()) {
return;
}
auto new_buf = std::make_unique<std::byte[]>(capacity);
UniqueBufPtr new_buf{AlignedNew(capacity), AlignedDelete};
const auto unread_bytes = ReadCapacity();
std::copy(ReadBegin(), ReadEnd(), new_buf.get());
buf_ = std::move(new_buf);
3 changes: 3 additions & 0 deletions src/detail/dbn_buffer_decoder.cpp
@@ -35,6 +35,9 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data,
auto metadata = DbnDecoder::DecodeMetadataFields(
input_version_, dbn_buffer_.ReadBegin(), dbn_buffer_.ReadEnd());
dbn_buffer_.ReadBegin() += bytes_needed_;
// Metadata may leave buffer misaligned. Shift records to ensure 8-byte
// alignment
dbn_buffer_.Shift();
ts_out_ = metadata.ts_out;
metadata.Upgrade(VersionUpgradePolicy::UpgradeToV2);
if (metadata_callback_) {
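Both decoders now call `Shift()` right after consuming metadata because the metadata length is generally not a multiple of 8, so the first record would otherwise start at a misaligned offset. The PR only shows part of `src/detail/buffer.cpp`, so here is a stand-alone sketch of what that compaction amounts to, assuming `Shift()` slides the unread region back to the start of the 8-byte-aligned allocation and resets the cursors:

```cpp
#include <algorithm>
#include <cstddef>

// Illustrative stand-in for Buffer::Shift(): move the unread bytes
// [read_pos, write_pos) to the aligned base so the next RecordHeader
// read starts on an 8-byte boundary again.
inline void ShiftSketch(std::byte* base, std::byte*& read_pos,
                        std::byte*& write_pos) {
  const std::size_t unread = static_cast<std::size_t>(write_pos - read_pos);
  if (read_pos != base) {
    std::copy(read_pos, write_pos, base);  // forward copy; dest precedes source
  }
  read_pos = base;
  write_pos = base + unread;
}
```

Since the allocation base comes from `AlignedNew`, realigning to it is what restores the record-alignment invariant for the rest of the stream.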