Skip to content

Commit

Permalink
Rename VectorSerializer to IterativeVectorSerializer to distinguish i…
Browse files Browse the repository at this point in the history
…t from BatchVectorSerializer

Summary:
To make the differences between VectorSerializer and BatchVectorSerializer more explicit, I'm renaming VectorSerializer to 
IterativeVectorSerializer.

I also rename createSerializer to createIterativeSerializer, however, since Presto uses createSerializer, I've left it in.  Once this 
diff lands, I'll land a change in Presto to use createIterativeSerializer, and then land a change back in Velox to remove 
createSerializer.

Reviewed By: bikramSingh91

Differential Revision: D53201916
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Feb 7, 2024
1 parent e4a2ce2 commit 613c005
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 29 deletions.
5 changes: 3 additions & 2 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void CompactRowVectorSerde::estimateSerializedSize(
}

namespace {
class CompactRowVectorSerializer : public VectorSerializer {
class CompactRowVectorSerializer : public IterativeVectorSerializer {
public:
using TRowSize = uint32_t;

Expand Down Expand Up @@ -120,7 +120,8 @@ std::string concatenatePartialRow(

} // namespace

std::unique_ptr<VectorSerializer> CompactRowVectorSerde::createSerializer(
std::unique_ptr<IterativeVectorSerializer>
CompactRowVectorSerde::createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* streamArena,
Expand Down
2 changes: 1 addition & 1 deletion velox/serializers/CompactRowSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CompactRowVectorSerde : public VectorSerde {

// This method is not used in production code. It is only used to
// support round-trip tests for deserialization.
std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down
13 changes: 7 additions & 6 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3316,9 +3316,9 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
const std::unique_ptr<folly::io::Codec> codec_;
};

class PrestoVectorSerializer : public VectorSerializer {
class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
public:
PrestoVectorSerializer(
PrestoIterativeVectorSerializer(
const RowTypePtr& rowType,
std::vector<VectorEncoding::Simple> encodings,
int32_t numRows,
Expand Down Expand Up @@ -3349,7 +3349,7 @@ class PrestoVectorSerializer : public VectorSerializer {
// Constructor that takes a row vector instead of only the types. This is
// different because then we know exactly how each vector is encoded
// (recursively).
PrestoVectorSerializer(
PrestoIterativeVectorSerializer(
const RowVectorPtr& rowVector,
StreamArena* streamArena,
bool useLosslessTimestamp,
Expand Down Expand Up @@ -3455,13 +3455,14 @@ void PrestoVectorSerde::estimateSerializedSize(
estimateSerializedSizeInt(vector->loadedVector(), rows, sizes, scratch);
}

std::unique_ptr<VectorSerializer> PrestoVectorSerde::createSerializer(
std::unique_ptr<IterativeVectorSerializer>
PrestoVectorSerde::createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
const Options* options) {
const auto prestoOptions = toPrestoOptions(options);
return std::make_unique<PrestoVectorSerializer>(
return std::make_unique<PrestoIterativeVectorSerializer>(
type,
prestoOptions.encodings,
numRows,
Expand All @@ -3484,7 +3485,7 @@ void PrestoVectorSerde::deprecatedSerializeEncoded(
const Options* options,
OutputStream* out) {
auto prestoOptions = toPrestoOptions(options);
auto serializer = std::make_unique<PrestoVectorSerializer>(
auto serializer = std::make_unique<PrestoIterativeVectorSerializer>(
vector,
streamArena,
prestoOptions.useLosslessTimestamp,
Expand Down
11 changes: 6 additions & 5 deletions velox/serializers/PrestoSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ namespace facebook::velox::serializer::presto {
/// There are two ways to serialize data using PrestoVectorSerde:
///
/// 1. In order to append multiple RowVectors into the same serialized payload,
/// one can first create a VectorSerializer using createSerializer(), then
/// append successive RowVectors using VectorSerializer::append(). In this case,
/// since different RowVector might encode columns differently, data is always
/// flattened in the serialized payload.
/// one can first create an IterativeVectorSerializer using
/// createIterativeSerializer(), then append successive RowVectors using
/// IterativeVectorSerializer::append(). In this case, since different RowVector
/// might encode columns differently, data is always flattened in the serialized
/// payload.
///
/// Note that there are two flavors of append(), one that takes a range of rows,
/// and one that takes a list of row ids. The former is useful when serializing
Expand Down Expand Up @@ -76,7 +77,7 @@ class PrestoVectorSerde : public VectorSerde {
vector_size_t** sizes,
Scratch& scratch) override;

std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down
5 changes: 3 additions & 2 deletions velox/serializers/UnsafeRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void UnsafeRowVectorSerde::estimateSerializedSize(
}

namespace {
class UnsafeRowVectorSerializer : public VectorSerializer {
class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
public:
using TRowSize = uint32_t;

Expand Down Expand Up @@ -122,7 +122,8 @@ std::string concatenatePartialRow(

} // namespace

std::unique_ptr<VectorSerializer> UnsafeRowVectorSerde::createSerializer(
std::unique_ptr<IterativeVectorSerializer>
UnsafeRowVectorSerde::createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* streamArena,
Expand Down
2 changes: 1 addition & 1 deletion velox/serializers/UnsafeRowSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class UnsafeRowVectorSerde : public VectorSerde {

// This method is not used in production code. It is only used to
// support round-trip tests for deserialization.
std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/tests/CompactRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class CompactRowSerializerTest : public ::testing::Test,

auto arena = std::make_unique<StreamArena>(pool_.get());
auto rowType = asRowType(rowVector->type());
auto serializer = serde_->createSerializer(rowType, numRows, arena.get());
auto serializer =
serde_->createIterativeSerializer(rowType, numRows, arena.get());

Scratch scratch;
serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch);
Expand Down
4 changes: 2 additions & 2 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class PrestoSerializerTest
auto rowType = asRowType(rowVector->type());
auto numRows = rowVector->size();
auto paramOptions = getParamSerdeOptions(serdeOptions);
auto serializer =
serde_->createSerializer(rowType, numRows, arena.get(), &paramOptions);
auto serializer = serde_->createIterativeSerializer(
rowType, numRows, arena.get(), &paramOptions);
vector_size_t sizeEstimate = 0;

Scratch scratch;
Expand Down
3 changes: 2 additions & 1 deletion velox/serializers/tests/UnsafeRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class UnsafeRowSerializerTest : public ::testing::Test,

auto arena = std::make_unique<StreamArena>(pool_.get());
auto rowType = std::dynamic_pointer_cast<const RowType>(rowVector->type());
auto serializer = serde_->createSerializer(rowType, numRows, arena.get());
auto serializer =
serde_->createIterativeSerializer(rowType, numRows, arena.get());

Scratch scratch;
serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch);
Expand Down
6 changes: 3 additions & 3 deletions velox/vector/VectorStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DefaultBatchVectorSerializer : public BatchVectorSerializer {
}

StreamArena arena(pool_);
auto serializer = serde_->createSerializer(
auto serializer = serde_->createIterativeSerializer(
asRowType(vector->type()), numRows, &arena, options_);
serializer->append(vector, ranges, scratch);
serializer->flush(stream);
Expand All @@ -67,7 +67,7 @@ getNamedVectorSerdeImpl() {

} // namespace

void VectorSerializer::append(const RowVectorPtr& vector) {
void IterativeVectorSerializer::append(const RowVectorPtr& vector) {
const IndexRange allRows{0, vector->size()};
Scratch scratch;
append(vector, folly::Range(&allRows, 1), scratch);
Expand Down Expand Up @@ -144,7 +144,7 @@ void VectorStreamGroup::createStreamTree(
RowTypePtr type,
int32_t numRows,
const VectorSerde::Options* options) {
serializer_ = serde_->createSerializer(type, numRows, this, options);
serializer_ = serde_->createIterativeSerializer(type, numRows, this, options);
}

void VectorStreamGroup::append(
Expand Down
20 changes: 16 additions & 4 deletions velox/vector/VectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ struct IndexRange {
/// Uses successive calls to `append` to add more rows to the serialization
/// buffer. Then call `flush` to write the aggregate serialized data to an
/// OutputStream.
class VectorSerializer {
class IterativeVectorSerializer {
public:
virtual ~VectorSerializer() = default;
virtual ~IterativeVectorSerializer() = default;

/// Serialize a subset of rows in a vector.
virtual void append(
Expand Down Expand Up @@ -159,7 +159,19 @@ class VectorSerde {
///
/// This is more appropriate if the use case involves many small writes, e.g.
/// partitioning a RowVector across multiple destinations.
virtual std::unique_ptr<VectorSerializer> createSerializer(
///
/// TODO: Remove createSerializer once Presto is updated to call
/// createIterativeSerializer.
virtual std::unique_ptr<IterativeVectorSerializer> createSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
const Options* options = nullptr) {
return createIterativeSerializer(
std::move(type), numRows, streamArena, options);
}

virtual std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down Expand Up @@ -298,7 +310,7 @@ class VectorStreamGroup : public StreamArena {
const VectorSerde::Options* options = nullptr);

private:
std::unique_ptr<VectorSerializer> serializer_;
std::unique_ptr<IterativeVectorSerializer> serializer_;
VectorSerde* serde_{nullptr};
};

Expand Down
2 changes: 1 addition & 1 deletion velox/vector/tests/VectorStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class MockVectorSerde : public VectorSerde {
const folly::Range<const IndexRange*>& ranges,
vector_size_t** sizes) override {}

std::unique_ptr<VectorSerializer> createSerializer(
std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr type,
int32_t numRows,
StreamArena* streamArena,
Expand Down

0 comments on commit 613c005

Please sign in to comment.