Skip to content

Feature/recordio #8759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ include(external/eigen) # download eigen3
include(external/pybind11) # download pybind11
include(external/cares)
include(external/grpc)
include(external/snappy) # download snappy

include(cudnn) # set cudnn libraries, must before configure
include(cupti)
Expand Down
57 changes: 57 additions & 0 deletions cmake/external/snappy.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

IF(MOBILE_INFERENCE)
return()
ENDIF()

include (ExternalProject)

# NOTE: snappy is needed when linking with recordio

SET(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy)
SET(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy)
SET(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include/" CACHE PATH "snappy include directory." FORCE)

ExternalProject_Add(
extern_snappy
GIT_REPOSITORY "https://github.com/google/snappy"
GIT_TAG "1.1.7"
PREFIX ${SNAPPY_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DBUILD_TESTING=OFF
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
${EXTERNAL_OPTIONAL_ARGS}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
BUILD_COMMAND make -j8
INSTALL_COMMAND make install
)

add_library(snappy STATIC IMPORTED GLOBAL)
set_property(TARGET snappy PROPERTY IMPORTED_LOCATION
"${SNAPPY_INSTALL_DIR}/lib/libsnappy.a")

include_directories(${SNAPPY_INCLUDE_DIR})
add_dependencies(snappy extern_snappy)
1 change: 1 addition & 0 deletions paddle/fluid/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ add_subdirectory(operators)
add_subdirectory(pybind)
add_subdirectory(inference)
add_subdirectory(string)
add_subdirectory(recordio)
14 changes: 14 additions & 0 deletions paddle/fluid/recordio/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# internal library.
cc_library(io SRCS io.cc DEPS stringpiece)
cc_test(io_test SRCS io_test.cc DEPS io)
cc_library(header SRCS header.cc DEPS io)
cc_test(header_test SRCS header_test.cc DEPS header)
cc_library(chunk SRCS chunk.cc DEPS snappy io header)
cc_test(chunk_test SRCS chunk_test.cc DEPS chunk)
cc_library(range_scanner SRCS range_scanner.cc DEPS io chunk)
cc_test(range_scanner_test SRCS range_scanner_test.cc DEPS range_scanner)
cc_library(scanner SRCS scanner.cc DEPS range_scanner)
cc_test(scanner_test SRCS scanner_test.cc DEPS scanner)
# exported library.
cc_library(recordio SRCS recordio.cc DEPS scanner chunk header)
cc_test(recordio_test SRCS recordio_test.cc DEPS scanner)
119 changes: 119 additions & 0 deletions paddle/fluid/recordio/chunk.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

#include <cstring>
#include <sstream>
#include <utility>

#include "snappy.h"

#include "paddle/fluid/recordio/crc32.h"

namespace paddle {
namespace recordio {

void Chunk::Add(const char* record, size_t length) {
records_.emplace_back(std::string(record, length));
num_bytes_ += records_.back().size() * sizeof(char);
}

bool Chunk::Dump(Stream* fo, Compressor ct) {
// NOTE(dzhwinter): don't check records.numBytes instead, because
// empty records are allowed.
if (records_.size() == 0) return false;

// pack the record into consecutive memory for compress
std::ostringstream os;
for (auto& record : records_) {
size_t size = record.size();
os.write(reinterpret_cast<char*>(&size), sizeof(size));
os.write(record.data(), static_cast<std::streamsize>(record.size()));
}

std::unique_ptr<char[]> buffer(new char[num_bytes_]);
size_t compressed =
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
uint32_t checksum = Crc32(buffer.get(), compressed);
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
hdr.Write(fo);
fo->Write(buffer.get(), compressed);
// clear the content
records_.clear();
num_bytes_ = 0;
return true;
}

void Chunk::Parse(Stream* fi, size_t offset) {
fi->Seek(offset);
Header hdr;
hdr.Parse(fi);

size_t size = static_cast<size_t>(hdr.CompressSize());
std::unique_ptr<char[]> buffer(new char[size]);
fi->Read(buffer.get(), size);
size_t deflated_size = 0;
snappy::GetUncompressedLength(buffer.get(), size, &deflated_size);
std::unique_ptr<char[]> deflated_buffer(new char[deflated_size]);
DeflateData(buffer.get(), size, hdr.CompressType(), deflated_buffer.get());
std::istringstream deflated(
std::string(deflated_buffer.get(), deflated_size));
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
size_t rs;
deflated.read(reinterpret_cast<char*>(&rs), sizeof(size_t));
std::string record(rs, '\0');
deflated.read(&record[0], rs);
records_.emplace_back(record);
num_bytes_ += record.size();
}
}

size_t CompressData(const char* in,
size_t in_length,
Compressor ct,
char* out) {
size_t compressd_size = 0;
switch (ct) {
case Compressor::kNoCompress:
// do nothing
memcpy(out, in, in_length);
compressd_size = in_length;
break;
case Compressor::kSnappy:
snappy::RawCompress(in, in_length, out, &compressd_size);
break;
case Compressor::kGzip:
// TODO(dzhwinter): support gzip
break;
}
return compressd_size;
}

void DeflateData(const char* in, size_t in_length, Compressor ct, char* out) {
switch (ct) {
case Compressor::kNoCompress:
memcpy(out, in, in_length);
break;
case Compressor::kSnappy:
snappy::RawUncompress(in, in_length, out);
break;
case Compressor::kGzip:
// TODO(dzhwinter): support gzip
break;
}
}

} // namespace recordio
} // namespace paddle
49 changes: 49 additions & 0 deletions paddle/fluid/recordio/chunk.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <string>
#include <vector>

#include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h"

namespace paddle {
namespace recordio {

// A Chunk contains the Header and optionally compressed records.
class Chunk {
public:
Chunk() : num_bytes_(0) {}
void Add(const char* record, size_t size);
// dump the chunk into w, and clears the chunk and makes it ready for
// the next add invocation.
bool Dump(Stream* fo, Compressor ct);
void Parse(Stream* fi, size_t offset);
size_t NumBytes() { return num_bytes_; }
const std::string Record(int i) { return records_[i]; }

private:
std::vector<std::string> records_;
// sum of record lengths in bytes.
size_t num_bytes_;
DISABLE_COPY_AND_ASSIGN(Chunk);
};

size_t CompressData(const char* in, size_t in_length, Compressor ct, char* out);

void DeflateData(const char* in, size_t in_length, Compressor ct, char* out);

} // namespace recordio
} // namespace paddle
59 changes: 59 additions & 0 deletions paddle/fluid/recordio/chunk_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/recordio/chunk.h"

#include <sstream>

#include "gtest/gtest.h"

using namespace paddle::recordio;

TEST(Chunk, SaveLoad) {
Chunk ch;
ch.Add("12345", 6);
ch.Add("123", 4);
{
Stream* fs = Stream::Open("/tmp/record_11", "w");
ch.Dump(fs, Compressor::kNoCompress);
EXPECT_EQ(ch.NumBytes(), 0);
delete fs;
}
{
Stream* fs = Stream::Open("/tmp/record_11", "r");
ch.Parse(fs, 0);
EXPECT_EQ(ch.NumBytes(), 10);
delete fs;
}
}

TEST(Chunk, Compressor) {
Chunk ch;
ch.Add("12345", 6);
ch.Add("123", 4);
ch.Add("123", 4);
ch.Add("123", 4);
{
Stream* fs = Stream::Open("/tmp/record_12", "w");
ch.Dump(fs, Compressor::kSnappy);
EXPECT_EQ(ch.NumBytes(), 0);
delete fs;
}
{
Stream* fs = Stream::Open("/tmp/record_12", "r");
ch.Parse(fs, 0);
EXPECT_EQ(ch.NumBytes(), 10);
delete fs;
}
}
33 changes: 33 additions & 0 deletions paddle/fluid/recordio/crc32.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// A wrapper on crc library https://github.com/d-bahr/CRCpp
#include <cstdint>

#include "paddle/fluid/recordio/detail/crc.h"

namespace paddle {
namespace recordio {

// usage
// char data[] = "hello,world";
// crc = Crc32(data, 12);
// Assert_EQ(crc, 68a85159);

uint32_t Crc32(const char* data, size_t size) {
return CRC::Calculate(data, size, CRC::CRC_32());
}

} // namespace recordio
} // namespace paddle
Loading