From 98a288ec85424ac698b061c6376cdefccfe074da Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Mon, 16 Dec 2019 17:30:08 +0800 Subject: [PATCH] Refactor JNI library (#1402) * Refactor jni * Add UTs * Rename class * Update interfaces * Address critical27 and darion's comments * Address darion's comments --- .linters/cpp/hooks/pre-commit.sh | 5 +- src/common/base/Cord.cpp | 11 +- src/common/base/Cord.h | 6 +- src/common/base/ThriftTypes.h | 2 + src/common/base/test/CordTest.cpp | 5 +- src/dataman/RowReader.cpp | 2 +- src/dataman/RowWriter.cpp | 2 +- src/dataman/RowWriter.inl | 2 +- src/jni/CMakeLists.txt | 58 ++ src/jni/README.md | 22 + src/jni/java/pom.xml | 213 +++++++ .../java/com/vesoft/nebula/NebulaCodec.java | 110 ++++ .../nebula/NebulaCodecResourceLoader.java | 47 ++ .../com/vesoft/nebula/NebulaCodecTest.java | 73 +++ src/jni/src/CMakeLists.txt | 16 + src/jni/src/com_vesoft_nebula_NebulaCodec.cpp | 216 +++++++ src/jni/src/com_vesoft_nebula_NebulaCodec.h | 64 ++ src/jni/src/datamanlite/CMakeLists.txt | 15 + src/jni/src/datamanlite/DataCommon.cpp | 41 ++ src/jni/src/datamanlite/DataCommon.h | 48 ++ .../src/datamanlite/NebulaSchemaProvider.cpp | 99 +++ .../src/datamanlite/NebulaSchemaProvider.h | 86 +++ src/jni/src/datamanlite/RowReader.cpp | 595 ++++++++++++++++++ src/jni/src/datamanlite/RowReader.h | 286 +++++++++ src/jni/src/datamanlite/RowWriter.cpp | 118 ++++ src/jni/src/datamanlite/RowWriter.h | 100 +++ src/jni/src/datamanlite/SchemaProviderIf.cpp | 9 + src/jni/src/datamanlite/SchemaProviderIf.h | 148 +++++ src/jni/src/datamanlite/Slice.cpp | 85 +++ src/jni/src/datamanlite/Slice.h | 156 +++++ src/jni/src/datamanlite/test/CMakeLists.txt | 38 ++ .../src/datamanlite/test/RowReaderTest.cpp | 275 ++++++++ .../src/datamanlite/test/RowWriterTest.cpp | 103 +++ 33 files changed, 3036 insertions(+), 20 deletions(-) create mode 100644 src/jni/CMakeLists.txt create mode 100644 src/jni/README.md create mode 100644 src/jni/java/pom.xml create mode 100644 src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodec.java create mode 100644 src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodecResourceLoader.java create mode 100644 src/jni/java/src/test/java/com/vesoft/nebula/NebulaCodecTest.java create mode 100644 src/jni/src/CMakeLists.txt create mode 100644 src/jni/src/com_vesoft_nebula_NebulaCodec.cpp create mode 100644 src/jni/src/com_vesoft_nebula_NebulaCodec.h create mode 100644 src/jni/src/datamanlite/CMakeLists.txt create mode 100644 src/jni/src/datamanlite/DataCommon.cpp create mode 100644 src/jni/src/datamanlite/DataCommon.h create mode 100644 src/jni/src/datamanlite/NebulaSchemaProvider.cpp create mode 100644 src/jni/src/datamanlite/NebulaSchemaProvider.h create mode 100644 src/jni/src/datamanlite/RowReader.cpp create mode 100644 src/jni/src/datamanlite/RowReader.h create mode 100644 src/jni/src/datamanlite/RowWriter.cpp create mode 100644 src/jni/src/datamanlite/RowWriter.h create mode 100644 src/jni/src/datamanlite/SchemaProviderIf.cpp create mode 100644 src/jni/src/datamanlite/SchemaProviderIf.h create mode 100644 src/jni/src/datamanlite/Slice.cpp create mode 100644 src/jni/src/datamanlite/Slice.h create mode 100644 src/jni/src/datamanlite/test/CMakeLists.txt create mode 100644 src/jni/src/datamanlite/test/RowReaderTest.cpp create mode 100644 src/jni/src/datamanlite/test/RowWriterTest.cpp diff --git a/.linters/cpp/hooks/pre-commit.sh b/.linters/cpp/hooks/pre-commit.sh index c173dcd8597..e274a0e0c6b 100755 --- a/.linters/cpp/hooks/pre-commit.sh +++ b/.linters/cpp/hooks/pre-commit.sh @@ -16,13 +16,14 @@ if [ $# -eq 0 ];then echo "You have unstaged changes, please stage or stash them first." exit 1 fi - CHECK_FILES=$(git diff --name-only --diff-filter=ACMRTUXB HEAD | egrep '.*\.cpp$|.*\.h$|.*\.inl$' | grep -v 'com_vesoft_client_NativeClient.h') + CHECK_FILES=$(git diff --name-only --diff-filter=ACMRTUXB HEAD | egrep '.*\.cpp$|.*\.h$|.*\.inl$' | grep -v 'com_vesoft_client_NativeClient.h' | grep -v 'com_vesoft_nebula_NebulaCodec.h') else CHECK_FILES=$(find $@ -not \( -path src/CMakeFiles -prune \) \ -not \( -path src/interface/gen-cpp2 -prune \) \ -name "*.[h]" -o -name "*.cpp" -o -name '*.inl' \ | grep -v 'GraphScanner.*' | grep -v 'GraphParser.*' \ - | grep -v 'com_vesoft_client_NativeClient.h') + | grep -v 'com_vesoft_client_NativeClient.h' \ + | grep -v 'com_vesoft_nebula_NebulaCodec.h') fi # No changes on interested files diff --git a/src/common/base/Cord.cpp b/src/common/base/Cord.cpp index d53fd628d15..19777619f1a 100644 --- a/src/common/base/Cord.cpp +++ b/src/common/base/Cord.cpp @@ -4,8 +4,8 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -#include "base/Base.h" #include "base/Cord.h" +#include "base/Logging.h" namespace nebula { @@ -225,15 +225,6 @@ Cord& Cord::operator<<(const char* value) { return write(value, strlen(value)); } - -Cord& Cord::operator<<(const folly::StringPiece value) { - return write(value.begin(), value.size()); -} - -Cord& Cord::operator<<(const folly::ByteRange value) { - return write(reinterpret_cast(value.begin()), value.size()); -} - Cord& Cord::operator<<(const Cord& rhs) { char* next = rhs.head_; while (next != rhs.tail_) { diff --git a/src/common/base/Cord.h b/src/common/base/Cord.h index ab616c901eb..33415712f3d 100644 --- a/src/common/base/Cord.h +++ b/src/common/base/Cord.h @@ -7,7 +7,9 @@ #ifndef COMMON_BASE_CORD_H_ #define COMMON_BASE_CORD_H_ -#include "base/Base.h" +#include +#include +#include namespace nebula { @@ -53,8 +55,6 @@ class Cord { Cord& operator<<(const std::string& value); Cord& operator<<(const char* value); - Cord& operator<<(const folly::StringPiece value); - Cord& operator<<(const folly::ByteRange value); Cord& operator<<(const Cord& rhs); diff --git a/src/common/base/ThriftTypes.h b/src/common/base/ThriftTypes.h index 57b93739fc0..a5bdbafac99 100644 --- a/src/common/base/ThriftTypes.h +++ b/src/common/base/ThriftTypes.h @@ -7,6 +7,8 @@ #ifndef COMMON_BASE_THRIFTTYPES_H_ #define COMMON_BASE_THRIFTTYPES_H_ +#include + namespace nebula { // Raft related types diff --git a/src/common/base/test/CordTest.cpp b/src/common/base/test/CordTest.cpp index 291b439b6a9..a2f21d5ea2f 100644 --- a/src/common/base/test/CordTest.cpp +++ b/src/common/base/test/CordTest.cpp @@ -86,7 +86,7 @@ TEST(Cord, byteStream) { 0x88, 0x99, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF}; Cord cord2; - cord2 << folly::ByteRange(bytes, sizeof(bytes)); + cord2.write(reinterpret_cast(&bytes[0]), sizeof(bytes)); std::string str = cord2.str(); EXPECT_EQ(sizeof(bytes), str.size()); @@ -187,7 +187,8 @@ TEST(Cord, stringStream) { Cord cord; - cord << str1 << str2 << folly::StringPiece(str3); + cord << str1 << str2; + cord.write(str3.data(), str3.size()); EXPECT_EQ(str1.size() + strlen(str2) + str3.size(), cord.size()); EXPECT_EQ(str1.size() + strlen(str2) + str3.size(), cord.str().size()); diff --git a/src/dataman/RowReader.cpp b/src/dataman/RowReader.cpp index 5a040da64db..7a6dcea6852 100644 --- a/src/dataman/RowReader.cpp +++ b/src/dataman/RowReader.cpp @@ -293,7 +293,7 @@ int64_t RowReader::skipToNext(int64_t index, int64_t offset) const noexcept { break; } case cpp2::SupportedType::FLOAT: { - // Eight bytes + // Four bytes offset += sizeof(float); break; } diff --git a/src/dataman/RowWriter.cpp b/src/dataman/RowWriter.cpp index be91b750370..21c178fceb0 100644 --- a/src/dataman/RowWriter.cpp +++ b/src/dataman/RowWriter.cpp @@ -171,7 +171,7 @@ RowWriter& RowWriter::operator<<(folly::StringPiece v) noexcept { switch (type->get_type()) { case SupportedType::STRING: { writeInt(v.size()); - cord_ << v; + cord_.write(v.data(), v.size()); break; } default: { diff --git a/src/dataman/RowWriter.inl b/src/dataman/RowWriter.inl index 1696b3b8a4a..85844e20220 100644 --- a/src/dataman/RowWriter.inl +++ b/src/dataman/RowWriter.inl @@ -39,7 +39,7 @@ RowWriter::writeInt(T v) { uint8_t buf[10]; size_t len = folly::encodeVarint(v, buf); DCHECK_GT(len, 0UL); - cord_ << folly::ByteRange(buf, len); + cord_.write(reinterpret_cast(&buf[0]), len); } } // namespace nebula diff --git a/src/jni/CMakeLists.txt b/src/jni/CMakeLists.txt new file mode 100644 index 00000000000..a3f891dbabf --- /dev/null +++ b/src/jni/CMakeLists.txt @@ -0,0 +1,58 @@ +# Copyright (c) 2019 vesoft inc. All rights reserved. +# +# This source code is licensed under Apache 2.0 License, +# attached with Common Clause Condition 1.0, found in the LICENSES directory. +# +# The build can be controlled by defining following variables on the +# command line +# +# CMAKE_C_COMPILER -- Specify the compiler for C language +# CMAKE_CXX_COMPILER -- Specify the compiler for C++ language +# +# NEBULA_HOME -- Specify the root directory for nebula project +# NEBULA_THIRDPARTY_ROOT -- Specify the third-party root dir. +# ENABLE_TESTING -- Build unit test +# +cmake_minimum_required(VERSION 3.0.0) + +project("Nebula Graph codec" C CXX) + +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +option(ENABLE_TESTING "Whether to compile unit test ON or OFF" OFF) + +message(STATUS "CMAKE_CURRENT_BINARY_DIR:" ${CMAKE_CURRENT_BINARY_DIR}) +message(STATUS "CMAKE_CURRENT_SOURCE_DIR:" ${CMAKE_CURRENT_SOURCE_DIR}) +message(STATUS "NEBULA_HOME:" ${NEBULA_HOME}) +message(STATUS "NEBULA_THIRDPARTY_ROOT:" ${NEBULA_THIRDPARTY_ROOT}) + +# locate jni header +include_directories($ENV{JAVA_HOME}/include + $ENV{JAVA_HOME}/include/linux) +include_directories(AFTER ${NEBULA_HOME}/src) +include_directories(AFTER ${NEBULA_HOME}/src/common) +include_directories(AFTER ${NEBULA_HOME}/src/jni/src) + +include_directories(SYSTEM ${NEBULA_THIRDPARTY_ROOT}/include) +link_directories( + ${NEBULA_THIRDPARTY_ROOT}/lib + ${NEBULA_THIRDPARTY_ROOT}/lib64 +) + +if (ENABLE_TESTING) + enable_testing() +endif() + +if (!CMAKE_CXX_COMPILER) + message(FATAL_ERROR "No C++ compiler found") +endif() + +add_compile_options(-fPIC) +add_compile_options(-Wall) +add_compile_options(-Werror) +add_compile_options(-Wunused-parameter) + + +add_subdirectory(src) + diff --git a/src/jni/README.md b/src/jni/README.md new file mode 100644 index 00000000000..e1caff71f42 --- /dev/null +++ b/src/jni/README.md @@ -0,0 +1,22 @@ +# Nebula JNI codec library +It supports encode/decode data in nebula graph. + + +# How to build + +## Build requirements +To build this project, you must have: + * Access to the Internet + * make + * cmake 3.0.0+ + * GCC 6.0+ + * glog + + +## Steps + * mkdir build && cd build + * cmake .. -DNEBULA_HOME=${nebula project root dir} -DNEBULA_THIRDPARTY_ROOT=${dependencies root dir} + * make + * cd ../java && mvn clean package + +You could find the jni java package nebula-utils-1.0.0-beta.jar under java/target dir diff --git a/src/jni/java/pom.xml b/src/jni/java/pom.xml new file mode 100644 index 00000000000..c230c16689a --- /dev/null +++ b/src/jni/java/pom.xml @@ -0,0 +1,213 @@ + + + + 4.0.0 + + com.vesoft + nebula-utils + 1.0.0-rc2 + + + ../../.. + + + + + org.slf4j + slf4j-api + 1.7.25 + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + + junit + junit + 4.12 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + 1.6 + 1.6 + UTF-8 + true + 256m + 512m + + + + validate + + compile + + + + + + org.codehaus.mojo + native-maven-plugin + 1.0-alpha-9 + + + javah + generate-resources + + javah + + + ${project.root.dir}/src/jni/src + + + com_vesoft_nebula_NebulaCodec.h + + + com.vesoft.nebula.NebulaCodec + + + + + + + + org.apache.maven.plugins + maven-resources-plugin + 3.1.0 + + + copy-resources + package + + copy-resources + + + UTF-8 + ${basedir}/src/main/resources/ + + + + ${project.root.dir}/src/jni/build/src + + libnebula_codec.so + + + false + + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.0.0 + + + ${project.root.dir}/.linters/java/nebula_java_style_checks.xml + + UTF-8 + true + false + true + + 0 + warning + + + + checkstyle + validate + + check + + + + + + com.puppycrawl.tools + checkstyle + 8.18 + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.10 + + + -Djava.library.path=${project.root.dir}/src/jni/build/src + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.1.1 + + + attach-javadocs + package + + jar + + + none + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + verify + + sign + + + + + + + + + release + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + + + diff --git a/src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodec.java b/src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodec.java new file mode 100644 index 00000000000..d8f134f42af --- /dev/null +++ b/src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodec.java @@ -0,0 +1,110 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaCodec { + private static final Logger LOGGER = LoggerFactory.getLogger(NebulaCodec.class.getName()); + + static { + try { + System.loadLibrary("nebula_codec"); + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } catch (Error e) { + LOGGER.error(e.getMessage(), e); + } + } + + public static class Pair { + private String field; + private String clazz; + + public Pair(String field, String clazz) { + this.field = field; + this.clazz = clazz; + } + + public String getField() { + return field; + } + + public String getClazz() { + return clazz; + } + + @Override + public String toString() { + return "Pair{" + "field='" + field + '\'' + ", clazz=" + clazz + '}'; + } + } + + private static final int PARTITION_ID_SIZE = 4; + private static final int VERTEX_ID_SIZE = 8; + private static final int TAG_ID_SIZE = 4; + private static final int TAG_VERSION_SIZE = 8; + private static final int EDGE_TYPE_SIZE = 4; + private static final int EDGE_RANKING_SIZE = 8; + private static final int EDGE_VERSION_SIZE = 8; + private static final int VERTEX_SIZE = PARTITION_ID_SIZE + VERTEX_ID_SIZE + + TAG_ID_SIZE + TAG_VERSION_SIZE; + private static final int EDGE_SIZE = PARTITION_ID_SIZE + VERTEX_ID_SIZE + + EDGE_TYPE_SIZE + EDGE_RANKING_SIZE + VERTEX_ID_SIZE + EDGE_VERSION_SIZE; + + private static final int DATA_KEY_TYPE = 0x00000001; + private static final int TAG_MASK = 0xBFFFFFFF; + private static final int EDGE_MASK = 0x40000000; + + public static byte[] createEdgeKey(int partitionId, long srcId, int edgeType, + long edgeRank, long dstId, long edgeVersion) { + ByteBuffer buffer = ByteBuffer.allocate(EDGE_SIZE); + buffer.order(ByteOrder.LITTLE_ENDIAN); + partitionId = (partitionId << 8) | DATA_KEY_TYPE; + buffer.putInt(partitionId); + buffer.putLong(srcId); + edgeType |= EDGE_MASK; + buffer.putInt(edgeType); + buffer.putLong(edgeRank); + buffer.putLong(dstId); + buffer.putLong(edgeVersion); + return buffer.array(); + } + + public static byte[] createVertexKey(int partitionId, long vertexId, + int tagId, long tagVersion) { + ByteBuffer buffer = ByteBuffer.allocate(VERTEX_SIZE); + buffer.order(ByteOrder.LITTLE_ENDIAN); + partitionId = (partitionId << 8) | DATA_KEY_TYPE; + buffer.putInt(partitionId); + buffer.putLong(vertexId); + tagId &= TAG_MASK; + buffer.putInt(tagId); + buffer.putLong(tagVersion); + return buffer.array(); + } + + public static native byte[] encode(Object[] values); + + public static native List decode(byte[] encoded, Pair[] fields, long version); + + private boolean checkKey(String key) { + return Objects.isNull(key) || key.length() == 0; + } + + private boolean checkValues(Object[] values) { + return Objects.isNull(values) || values.length == 0 + || Arrays.asList(values).contains(null); + } +} diff --git a/src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodecResourceLoader.java b/src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodecResourceLoader.java new file mode 100644 index 00000000000..5ccff1a5723 --- /dev/null +++ b/src/jni/java/src/main/java/com/vesoft/nebula/NebulaCodecResourceLoader.java @@ -0,0 +1,47 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NebulaCodecResourceLoader { + private static final String NEBULA_LIB_NAME = "/libnebula_codec.so"; + private static final Logger LOGGER = LoggerFactory.getLogger(NebulaCodec.class.getName()); + + public static void resourceLoader() { + InputStream stream = ClassLoader.class.getResourceAsStream(NEBULA_LIB_NAME); + if (stream == null) { + throw new RuntimeException(NEBULA_LIB_NAME + " was not found in JAR"); + } + + File file = null; + try { + file = File.createTempFile("lib", ".so"); + } catch (IOException e) { + LOGGER.error("libnebula_codec Create Failed {}", e.getMessage()); + } + + if (!file.exists()) { + throw new RuntimeException(file.getAbsolutePath() + " does not exist"); + } + file.deleteOnExit(); + + try { + Files.copy(stream, file.toPath(), StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + LOGGER.error("libnebula_codec Copy Failed {}", e.getMessage()); + } + System.load(file.getAbsolutePath()); + } +} diff --git a/src/jni/java/src/test/java/com/vesoft/nebula/NebulaCodecTest.java b/src/jni/java/src/test/java/com/vesoft/nebula/NebulaCodecTest.java new file mode 100644 index 00000000000..a2994820ab1 --- /dev/null +++ b/src/jni/java/src/test/java/com/vesoft/nebula/NebulaCodecTest.java @@ -0,0 +1,73 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import com.vesoft.nebula.NebulaCodec.Pair; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.DoubleBuffer; +import java.nio.FloatBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; + +public class NebulaCodecTest { + + @Test + public void testDecoded() { + Object[] values = { + false, + 7, + 3.14F, + 0.618, + "Hello".getBytes() + }; + byte[] result = NebulaCodec.encode(values); + + NebulaCodec.Pair[] pairs = new NebulaCodec.Pair[]{ + new NebulaCodec.Pair("b_field", Boolean.class.getName()), + new NebulaCodec.Pair("i_field", Integer.class.getName()), + new NebulaCodec.Pair("f_field", Float.class.getName()), + new NebulaCodec.Pair("d_field", Double.class.getName()), + new NebulaCodec.Pair("s_field", byte[].class.getName()) + }; + + List decodedResult = NebulaCodec.decode(result, pairs, 0); + + byte[] byteValue = decodedResult.get(0); + boolean boolValue = (byteValue[0] == 0x00) ? false : true; + assertFalse(boolValue); + + ByteBuffer integerByteBuffer = ByteBuffer.wrap(decodedResult.get(1)); + integerByteBuffer.order(ByteOrder.LITTLE_ENDIAN); + IntBuffer intBuffer = integerByteBuffer.asIntBuffer(); + assertEquals(7, intBuffer.get()); + + + ByteBuffer floatByteBuffer = ByteBuffer.wrap(decodedResult.get(2)); + floatByteBuffer.order(ByteOrder.LITTLE_ENDIAN); + FloatBuffer floatBuffer = floatByteBuffer.asFloatBuffer(); + assertEquals("Float Value ", 3.14F, floatBuffer.get(), 0.0001); + + ByteBuffer doubleByteBuffer = ByteBuffer.wrap(decodedResult.get(3)); + doubleByteBuffer.order(ByteOrder.LITTLE_ENDIAN); + DoubleBuffer doubleBuffer = doubleByteBuffer.asDoubleBuffer(); + assertEquals("Double Value ", 0.618, doubleBuffer.get(), 0.0001); + + ByteBuffer stringByteBuffer = ByteBuffer.wrap(decodedResult.get(4)); + assertEquals("Hello", new String(stringByteBuffer.array())); + } +} + diff --git a/src/jni/src/CMakeLists.txt b/src/jni/src/CMakeLists.txt new file mode 100644 index 00000000000..032c59c77d5 --- /dev/null +++ b/src/jni/src/CMakeLists.txt @@ -0,0 +1,16 @@ +add_subdirectory(datamanlite) + +add_library(nebula_codec + SHARED + $ + com_vesoft_nebula_NebulaCodec.cpp) + + +target_link_libraries(nebula_codec + -lglog + -lgflags + -static-libgcc + -static-libstdc++ + -lpthread + -ldl) + diff --git a/src/jni/src/com_vesoft_nebula_NebulaCodec.cpp b/src/jni/src/com_vesoft_nebula_NebulaCodec.cpp new file mode 100644 index 00000000000..8dc9929119f --- /dev/null +++ b/src/jni/src/com_vesoft_nebula_NebulaCodec.cpp @@ -0,0 +1,216 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "com_vesoft_nebula_NebulaCodec.h" +#include +#include "datamanlite/RowReader.h" +#include "datamanlite/RowWriter.h" +#include "datamanlite/NebulaSchemaProvider.h" + +using namespace nebula::dataman::codec; // NOLINT + + +JNIEXPORT jbyteArray JNICALL Java_com_vesoft_nebula_NebulaCodec_encode(JNIEnv *env, + jclass clazz, + jobjectArray values) { + jint len = env->GetArrayLength(values); + RowWriter writer; + for (int i = 0; i < len; i++) { + jobject obj = env->GetObjectArrayElement(values, i); + clazz = env->GetObjectClass(obj); + jmethodID getClazz = env->GetMethodID(clazz, "getClass", "()Ljava/lang/Class;"); + + jobject getClazzObj = env->CallObjectMethod(obj, getClazz); + jclass objClazz = env->GetObjectClass(getClazzObj); + jmethodID getName = env->GetMethodID(objClazz, "getName", "()Ljava/lang/String;"); + jstring clazzType = static_cast(env->CallObjectMethod(getClazzObj, getName)); + + const char* clazzArray = env->GetStringUTFChars(clazzType, nullptr); + Slice name(clazzArray); + if (name == Slice("java.lang.Boolean")) { + jmethodID m = env->GetMethodID(clazz, "booleanValue", "()Z"); + bool value = env->CallBooleanMethod(obj, m); + writer << value; + } else if (name == Slice("java.lang.Integer")) { + jmethodID m = env->GetMethodID(clazz, "intValue", "()I"); + int value = env->CallIntMethod(obj, m); + writer << value; + } else if (name == Slice("java.lang.Long")) { + jmethodID m = env->GetMethodID(clazz, "longValue", "()J"); + int64_t value = env->CallLongMethod(obj, m); + writer << value; + } else if (name == Slice("java.lang.Float")) { + jmethodID m = env->GetMethodID(clazz, "floatValue", "()F"); + float value = env->CallFloatMethod(obj, m); + writer << value; + } else if (name == Slice("java.lang.Double")) { + jmethodID m = env->GetMethodID(clazz, "doubleValue", "()D"); + double value = env->CallDoubleMethod(obj, m); + writer << value; + } else if (name == Slice("[B")) { + jbyteArray byteArray = reinterpret_cast(obj); + jbyte* b = env->GetByteArrayElements(byteArray, nullptr); + const char* bytes = reinterpret_cast(b); + int size = env->GetArrayLength(byteArray); + writer << Slice(bytes, size); + } else { + // Type Error + LOG(FATAL) << "Type Error : " << name.data(); + } + env->ReleaseStringUTFChars(clazzType, clazzArray); + } + + auto result = writer.encode(); + auto *resultArray = reinterpret_cast(result.data()); + auto resultSize = result.size(); + + jbyteArray arrays = env->NewByteArray(resultSize); + env->SetByteArrayRegion(arrays, 0, resultSize, resultArray); + return arrays; +} + +JNIEXPORT jobject JNICALL Java_com_vesoft_nebula_NebulaCodec_decode(JNIEnv *env, + jclass clz, + jbyteArray encoded, + jobjectArray pairs, + jlong version) { + clz = env->FindClass("com/vesoft/nebula/NebulaCodec$Pair"); + jmethodID getField = env->GetMethodID(clz, "getField", "()Ljava/lang/String;"); + jmethodID getClazz = env->GetMethodID(clz, "getClazz", "()Ljava/lang/String;"); + + auto len = env->GetArrayLength(pairs); + // Now the version is zero, we should use the one passed in. + auto schema = std::make_shared(version); + for (int i = 0; i < len; i++) { + jobject o = env->GetObjectArrayElement(pairs, i); + jstring fieldValue = static_cast(env->CallObjectMethod(o, getField)); + jstring clazzValue = static_cast(env->CallObjectMethod(o, getClazz)); + const char* fieldArray = env->GetStringUTFChars(fieldValue, nullptr); + const char* clazzArray = env->GetStringUTFChars(clazzValue, nullptr); + Slice clazz(clazzArray); + if (clazz == Slice("java.lang.Boolean")) { + schema->addField(Slice(fieldArray), ValueType::BOOL); + } else if (clazz == Slice("java.lang.Integer")) { + schema->addField(Slice(fieldArray), ValueType::INT); + } else if (clazz == Slice("java.lang.Long")) { + schema->addField(Slice(fieldArray), ValueType::INT); + } else if (clazz == Slice("java.lang.Float")) { + schema->addField(Slice(fieldArray), ValueType::FLOAT); + } else if (clazz == Slice("java.lang.Double")) { + schema->addField(Slice(fieldArray), ValueType::DOUBLE); + } else if (clazz == Slice("[B")) { + schema->addField(Slice(fieldArray), ValueType::STRING); + } else { + // Type Error + LOG(FATAL) << "Type Error : " << clazz.data(); + } + env->ReleaseStringUTFChars(fieldValue, fieldArray); + env->ReleaseStringUTFChars(clazzValue, clazzArray); + } + + jclass arrayListClazz = env->FindClass("java/util/ArrayList"); + jmethodID listInit = env->GetMethodID(arrayListClazz, "", "()V"); + jobject list_obj = env->NewObject(arrayListClazz, listInit, ""); + jmethodID list_add = env->GetMethodID(arrayListClazz, "add", "(Ljava/lang/Object;)Z"); + + jbyte* b = env->GetByteArrayElements(encoded, nullptr); + const char* bytes = reinterpret_cast(b); + int size = env->GetArrayLength(encoded); + auto reader = RowReader::getRowReader(Slice(bytes, size), schema); + if (reader == nullptr) { + env->ReleaseByteArrayElements(encoded, b, 0); + return list_obj; + } + for (int i = 0; i < len; i++) { + jobject o = env->GetObjectArrayElement(pairs, i); + jstring fieldValue = static_cast(env->CallObjectMethod(o, getField)); + jstring clazzValue = static_cast(env->CallObjectMethod(o, getClazz)); + const char* fieldBytes = env->GetStringUTFChars(fieldValue, nullptr); + const char* clazzBytes = env->GetStringUTFChars(clazzValue, nullptr); + Slice field(fieldBytes); + Slice clazz(clazzBytes); + + if (clazz == Slice("java.lang.Boolean")) { + bool bValue; + auto ret = reader->getBool(i, bValue); + if (ret != ResultType::SUCCEEDED) { + break; + } + int valueSize = sizeof(bool); + jbyteArray values = env->NewByteArray(valueSize); + env->SetByteArrayRegion(values, 0, valueSize, + reinterpret_cast(&bValue)); + env->CallBooleanMethod(list_obj, list_add, values); + } else if (clazz == Slice("java.lang.Integer")) { + int32_t iValue; + auto ret = reader->getInt(i, iValue); + if (ret != ResultType::SUCCEEDED) { + break; + } + int valueSize = sizeof(int32_t); + jbyteArray values = env->NewByteArray(valueSize); + env->SetByteArrayRegion(values, 0, valueSize, + reinterpret_cast(&iValue)); + env->CallBooleanMethod(list_obj, list_add, values); + } else if (clazz == Slice("java.lang.Long")) { + int64_t lValue; + auto ret = reader->getInt(i, lValue); + if (ret != ResultType::SUCCEEDED) { + break; + } + int valueSize = sizeof(int64_t); + jbyteArray values = env->NewByteArray(valueSize); + env->SetByteArrayRegion(values, 0, valueSize, + reinterpret_cast(&lValue)); + env->CallBooleanMethod(list_obj, list_add, values); + } else if (clazz == Slice("java.lang.Float")) { + float fValue; + auto ret = reader->getFloat(i, fValue); + if (ret != ResultType::SUCCEEDED) { + break; + } + int valueSize = sizeof(float); + jbyteArray values = env->NewByteArray(valueSize); + env->SetByteArrayRegion(values, 0, valueSize, + reinterpret_cast(&fValue)); + env->CallBooleanMethod(list_obj, list_add, values); + } else if (clazz == Slice("java.lang.Double")) { + double dValue; + auto ret = reader->getDouble(i, dValue); + if (ret != ResultType::SUCCEEDED) { + break; + } + int valueSize = sizeof(double); + jbyteArray values = env->NewByteArray(valueSize); + env->SetByteArrayRegion(values, 0, valueSize, + reinterpret_cast(&dValue)); + env->CallBooleanMethod(list_obj, list_add, values); + } else if (clazz == Slice("[B")) { + Slice sValue; + auto ret = reader->getString(i, sValue); + if (ret != ResultType::SUCCEEDED) { + break; + } + int valueSize = sValue.size(); + jbyteArray values = env->NewByteArray(valueSize); + env->SetByteArrayRegion(values, 0, valueSize, + reinterpret_cast(sValue.data())); + env->CallBooleanMethod(list_obj, list_add, values); + } else { + // Type Error + LOG(FATAL) << "Type Error : " << clazz.data(); + return nullptr; + } + + env->ReleaseStringUTFChars(fieldValue, fieldBytes); + env->ReleaseStringUTFChars(clazzValue, clazzBytes); + } + + env->ReleaseByteArrayElements(encoded, b, 0); + return list_obj; +} + + diff --git a/src/jni/src/com_vesoft_nebula_NebulaCodec.h b/src/jni/src/com_vesoft_nebula_NebulaCodec.h new file mode 100644 index 00000000000..732d96c8a25 --- /dev/null +++ b/src/jni/src/com_vesoft_nebula_NebulaCodec.h @@ -0,0 +1,64 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class com_vesoft_nebula_NebulaCodec */ + +#ifndef _Included_com_vesoft_nebula_NebulaCodec +#define _Included_com_vesoft_nebula_NebulaCodec +#ifdef __cplusplus +extern "C" { +#endif +#undef com_vesoft_nebula_NebulaCodec_PARTITION_ID_SIZE +#define com_vesoft_nebula_NebulaCodec_PARTITION_ID_SIZE 4L +#undef com_vesoft_nebula_NebulaCodec_VERTEX_ID_SIZE +#define com_vesoft_nebula_NebulaCodec_VERTEX_ID_SIZE 8L +#undef com_vesoft_nebula_NebulaCodec_TAG_ID_SIZE +#define com_vesoft_nebula_NebulaCodec_TAG_ID_SIZE 4L +#undef com_vesoft_nebula_NebulaCodec_TAG_VERSION_SIZE +#define com_vesoft_nebula_NebulaCodec_TAG_VERSION_SIZE 8L +#undef com_vesoft_nebula_NebulaCodec_EDGE_TYPE_SIZE +#define com_vesoft_nebula_NebulaCodec_EDGE_TYPE_SIZE 4L +#undef com_vesoft_nebula_NebulaCodec_EDGE_RANKING_SIZE +#define com_vesoft_nebula_NebulaCodec_EDGE_RANKING_SIZE 8L +#undef com_vesoft_nebula_NebulaCodec_EDGE_VERSION_SIZE +#define com_vesoft_nebula_NebulaCodec_EDGE_VERSION_SIZE 8L +#undef com_vesoft_nebula_NebulaCodec_VERTEX_SIZE +#define com_vesoft_nebula_NebulaCodec_VERTEX_SIZE 24L +#undef com_vesoft_nebula_NebulaCodec_EDGE_SIZE +#define com_vesoft_nebula_NebulaCodec_EDGE_SIZE 40L +#undef com_vesoft_nebula_NebulaCodec_DATA_KEY_TYPE +#define com_vesoft_nebula_NebulaCodec_DATA_KEY_TYPE 1L +#undef com_vesoft_nebula_NebulaCodec_TAG_MASK +#define com_vesoft_nebula_NebulaCodec_TAG_MASK -1073741825L +#undef com_vesoft_nebula_NebulaCodec_EDGE_MASK +#define com_vesoft_nebula_NebulaCodec_EDGE_MASK 1073741824L +/* + * Class: com_vesoft_nebula_NebulaCodec + * Method: encode + * Signature: ([Ljava/lang/Object;)[B + */ +JNIEXPORT jbyteArray JNICALL Java_com_vesoft_nebula_NebulaCodec_encode + (JNIEnv *, jclass, jobjectArray); + +/* + * Class: com_vesoft_nebula_NebulaCodec + * Method: decode + * Signature: ([B[Lcom/vesoft/nebula/NebulaCodec/Pair;J)Ljava/util/List; + */ +JNIEXPORT jobject JNICALL Java_com_vesoft_nebula_NebulaCodec_decode + (JNIEnv *, jclass, jbyteArray, jobjectArray, jlong); + +#ifdef __cplusplus +} +#endif +#endif +/* Header for class com_vesoft_nebula_NebulaCodec_Pair */ + +#ifndef _Included_com_vesoft_nebula_NebulaCodec_Pair +#define _Included_com_vesoft_nebula_NebulaCodec_Pair +#ifdef __cplusplus +extern "C" { +#endif +#ifdef __cplusplus +} +#endif +#endif diff --git a/src/jni/src/datamanlite/CMakeLists.txt b/src/jni/src/datamanlite/CMakeLists.txt new file mode 100644 index 00000000000..b175b4938a3 --- /dev/null +++ b/src/jni/src/datamanlite/CMakeLists.txt @@ -0,0 +1,15 @@ + +add_library(nebula_codec_obj + OBJECT + ${NEBULA_HOME}/src/common/base/Cord.cpp + Slice.cpp + DataCommon.cpp + SchemaProviderIf.cpp + NebulaSchemaProvider.cpp + RowReader.cpp + RowWriter.cpp + ) + +if (ENABLE_TESTING) +add_subdirectory(test) +endif() diff --git a/src/jni/src/datamanlite/DataCommon.cpp b/src/jni/src/datamanlite/DataCommon.cpp new file mode 100644 index 00000000000..7995f6cb75d --- /dev/null +++ b/src/jni/src/datamanlite/DataCommon.cpp @@ -0,0 +1,41 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "datamanlite/DataCommon.h" + +namespace nebula { +namespace dataman { +namespace codec { + +int32_t decodeVarint(const int8_t* begin, size_t len, uint64_t& val) { + const int8_t* end = begin + len; + const int8_t* p = begin; + int shift = 0; + while (p != end && *p < 0) { + val |= static_cast(*p++ & 0x7f) << shift; + shift += 7; + } + if (p == end) { + return -1; + } + val |= static_cast(*p++) << shift; + return p - begin; +} + +size_t encodeVarint(uint64_t val, uint8_t* buf) { + uint8_t* p = buf; + while (val >= 128) { + *p++ = 0x80 | (val & 0x7f); + val >>= 7; + } + *p++ = uint8_t(val); + return size_t(p - buf); +} + +} // namespace codec +} // namespace dataman +} // namespace nebula + diff --git a/src/jni/src/datamanlite/DataCommon.h b/src/jni/src/datamanlite/DataCommon.h new file mode 100644 index 00000000000..3a8ea756cef --- /dev/null +++ b/src/jni/src/datamanlite/DataCommon.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef DATAMAN_CODEC_DATACOMMON_H_ +#define DATAMAN_CODEC_DATACOMMON_H_ + +#include +#include "base/Logging.h" + +namespace nebula { +namespace dataman { +namespace codec { + +enum class ResultType { + SUCCEEDED = 0, + E_NAME_NOT_FOUND = -1, + E_INDEX_OUT_OF_RANGE = -2, + E_INCOMPATIBLE_TYPE = -3, + E_VALUE_OUT_OF_RANGE = -4, + E_DATA_INVALID = -5, +}; + +template +typename std::enable_if< + std::is_integral< + typename std::remove_cv< + typename std::remove_reference::type + >::type + >::value, + bool +>::type +intToBool(IntType iVal) { + return iVal != 0; +} + +int32_t decodeVarint(const int8_t* begin, size_t len, uint64_t& val); + +size_t encodeVarint(uint64_t val, uint8_t* buf); + +} // namespace codec +} // namespace dataman +} // namespace nebula + +#endif // DATAMAN_CODEC_DATACOMMON_H_ + diff --git a/src/jni/src/datamanlite/NebulaSchemaProvider.cpp b/src/jni/src/datamanlite/NebulaSchemaProvider.cpp new file mode 100644 index 00000000000..d648a8410f0 --- /dev/null +++ b/src/jni/src/datamanlite/NebulaSchemaProvider.cpp @@ -0,0 +1,99 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "datamanlite/NebulaSchemaProvider.h" +#include "base/Logging.h" + +namespace nebula { +namespace dataman { +namespace codec { + +ValueType kUnknown = ValueType::UNKNOWN; + +SchemaVer NebulaSchemaProvider::getVersion() const noexcept { + return ver_; +} + +size_t NebulaSchemaProvider::getNumFields() const noexcept { + return fields_.size(); +} + +int64_t NebulaSchemaProvider::getFieldIndex(const Slice& name) const { + auto it = fieldNameIndex_.find(name.toString()); + if (it == fieldNameIndex_.end()) { + // Not found + return -1; + } else { + return it->second; + } +} + +const char* NebulaSchemaProvider::getFieldName(int64_t index) const { + if (index < 0 || index >= static_cast(fields_.size())) { + LOG(ERROR) << "Index[" << index << "] is out of range[0-" << fields_.size() << "]"; + return nullptr; + } + + return fields_[index]->getName(); +} + +const ValueType& NebulaSchemaProvider::getFieldType(int64_t index) const { + if (index < 0 || index >= static_cast(fields_.size())) { + LOG(ERROR) << "Index[" << index << "] is out of range[0-" << fields_.size() << "]"; + return kUnknown; + } + + return fields_[index]->getType(); +} + +const ValueType& NebulaSchemaProvider::getFieldType(const Slice& name) const { + auto it = fieldNameIndex_.find(name.toString()); + if (fieldNameIndex_.end() == it) { + LOG(ERROR) << "Unknown field \"" << name.toString() << "\""; + return kUnknown; + } + + return fields_[it->second]->getType(); +} + +std::shared_ptr NebulaSchemaProvider::field( + int64_t index) const { + if (index < 0) { + VLOG(2) << "Invalid index " << index; + return nullptr; + } + if (index >= static_cast(fields_.size())) { + VLOG(2) << "Index " << index << " is out of range"; + return nullptr; + } + + return fields_[index]; +} + +std::shared_ptr NebulaSchemaProvider::field( + const Slice& name) const { + auto it = fieldNameIndex_.find(name.toString()); + if (it == fieldNameIndex_.end()) { + VLOG(2) << "Unknown field \"" << name.toString() << "\""; + return nullptr; + } + + return fields_[it->second]; +} + +void NebulaSchemaProvider::addField(const Slice& name, + ValueType type) { + fields_.emplace_back(std::make_shared(name.toString(), + std::move(type))); + fieldNameIndex_.emplace(name.toString(), + static_cast(fields_.size() - 1)); +} + + +} // namespace codec +} // namespace dataman +} // namespace nebula + diff --git a/src/jni/src/datamanlite/NebulaSchemaProvider.h b/src/jni/src/datamanlite/NebulaSchemaProvider.h new file mode 100644 index 00000000000..25ec778c9d9 --- /dev/null +++ b/src/jni/src/datamanlite/NebulaSchemaProvider.h @@ -0,0 +1,86 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef DATAMAN_CODEC_NEBULASCHEMAPROVIDER_H_ +#define DATAMAN_CODEC_NEBULASCHEMAPROVIDER_H_ + +#include +#include +#include "datamanlite/SchemaProviderIf.h" +#include "datamanlite/Slice.h" + +namespace nebula { +namespace dataman { +namespace codec { + +class NebulaSchemaProvider : public SchemaProviderIf { +public: + class SchemaField final : public SchemaProviderIf::Field { + public: + SchemaField(std::string name, ValueType type) + : name_(std::move(name)) + , type_(type) {} + + const char* getName() const override { + return name_.c_str(); + } + + const ValueType& getType() const override { + return type_; + } + + bool isValid() const override { + return true; + } + + bool hasDefault() const override { + return hasDefault_; + } + + std::string getDefaultValue() const override { + return defaultValue_; + } + + private: + std::string name_; + ValueType type_; + bool hasDefault_; + std::string defaultValue_; + }; + +public: + NebulaSchemaProvider() = default; + explicit NebulaSchemaProvider(SchemaVer ver) : ver_(ver) {} + + SchemaVer getVersion() const noexcept override; + size_t getNumFields() const noexcept override; + + int64_t getFieldIndex(const Slice& name) const override; + const char* getFieldName(int64_t index) const override; + + const ValueType& getFieldType(int64_t index) const override; + const ValueType& getFieldType(const Slice& name) const override; + + std::shared_ptr field(int64_t index) const override; + std::shared_ptr field(const Slice& name) const override; + + void addField(const Slice& name, ValueType type); + + +protected: + SchemaVer ver_{0}; + + // fieldname -> index + std::unordered_map fieldNameIndex_; + std::vector> fields_; +}; + +} // namespace codec +} // namespace dataman +} // namespace nebula + +#endif // DATAMAN_CODEC_NEBULASCHEMAPROVIDER_H_ + diff --git a/src/jni/src/datamanlite/RowReader.cpp b/src/jni/src/datamanlite/RowReader.cpp new file mode 100644 index 00000000000..6b1e3f07e69 --- /dev/null +++ b/src/jni/src/datamanlite/RowReader.cpp @@ -0,0 +1,595 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "datamanlite/RowReader.h" +#include + + +namespace nebula { +namespace dataman { +namespace codec { + + +/********************************************* + * + * class RowReader::Cell + * + ********************************************/ +ResultType RowReader::Cell::getBool(bool& v) const noexcept { + RR_CELL_GET_VALUE(Bool); +} + + +ResultType RowReader::Cell::getFloat(float& v) const noexcept { + RR_CELL_GET_VALUE(Float); +} + + +ResultType RowReader::Cell::getDouble(double& v) const noexcept { + RR_CELL_GET_VALUE(Double); +} + + +ResultType RowReader::Cell::getString(Slice& v) const noexcept { + RR_CELL_GET_VALUE(String); +} + + +ResultType RowReader::Cell::getVid(int64_t& v) const noexcept { + RR_CELL_GET_VALUE(Vid); +} + + +/********************************************* + * + * class RowReader::Iterator + * + ********************************************/ +RowReader::Iterator::Iterator(const RowReader* reader, + size_t numFields, + int64_t index) + : reader_(reader) + , numFields_(numFields) + , index_(index) { + cell_.reset(new Cell(reader_, this)); +} + + +RowReader::Iterator::Iterator(Iterator&& iter) + : reader_(iter.reader_) + , numFields_(iter.numFields_) + , cell_(std::move(iter.cell_)) + , index_(iter.index_) + , bytes_(iter.bytes_) + , offset_(iter.offset_) {} + + +const RowReader::Cell& RowReader::Iterator::operator*() const { + return *cell_; +} + + +const RowReader::Cell* RowReader::Iterator::operator->() const { + return cell_.get(); +} + + +RowReader::Iterator& RowReader::Iterator::operator++() { + if (*this) { + if (bytes_ > 0) { + offset_ += bytes_; + } else { + offset_ = reader_->skipToNext(index_, offset_); + if (offset_ < 0) { + // Something is wrong + index_ = numFields_; + return *this; + } + } + + bytes_ = 0; + index_++; + } + + return *this; +} + + +bool RowReader::Iterator::operator==(const Iterator& rhs) const noexcept { + return reader_ == rhs.reader_ && + numFields_ == rhs.numFields_ && + index_ == rhs.index_; +} + + +RowReader::Iterator::operator bool() const { + return index_ != static_cast(numFields_); +} + + +// static +std::unique_ptr RowReader::getRowReader( + const Slice& row, + std::shared_ptr schema) { + SchemaVer ver = getSchemaVer(row); + CHECK_EQ(ver, schema->getVersion()); + return std::unique_ptr(new RowReader(row, std::move(schema))); +} + + +// static +int32_t RowReader::getSchemaVer(Slice row) { + const uint8_t* it = reinterpret_cast(row.begin()); + if (reinterpret_cast(it) == row.end()) { + LOG(ERROR) << "Row data is empty, so there is no schema version"; + return 0; + } + + // The first three bits indicate the number of bytes for the + // schema version. If the number is zero, no schema version + // presents + size_t verBytes = *(it++) >> 5; + int32_t ver = 0; + if (verBytes > 0) { + if (verBytes + 1 > row.size()) { + // Data is too short + // LOG(ERROR) << "Row data is too short"; + return 0; + } + // Schema Version is stored in Little Endian + for (size_t i = 0; i < verBytes; i++) { + ver |= (uint32_t(*(it++)) << (8 * i)); + } + } + + return ver; +} + +/********************************************* + * + * class RowReader + * + ********************************************/ +RowReader::RowReader(Slice row, + std::shared_ptr schema) + : schema_{std::move(schema)} { + CHECK(!!schema_) << "A schema must be provided"; + + if (processHeader(row)) { + // data_.begin() points to the first field + data_ = Slice(row.begin() + headerLen_, row.size() - headerLen_); + } else { + // Invalid data + // TODO We need a better error handler here + LOG(FATAL) << "Invalid row data!"; + } +} + + +bool RowReader::processHeader(Slice row) { + const uint8_t* it = reinterpret_cast(row.begin()); + if (reinterpret_cast(it) == row.end()) { + return false; + } + + DCHECK(!!schema_) << "A schema must be provided"; + + // The last three bits indicate the number of bytes for offsets + // The first three bits indicate the number of bytes for the + // schena version. If the number is zero, no schema version + // presents + numBytesForOffset_ = (*it & 0x07) + 1; + int32_t verBytes = *(it++) >> 5; + it += verBytes; + + // Process the block offsets + // Block offsets point to the start of every 16 fields, except the + // first 16 fields + // Block offsets are stored in Little Endian + uint32_t numFields = schema_->getNumFields(); + uint32_t numOffsets = (numFields >> 4); + if (numBytesForOffset_ * numOffsets + verBytes + 1 > row.size()) { + // Data is too short + // LOG(ERROR) << "Row data is too short"; + return false; + } + offsets_.resize(numFields + 1, -1); + offsets_[0] = 0; + blockOffsets_.emplace_back(0, 0); + blockOffsets_.reserve(numOffsets); + for (uint32_t i = 0; i < numOffsets; i++) { + int64_t offset = 0; + for (int32_t j = 0; j < numBytesForOffset_; j++) { + offset |= (uint64_t(*(it++)) << (8 * j)); + } + blockOffsets_.emplace_back(offset, 0); + offsets_[16 * (i + 1)] = offset; + } + // Now done with the header + + headerLen_ = reinterpret_cast(it) - row.begin(); + offsets_[numFields] = row.size() - headerLen_; + + return true; +} + + +int32_t RowReader::numFields() const noexcept { + return schema_->getNumFields(); +} + + +SchemaVer RowReader::schemaVer() const noexcept { + return schema_->getVersion(); +} + + +int64_t RowReader::skipToNext(int64_t index, int64_t offset) const noexcept { + auto vType = schema_->getFieldType(index); + if (offsets_[index + 1] >= 0) { + return offsets_[index + 1]; + } + + switch (vType) { + case ValueType::BOOL: { + // One byte + offset++; + break; + } + case ValueType::INT: + case ValueType::TIMESTAMP: { + int64_t v; + int32_t len = readInteger(offset, v); + if (len <= 0) { + return static_cast(ResultType::E_DATA_INVALID); + } + offset += len; + break; + } + case ValueType::FLOAT: { + // Four bytes + offset += sizeof(float); + break; + } + case ValueType::DOUBLE: { + // Eight bytes + offset += sizeof(double); + break; + } + case ValueType::STRING: { + int64_t strLen; + int32_t intLen = readInteger(offset, strLen); + if (intLen <= 0) { + return static_cast(ResultType::E_DATA_INVALID); + } + offset += intLen + strLen; + break; + } + case ValueType::VID: { + // Eight bytes + offset += sizeof(int64_t); + break; + } + default: { + // TODO + // LOG(FATAL) << "Unimplemented"; + std::abort(); + } + } + + if (offset > static_cast(data_.size())) { + return static_cast(ResultType::E_DATA_INVALID); + } + + // Update offsets + offsets_[index + 1] = offset; + // Update block offsets + int32_t base = (index + 1) >> 4; + blockOffsets_[base].second = ((index + 1) & 0x0F); + + return offset; +} + + +int64_t RowReader::skipToField(int64_t index) const noexcept { + // DCHECK_GE(index, 0); + if (index >= static_cast(schema_->getNumFields())) { + // Index is out of range + return static_cast(ResultType::E_INDEX_OUT_OF_RANGE); + } + + int64_t base = index >> 4; + const auto& blockOffset = blockOffsets_[base]; + base <<= 4; + int64_t maxVisitedIndex = base + blockOffset.second; + if (index <= maxVisitedIndex) { + return offsets_[index]; + } + + int64_t offset = offsets_[maxVisitedIndex]; + for (int64_t i = maxVisitedIndex; i < base + (index & 0x0000000f); i++) { + offset = skipToNext(i, offset); + if (offset < 0) { + return static_cast(ResultType::E_DATA_INVALID); + } + } + + return offset; +} + + +int32_t RowReader::readFloat(int64_t offset, float& v) const noexcept { + if (offset + sizeof(float) > data_.size()) { + return static_cast(ResultType::E_DATA_INVALID); + } + + memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(float)); + + return sizeof(float); +} + + +int32_t RowReader::readDouble(int64_t offset, double& v) const noexcept { + if (offset + sizeof(double) > data_.size()) { + return static_cast(ResultType::E_DATA_INVALID); + } + + memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(double)); + + return sizeof(double); +} + + +int32_t RowReader::readString(int64_t offset, Slice& v) + const noexcept { + int64_t strLen; + int32_t intLen = readInteger(offset, strLen); + // CHECK_GT(intLen, 0) << "Invalid string length"; + if (offset + intLen + strLen > static_cast(data_.size())) { + return static_cast(ResultType::E_DATA_INVALID); + } + + v = Slice(data_.data() + offset + intLen, strLen); + return intLen + strLen; +} + + +int32_t RowReader::readInt64(int64_t offset, int64_t& v) const noexcept { + if (offset + sizeof(int64_t) > data_.size()) { + return static_cast(ResultType::E_DATA_INVALID); + } + + // VID is stored in Little Endian + memcpy(reinterpret_cast(&v), &(data_[offset]), sizeof(int64_t)); + + return sizeof(int64_t); +} + + +int32_t RowReader::readVid(int64_t offset, int64_t& v) const noexcept { + return readInt64(offset, v); +} + + +ResultType RowReader::getBool(int64_t index, int64_t& offset, bool& v) + const noexcept { + switch (schema_->getFieldType(index)) { + case ValueType::BOOL: { + v = intToBool(data_[offset]); + offset++; + break; + } + case ValueType::INT: + case ValueType::TIMESTAMP: { + int64_t intV; + int32_t numBytes = readInteger(offset, intV); + if (numBytes > 0) { + v = intToBool(intV); + offset += numBytes; + } else { + return static_cast(numBytes); + } + break; + } + case ValueType::STRING: { + Slice strV; + int32_t numBytes = readString(offset, strV); + if (numBytes > 0) { + v = strToBool(strV); + offset += numBytes; + } else { + return static_cast(numBytes); + } + break; + } + default: { + return ResultType::E_INCOMPATIBLE_TYPE; + } + } + + return ResultType::SUCCEEDED; +} + + +ResultType RowReader::getFloat(int64_t index, int64_t& offset, float& v) + const noexcept { + switch (schema_->getFieldType(index)) { + case ValueType::FLOAT: { + int32_t numBytes = readFloat(offset, v); + if (numBytes < 0) { + return static_cast(numBytes); + } + offset += numBytes; + break; + } + case ValueType::DOUBLE: { + double d; + int32_t numBytes = readDouble(offset, d); + if (numBytes < 0) { + return static_cast(numBytes); + } + v = static_cast(d); + offset += numBytes; + break; + } + default: { + return ResultType::E_INCOMPATIBLE_TYPE; + } + } + + return ResultType::SUCCEEDED; +} + + +ResultType RowReader::getDouble(int64_t index, int64_t& offset, double& v) + const noexcept { + switch (schema_->getFieldType(index)) { + case ValueType::FLOAT: { + float f; + int32_t numBytes = readFloat(offset, f); + if (numBytes < 0) { + return static_cast(numBytes); + } + v = static_cast(f); + offset += numBytes; + break; + } + case ValueType::DOUBLE: { + int32_t numBytes = readDouble(offset, v); + if (numBytes < 0) { + return static_cast(numBytes); + } + offset += numBytes; + break; + } + default: { + return ResultType::E_INCOMPATIBLE_TYPE; + } + } + + return ResultType::SUCCEEDED; +} + + +ResultType RowReader::getString(int64_t index, + int64_t& offset, + Slice& v) const noexcept { + switch (schema_->getFieldType(index)) { + case ValueType::STRING: { + int32_t numBytes = readString(offset, v); + if (numBytes < 0) { + return static_cast(numBytes); + } + offset += numBytes; + break; + } + default: { + return ResultType::E_INCOMPATIBLE_TYPE; + } + } + + return ResultType::SUCCEEDED; +} + + +ResultType RowReader::getInt64(int64_t index, int64_t& offset, int64_t& v) + const noexcept { + switch (schema_->getFieldType(index)) { + case ValueType::INT: + case ValueType::TIMESTAMP: { + int32_t numBytes = readInteger(offset, v); + if (numBytes < 0) { + return static_cast(numBytes); + } + offset += numBytes; + break; + } + case ValueType::VID: { + int32_t numBytes = readVid(offset, v); + if (numBytes < 0) { + return static_cast(numBytes); + } + offset += numBytes; + break; + } + default: { + return ResultType::E_INCOMPATIBLE_TYPE; + } + } + + return ResultType::SUCCEEDED; +} + + +ResultType RowReader::getVid(int64_t index, int64_t& offset, int64_t& v) + const noexcept { + auto fieldType = schema_->getFieldType(index); + if (fieldType == ValueType::INT || fieldType == ValueType::VID) + return getInt64(index, offset, v); + else + return ResultType::E_INCOMPATIBLE_TYPE; +} + + +RowReader::Iterator RowReader::begin() const noexcept { + return Iterator(this, schema_->getNumFields(), 0); +} + + +RowReader::Iterator RowReader::end() const noexcept { + auto numFields = schema_->getNumFields(); + return Iterator(this, numFields, numFields); +} + + +/*************************************************** + * + * Field Accessors + * + **************************************************/ +ResultType RR_GET_VALUE_BY_NAME(Bool, bool) + +ResultType RowReader::getBool(int64_t index, bool& v) const noexcept { + RR_GET_OFFSET() + return getBool(index, offset, v); +} + + +ResultType RR_GET_VALUE_BY_NAME(Float, float) + +ResultType RowReader::getFloat(int64_t index, float& v) const noexcept { + RR_GET_OFFSET() + return getFloat(index, offset, v); +} + + +ResultType RR_GET_VALUE_BY_NAME(Double, double) + +ResultType RowReader::getDouble(int64_t index, double& v) const noexcept { + RR_GET_OFFSET() + return getDouble(index, offset, v); +} + + +ResultType RR_GET_VALUE_BY_NAME(String, Slice) + +ResultType RowReader::getString(int64_t index, Slice& v) + const noexcept { + RR_GET_OFFSET() + return getString(index, offset, v); +} + + +ResultType RR_GET_VALUE_BY_NAME(Vid, int64_t) + +ResultType RowReader::getVid(int64_t index, int64_t& v) const noexcept { + RR_GET_OFFSET() + return getVid(index, offset, v); +} + +} // namespace codec +} // namespace dataman +} // namespace nebula diff --git a/src/jni/src/datamanlite/RowReader.h b/src/jni/src/datamanlite/RowReader.h new file mode 100644 index 00000000000..a60c8eb4d39 --- /dev/null +++ b/src/jni/src/datamanlite/RowReader.h @@ -0,0 +1,286 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef DATAMAN_CODEC_ROWREADER_H_ +#define DATAMAN_CODEC_ROWREADER_H_ + +#include +#include +#include "base/ThriftTypes.h" +#include "datamanlite/DataCommon.h" +#include "datamanlite/Slice.h" +#include "datamanlite/SchemaProviderIf.h" + +#define RR_CELL_GET_VALUE(FN) \ + if (!*iter_) { \ + /* Already reached the end, or an error happened */ \ + return ResultType::E_INDEX_OUT_OF_RANGE; \ + } \ + int64_t offset = iter_->offset_; \ + ResultType res = reader_->get ## FN(iter_->index_, offset, v); \ + if (res == ResultType::SUCCEEDED) { \ + iter_->bytes_ = offset - iter_->offset_; \ + } else { \ + /* Move iterator to the end */ \ + iter_->index_ = iter_->numFields_; \ + } \ + return res + +#define RR_GET_VALUE_BY_NAME(FN, VT) \ + RowReader::get ## FN(const Slice& name, \ + VT& v) const noexcept { \ + int64_t index = schema_->getFieldIndex(name); \ + if (index < 0) { \ + return ResultType::E_NAME_NOT_FOUND; \ + } else { \ + return get ## FN (index, v); \ + } \ + } + +#define RR_GET_OFFSET() \ + int64_t offset = skipToField(index); \ + if (offset < 0) { \ + return static_cast(offset); \ + } \ + if (index >= static_cast(schema_->getNumFields())) { \ + return ResultType::E_INDEX_OUT_OF_RANGE; \ + } + +namespace nebula { +namespace dataman { +namespace codec { + +/** + * This class decodes one row of data + */ +class RowReader { + FRIEND_TEST(RowReader, headerInfo); + FRIEND_TEST(RowReader, encodedData); + FRIEND_TEST(RowWriter, offsetsCreation); + +public: + class Iterator; + + class Cell final { + friend class Iterator; + public: + template + typename std::enable_if::value, ResultType>::type + getInt(T& v) const noexcept; + + ResultType getBool(bool& v) const noexcept; + ResultType getFloat(float& v) const noexcept; + ResultType getDouble(double& v) const noexcept; + ResultType getString(Slice& v) const noexcept; + ResultType getVid(int64_t& v) const noexcept; + private: + const RowReader* reader_; + Iterator* iter_; + + Cell(const RowReader* reader, Iterator* iter) + : reader_(reader), iter_(iter) {} + }; + friend class Cell; + + + class Iterator final { + friend class RowReader; + public: + Iterator(Iterator&& iter); + const Cell& operator*() const; + const Cell* operator->() const; + Iterator& operator++(); + operator bool() const; + bool operator==(const Iterator& rhs) const noexcept; + private: + const RowReader* reader_; + const size_t numFields_; + std::unique_ptr cell_; + int64_t index_ = 0; + int32_t bytes_ = 0; + int64_t offset_ = 0; + + Iterator(const RowReader* reader, size_t numFields, int64_t index = 0); + }; + + +public: + static std::unique_ptr getRowReader(const Slice& row, + std::shared_ptr schema); + + virtual ~RowReader() = default; + + SchemaVer schemaVer() const noexcept; + int32_t numFields() const noexcept; + + Iterator begin() const noexcept; + Iterator end() const noexcept; + + ResultType getBool(const Slice& name, bool& v) const noexcept; + ResultType getBool(int64_t index, bool& v) const noexcept; + + template + typename std::enable_if::value, ResultType>::type + getInt(const Slice& name, T& v) const noexcept; + + template + typename std::enable_if::value, ResultType>::type + getInt(int64_t index, T& v) const noexcept; + + ResultType getFloat(const Slice& name, float& v) const noexcept; + ResultType getFloat(int64_t index, float& v) const noexcept; + + ResultType getDouble(const Slice& name, double& v) const noexcept; + ResultType getDouble(int64_t index, double& v) const noexcept; + + ResultType getString(const Slice& name, Slice& v) const noexcept; + ResultType getString(int64_t index, Slice& v) const noexcept; + + ResultType getVid(const Slice& name, int64_t& v) const noexcept; + ResultType getVid(int64_t index, int64_t& v) const noexcept; + + + std::shared_ptr getSchema() const { + return schema_; + } + + // TODO getPath(const std::string& name) const noexcept; + // TODO getPath(int64_t index) const noexcept; + // TODO getList(const std::string& name) const noexcept; + // TODO getList(int64_t index) const noexcept; + // TODO getSet(const std::string& name) const noexcept; + // TODO getSet(int64_t index) const noexcept; + // TODO getMap(const std::string& name) const noexcept; + // TODO getMap(int64_t index) const noexcept; + +private: + std::shared_ptr schema_; + + Slice data_; + int32_t headerLen_ = 0; + int32_t numBytesForOffset_ = 0; + // Block offet value is composed by two integers. The first one is + // the block offset, the second one is the largest index being visited + // in the block. This index is zero-based + mutable std::vector> blockOffsets_; + mutable std::vector offsets_; + +private: + static int32_t getSchemaVer(Slice row); + + RowReader(Slice row, + std::shared_ptr schema); + + // Process the row header infomation + // Returns false when the row data is invalid + bool processHeader(Slice row); + + // Process the block offsets (each block contains certain number of fields) + // Returns false when the row data is invalid + bool processBlockOffsets(Slice row, int32_t verBytes); + + // Skip to the next field + // Parameter: + // index : the current field index + // offset : the current offset + // When succeeded, the method returns the offset pointing to the + // next field + // When failed, the method returns a negative number + int64_t skipToNext(int64_t index, int64_t offset) const noexcept; + + // Skip to the {index}Th field + // The method retuns the offset of the field + // It returns a negative number when the data corrupts + int64_t skipToField(int64_t index) const noexcept; + + // The following methods all return the number of bytes read + // A negative number will be returned if an error occurs + template + typename std::enable_if::value, int32_t>::type + readInteger(int64_t offset, T& v) const noexcept; + + int32_t readFloat(int64_t offset, float& v) const noexcept; + int32_t readDouble(int64_t offset, double& v) const noexcept; + int32_t readString(int64_t offset, Slice& v) const noexcept; + int32_t readInt64(int64_t offset, int64_t& v) const noexcept; + int32_t readVid(int64_t offset, int64_t& v) const noexcept; + + // Following methods assume the parameters index and offset are valid + // When succeeded, offset will advance + template + typename std::enable_if::value, ResultType>::type + getInt(int64_t index, int64_t& offset, T& v) const noexcept; + + ResultType getBool(int64_t index, int64_t& offset, bool& v) const noexcept; + ResultType getFloat(int64_t index, int64_t& offset, float& v) const noexcept; + ResultType getDouble(int64_t index, int64_t& offset, double& v) + const noexcept; + ResultType getString(int64_t index, int64_t& offset, Slice& v) + const noexcept; + ResultType getInt64(int64_t index, int64_t& offset, int64_t& v) const noexcept; + ResultType getVid(int64_t index, int64_t& offset, int64_t& v) const noexcept; +}; + +/********************************************* + * + * class RowReader::Cell + * + ********************************************/ +template +typename std::enable_if::value, ResultType>::type +RowReader::Cell::getInt(T& v) const noexcept { + RR_CELL_GET_VALUE(Int); +} + + +template +typename std::enable_if::value, ResultType>::type +RowReader::getInt(int64_t index, int64_t& offset, T& v) const noexcept { + switch (schema_->getFieldType(index)) { + case ValueType::INT: + case ValueType::TIMESTAMP: { + int32_t numBytes = readInteger(offset, v); + if (numBytes < 0) { + return static_cast(numBytes); + } + offset += numBytes; + break; + } + default: { + return ResultType::E_INCOMPATIBLE_TYPE; + } + } + + return ResultType::SUCCEEDED; +} + +template +typename std::enable_if::value, ResultType>::type +RR_GET_VALUE_BY_NAME(Int, T) + +template +typename std::enable_if::value, ResultType>::type +RowReader::getInt(int64_t index, T& v) const noexcept { + RR_GET_OFFSET() + return getInt(index, offset, v); +} + + +template +typename std::enable_if::value, int32_t>::type +RowReader::readInteger(int64_t offset, T& v) const noexcept { + const int8_t* start = reinterpret_cast(&(data_[offset])); + uint64_t val = 0; + auto len = decodeVarint(start, data_.size() - offset, val); + // CHECK_GT(len, 0); + v = val; + return len; +} + +} // namespace codec +} // namespace dataman +} // namespace nebula +#endif // DATAMAN_CODEC_ROWREADER_H_ diff --git a/src/jni/src/datamanlite/RowWriter.cpp b/src/jni/src/datamanlite/RowWriter.cpp new file mode 100644 index 00000000000..4b7b9b097ff --- /dev/null +++ b/src/jni/src/datamanlite/RowWriter.cpp @@ -0,0 +1,118 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "datamanlite/RowWriter.h" + + +namespace nebula { +namespace dataman { +namespace codec { + +int64_t RowWriter::size() const noexcept { + auto offsetBytes = calcOccupiedBytes(cord_.size()); + SchemaVer verBytes = 0; + if (ver_ > 0) { + verBytes = calcOccupiedBytes(ver_); + } + return cord_.size() // data length + + offsetBytes * blockOffsets_.size() // block offsets length + + verBytes // version number length + + 1; // Header +} + + +std::string RowWriter::encode() noexcept { + std::string encoded; + // Reserve enough space so resize will not happen + encoded.reserve(sizeof(int64_t) * blockOffsets_.size() + cord_.size() + 11); + encodeTo(encoded); + + return encoded; +} + + +void RowWriter::encodeTo(std::string& encoded) noexcept { + // Header information + auto offsetBytes = calcOccupiedBytes(cord_.size()); + char header = offsetBytes - 1; + + SchemaVer ver = ver_; + if (ver > 0) { + auto verBytes = calcOccupiedBytes(ver); + header |= verBytes << 5; + encoded.append(&header, 1); + // Schema version is stored in Little Endian + encoded.append(reinterpret_cast(&ver), verBytes); + } else { + encoded.append(&header, 1); + } + + // Offsets are stored in Little Endian + for (auto offset : blockOffsets_) { + encoded.append(reinterpret_cast(&offset), offsetBytes); + } + + cord_.appendTo(encoded); +} + + +int64_t RowWriter::calcOccupiedBytes(uint64_t v) const noexcept { + int64_t bytes = 0; + do { + bytes++; + v >>= 8; + } while (v); + + return bytes; +} + + +/**************************** + * + * Data Stream + * + ***************************/ +RowWriter& RowWriter::operator<<(bool v) noexcept { + cord_ << v; + RW_CLEAN_UP_WRITE() + return *this; +} + + +RowWriter& RowWriter::operator<<(float v) noexcept { + cord_ << v; + RW_CLEAN_UP_WRITE() + return *this; +} + + +RowWriter& RowWriter::operator<<(double v) noexcept { + cord_ << v; + RW_CLEAN_UP_WRITE() + return *this; +} + + +RowWriter& RowWriter::operator<<(const std::string& v) noexcept { + return operator<<(Slice(v)); +} + +RowWriter& RowWriter::operator<<(Slice v) noexcept { + writeInt(v.size()); + cord_.write(v.data(), v.size()); + RW_CLEAN_UP_WRITE() + return *this; +} + + +RowWriter& RowWriter::operator<<(const char* v) noexcept { + return operator<<(Slice(v)); +} + + +} // namespace codec +} // namespace dataman +} // namespace nebula diff --git a/src/jni/src/datamanlite/RowWriter.h b/src/jni/src/datamanlite/RowWriter.h new file mode 100644 index 00000000000..cd6bd3ea1e5 --- /dev/null +++ b/src/jni/src/datamanlite/RowWriter.h @@ -0,0 +1,100 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef DATAMAN_CODEC_ROWWRITER_H_ +#define DATAMAN_CODEC_ROWWRITER_H_ + +#include +#include +#include "base/Cord.h" +#include "datamanlite/DataCommon.h" +#include "datamanlite/Slice.h" +#include "datamanlite/SchemaProviderIf.h" + +#define RW_CLEAN_UP_WRITE() \ + colNum_++; \ + if (colNum_ != 0 && (colNum_ >> 4 << 4) == colNum_) { \ + /* We need to record offset for every 16 fields */ \ + blockOffsets_.emplace_back(cord_.size()); \ + } + + +namespace nebula { +namespace dataman { +namespace codec { + +/** + * It's a write-only data streamer, used to encode one row of data + * It just encodes the data writed-in. + */ +class RowWriter { +public: + explicit RowWriter(SchemaVer ver = 0) + : ver_(ver) {} + + // Encode into a binary array + std::string encode() noexcept; + // Encode and attach to the given string + // For the sake of performance, th caller needs to make sure the sting + // is large enough so that resize will not happen + void encodeTo(std::string& encoded) noexcept; + + // Calculate the exact length of the encoded binary array + int64_t size() const noexcept; + + // Data stream + RowWriter& operator<<(bool v) noexcept; + RowWriter& operator<<(float v) noexcept; + RowWriter& operator<<(double v) noexcept; + + /* + template + typename std::enable_if::value, RowWriter&>::type + operator<<(T v) noexcept; + */ + template + typename std::enable_if::value, RowWriter&>::type + operator<<(T v) noexcept { + writeInt(v); + RW_CLEAN_UP_WRITE() + return *this; + } + + + RowWriter& operator<<(const std::string& v) noexcept; + RowWriter& operator<<(Slice v) noexcept; + RowWriter& operator<<(const char* v) noexcept; + +private: + nebula::Cord cord_; + SchemaVer ver_; + + int64_t colNum_ = 0; + // Block offsets for every 16 fields + std::vector blockOffsets_; + + template + typename std::enable_if::value>::type + writeInt(T v) { + uint8_t buf[10]; + size_t len = encodeVarint(v, buf); + DCHECK_GT(len, 0UL); + cord_.write(reinterpret_cast(&buf[0]), len); + } + + + // Calculate the number of bytes occupied (ignore the leading 0s) + int64_t calcOccupiedBytes(uint64_t v) const noexcept; +}; + +} // namespace codec +} // namespace dataman +} // namespace nebula + +#endif // DATAMAN_CODEC_ROWWRITER_H_ + + + diff --git a/src/jni/src/datamanlite/SchemaProviderIf.cpp b/src/jni/src/datamanlite/SchemaProviderIf.cpp new file mode 100644 index 00000000000..ec3225a5126 --- /dev/null +++ b/src/jni/src/datamanlite/SchemaProviderIf.cpp @@ -0,0 +1,9 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "datamanlite/SchemaProviderIf.h" + + diff --git a/src/jni/src/datamanlite/SchemaProviderIf.h b/src/jni/src/datamanlite/SchemaProviderIf.h new file mode 100644 index 00000000000..76b15457bb7 --- /dev/null +++ b/src/jni/src/datamanlite/SchemaProviderIf.h @@ -0,0 +1,148 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef DATAMAN_CODEC_SCHEMAPROVIDERIF_H_ +#define DATAMAN_CODEC_SCHEMAPROVIDERIF_H_ + +#include +#include +#include "base/ThriftTypes.h" +#include "datamanlite/Slice.h" + +namespace nebula { +namespace dataman { +namespace codec { + +enum class ValueType { + UNKNOWN = 0, + // Simple types + BOOL = 1, + INT = 2, + VID = 3, + FLOAT = 4, + DOUBLE = 5, + STRING = 6, + + // Date time + TIMESTAMP = 21, + YEAR = 22, + YEARMONTH = 23, + DATE = 24, + DATETIME = 25, + + // Graph specific + PATH = 41, + + // Container types + // LIST = 101, + // SET = 102, + // MAP = 103, // The key type is always a STRING + // STRUCT = 104, +}; + +class SchemaProviderIf { +public: + // This is an interface class + class Field { + public: + virtual ~Field() = default; + + virtual const char* getName() const = 0; + virtual const ValueType& getType() const = 0; + virtual bool isValid() const = 0; + virtual bool hasDefault() const = 0; + virtual std::string getDefaultValue() const = 0; + }; + + // Inherited classes do not need to implement the Iterator + class Iterator final { + friend class SchemaProviderIf; + + public: + const Field& operator*() const { + return *field_; + } + + const Field* operator->() const { + return field_.get(); + } + + Iterator& operator++() { + if (field_) { + index_++; + field_ = schema_->field(index_); + } + return *this; + } + + Iterator& operator+(uint16_t steps) { + if (field_) { + index_ += steps; + field_ = schema_->field(index_); + } + return *this; + } + + operator bool() const { + return static_cast(field_); + } + + bool operator==(const Iterator& rhs) const { + return schema_ == rhs.schema_ && + (index_ == rhs.index_ || (!field_ && !rhs.field_)); + } + + private: + const SchemaProviderIf* schema_; + size_t numFields_; + int64_t index_; + std::shared_ptr field_; + + private: + explicit Iterator(const SchemaProviderIf* schema, + int64_t idx = 0) + : schema_(schema) + , numFields_(schema_->getNumFields()) + , index_(idx) { + field_ = schema_->field(index_); + } + }; + +public: + virtual ~SchemaProviderIf() = default; + + virtual SchemaVer getVersion() const noexcept = 0; + virtual size_t getNumFields() const noexcept = 0; + + virtual int64_t getFieldIndex(const Slice& name) const = 0; + virtual const char* getFieldName(int64_t index) const = 0; + + virtual const ValueType& getFieldType(int64_t index) const = 0; + virtual const ValueType& getFieldType(const Slice& name) const = 0; + + virtual std::shared_ptr field(int64_t index) const = 0; + virtual std::shared_ptr field(const Slice& name) const = 0; + + /****************************************** + * + * Iterator implementation + * + *****************************************/ + Iterator begin() const { + return Iterator(this, 0); + } + + + Iterator end() const { + return Iterator(this, getNumFields()); + } +}; + +} // namespace codec +} // namespace dataman +} // namespace nebula + +#endif // DATAMAN_CODEC_SCHEMAPROVIDERIF_H_ diff --git a/src/jni/src/datamanlite/Slice.cpp b/src/jni/src/datamanlite/Slice.cpp new file mode 100644 index 00000000000..ff6028de6cf --- /dev/null +++ b/src/jni/src/datamanlite/Slice.cpp @@ -0,0 +1,85 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ +#include "datamanlite/Slice.h" + +namespace nebula { +namespace dataman { +namespace codec { + +// 2 small internal utility functions, for efficient hex conversions +// and no need for snprintf, toupper etc... +// Originally from wdt/util/EncryptionUtils.cpp - for toString(true)/DecodeHex: +char toHex(unsigned char v) { + if (v <= 9) { + return '0' + v; + } + return 'A' + v - 10; +} +// most of the code is for validation/error check +int fromHex(char c) { + // toupper: + if (c >= 'a' && c <= 'f') { + c -= ('a' - 'A'); // aka 0x20 + } + // validation + if (c < '0' || (c > '9' && (c < 'A' || c > 'F'))) { + return -1; // invalid not 0-9A-F hex char + } + if (c <= '9') { + return c - '0'; + } + return c - 'A' + 10; +} + +// Return a string that contains the copy of the referenced data. +std::string Slice::toString(bool hex) const { + std::string result; // RVO/NRVO/move + if (hex) { + result.reserve(2 * size_); + for (size_t i = 0; i < size_; ++i) { + unsigned char c = data_[i]; + result.push_back(toHex(c >> 4)); + result.push_back(toHex(c & 0xf)); + } + return result; + } else { + result.assign(data_, size_); + return result; + } +} + +// Originally from rocksdb/utilities/ldb_cmd.h +bool Slice::DecodeHex(std::string* result) const { + std::string::size_type len = size_; + if (len % 2) { + // Hex string must be even number of hex digits to get complete bytes back + return false; + } + if (!result) { + return false; + } + result->clear(); + result->reserve(len / 2); + + for (size_t i = 0; i < len;) { + int h1 = fromHex(data_[i++]); + if (h1 < 0) { + return false; + } + int h2 = fromHex(data_[i++]); + if (h2 < 0) { + return false; + } + result->push_back(static_cast((h1 << 4) | h2)); + } + return true; +} + + +} // namespace codec +} // namespace dataman +} // namespace nebula + diff --git a/src/jni/src/datamanlite/Slice.h b/src/jni/src/datamanlite/Slice.h new file mode 100644 index 00000000000..455013151aa --- /dev/null +++ b/src/jni/src/datamanlite/Slice.h @@ -0,0 +1,156 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef DATAMAN_CODEC_SLICE_H_ +#define DATAMAN_CODEC_SLICE_H_ + +#include +#include +#include +#include + +namespace nebula { +namespace dataman { +namespace codec { + +/** + * It is copied from rocksdb::Slice +*/ +class Slice { +public: + // Create an empty slice. + Slice() : data_(""), size_(0) {} + + // Create a slice that refers to d[0,n-1]. + Slice(const char* d, size_t n) : data_(d), size_(n) {} + + // Create a slice that refers to the contents of "s" + /* implicit */ + explicit Slice(const std::string& s) : data_(s.data()), size_(s.size()) {} + + // Create a slice that refers to s[0,strlen(s)-1] + /* implicit */ + explicit Slice(const char* s) : data_(s) { size_ = (s == nullptr) ? 0 : strlen(s); } + + // Return a pointer to the beginning of the referenced data + const char* data() const { return data_; } + + // Return the length (in bytes) of the referenced data + size_t size() const { return size_; } + + // Return true iff the length of the referenced data is zero + bool empty() const { return size_ == 0; } + + const char& operator[](size_t n) const { + assert(n < size()); + return data_[n]; + } + + // Change this slice to refer to an empty array + void clear() { + data_ = ""; + size_ = 0; + } + + // Drop the first "n" bytes from this slice. + void remove_prefix(size_t n) { + assert(n <= size()); + data_ += n; + size_ -= n; + } + + void remove_suffix(size_t n) { + assert(n <= size()); + size_ -= n; + } + + // Return a string that contains the copy of the referenced data. + // when hex is true, returns a string of twice the length hex encoded (0-9A-F) + std::string toString(bool hex = false) const; + + // Decodes the current slice interpreted as an hexadecimal string into result, + // if successful returns true, if this isn't a valid hex string + // (e.g not coming from Slice::toString(true)) DecodeHex returns false. + // This slice is expected to have an even number of 0-9A-F characters + // also accepts lowercase (a-f) + bool DecodeHex(std::string* result) const; + + // Three-way comparison. Returns value: + // < 0 iff "*this" < "b", + // == 0 iff "*this" == "b", + // > 0 iff "*this" > "b" + int compare(const Slice& b) const; + + // Return true iff "x" is a prefix of "*this" + bool starts_with(const Slice& x) const { + return ((size_ >= x.size_) && (memcmp(data_, x.data_, x.size_) == 0)); + } + + bool ends_with(const Slice& x) const { + return ((size_ >= x.size_) && + (memcmp(data_ + size_ - x.size_, x.data_, x.size_) == 0)); + } + + // Compare two slices and returns the first byte where they differ + size_t difference_offset(const Slice& b) const; + + const char* begin() const { + return data_; + } + + const char* end() const { + return data_ + size_; + } + +private: + const char* data_; + size_t size_; + + // Intentionally copyable +}; + +inline bool operator==(const Slice& x, const Slice& y) { + return ((x.size() == y.size()) && + (memcmp(x.data(), y.data(), x.size()) == 0)); +} + +inline bool operator!=(const Slice& x, const Slice& y) { return !(x == y); } + + +inline bool strToBool(Slice str) { + return str == Slice("Y") || str == Slice("y") || str == Slice("T") || str == Slice("t") || + str == Slice("yes") || str == Slice("Yes") || str == Slice("YES") || + str == Slice("true") || str == Slice("True") || str == Slice("TRUE"); +} + +inline int Slice::compare(const Slice& b) const { + assert(data_ != nullptr && b.data_ != nullptr); + const size_t min_len = (size_ < b.size_) ? size_ : b.size_; + int r = memcmp(data_, b.data_, min_len); + if (r == 0) { + if (size_ < b.size_) + r = -1; + else if (size_ > b.size_) + r = +1; + } + return r; +} + +inline size_t Slice::difference_offset(const Slice& b) const { + size_t off = 0; + const size_t len = (size_ < b.size_) ? size_ : b.size_; + for (; off < len; off++) { + if (data_[off] != b.data_[off]) break; + } + return off; +} + +} // namespace codec +} // namespace dataman +} // namespace nebula + +#endif // DATAMAN_CODEC_SLICE_H_ + diff --git a/src/jni/src/datamanlite/test/CMakeLists.txt b/src/jni/src/datamanlite/test/CMakeLists.txt new file mode 100644 index 00000000000..a9b40215e92 --- /dev/null +++ b/src/jni/src/datamanlite/test/CMakeLists.txt @@ -0,0 +1,38 @@ +add_executable( + row_reader_test + RowReaderTest.cpp +) + +target_link_libraries( + row_reader_test + $ + folly + double-conversion + gtest + glog + gflags + unwind + pthread +) + +add_test(NAME row_reader_test COMMAND row_reader_test) + +add_executable( + row_writer_test + RowWriterTest.cpp +) + +target_link_libraries( + row_writer_test + $ + folly + double-conversion + gtest + glog + gflags + unwind + pthread + dl +) + +add_test(NAME row_writer_test COMMAND row_writer_test) diff --git a/src/jni/src/datamanlite/test/RowReaderTest.cpp b/src/jni/src/datamanlite/test/RowReaderTest.cpp new file mode 100644 index 00000000000..8c06e9f2fca --- /dev/null +++ b/src/jni/src/datamanlite/test/RowReaderTest.cpp @@ -0,0 +1,275 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include +#include "base/Base.h" +#include "base/Logging.h" +#include "datamanlite/Slice.h" +#include "datamanlite/RowReader.h" +#include "datamanlite/NebulaSchemaProvider.h" + +namespace nebula { +namespace dataman { +namespace codec { + + +TEST(RowReader, headerInfo) { + // Simplest row, nothing in it + char data1[] = {0x00}; + auto schema1 = std::make_shared(0); + auto reader1 = RowReader::getRowReader( + Slice(data1, sizeof(data1)), + schema1); + EXPECT_EQ(0, reader1->schemaVer()); + EXPECT_EQ(sizeof(data1), reader1->headerLen_); + + // With schema version + char data2[] = {0x40, 0x01, static_cast(0xFF)}; + auto schema2 = std::make_shared(0x00FF01); + auto reader2 = RowReader::getRowReader( + Slice(data2, sizeof(data2)), + schema2); + EXPECT_EQ(0x0000FF01, reader2->schemaVer()); + EXPECT_EQ(sizeof(data2), reader2->headerLen_); + + // Insert 33 fields into schema, so we will get 2 offsets + auto schema3 = std::make_shared(0x00FFFF01); + for (int i = 0; i < 33; i++) { + schema3->addField(Slice(folly::stringPrintf("Column%02d", i)), + ValueType::INT); + } + + // With schema version and offsets + char data3[] = {0x60, 0x01, static_cast(0xFF), + static_cast(0xFF), 0x40, + static_cast(0xF0)}; + auto reader3 = RowReader::getRowReader( + Slice(data3, sizeof(data3)), + schema3); + EXPECT_EQ(0x00FFFF01, reader3->schemaVer()); + EXPECT_EQ(sizeof(data3), reader3->headerLen_); + ASSERT_EQ(3, reader3->blockOffsets_.size()); + EXPECT_EQ(0x0000, reader3->blockOffsets_[0].first); + EXPECT_EQ(0x0040, reader3->blockOffsets_[1].first); + EXPECT_EQ(0x00F0, reader3->blockOffsets_[2].first); + + // No schema version, with offsets + auto schema4 = std::make_shared(); + for (int i = 0; i < 33; i++) { + schema4->addField(Slice(folly::stringPrintf("Column%02d", i)), + ValueType::INT); + } + + char data4[] = {0x01, static_cast(0xFF), 0x40, 0x08, static_cast(0xF0)}; + auto reader4 = RowReader::getRowReader( + Slice(data4, sizeof(data4)), + schema4); + EXPECT_EQ(0, reader4->schemaVer()); + EXPECT_EQ(sizeof(data4), reader4->headerLen_); + ASSERT_EQ(3, reader4->blockOffsets_.size()); + EXPECT_EQ(0x000000, reader4->blockOffsets_[0].first); + EXPECT_EQ(0x0040FF, reader4->blockOffsets_[1].first); + EXPECT_EQ(0x00F008, reader4->blockOffsets_[2].first); +} + +TEST(RowReader, encodedData) { + const char* colName1 = "int_col1"; + const std::string colName2("int_col2"); + std::string colName3("vid_col"); + + auto schema = std::make_shared(); + // Col 0: bool_col1 -- BOOL + schema->addField(Slice("bool_col1"), ValueType::BOOL); + // Col 1: str_col1 -- STRING + schema->addField(Slice("str_col1"), ValueType::STRING); + // Col 2: int_col1 -- INT + schema->addField(Slice(colName1), ValueType::INT); + // Col 3: int_col2 -- INT + schema->addField(Slice(colName2), ValueType::INT); + // Col 4: vid_col -- VID + schema->addField(Slice(colName3), + ValueType::VID); + // Col 5: str_col2 -- STRING + schema->addField(Slice("str_col2"), ValueType::STRING); + // Col 6: bool_col2 -- BOOL + schema->addField(Slice("bool_col2"), + ValueType::BOOL); + // Col 7: float_col -- FLOAT + schema->addField(Slice("float_col"), + ValueType::FLOAT); + // Col 8: double_col -- DOUBLE + schema->addField(Slice("double_col"), + ValueType::DOUBLE); + // Col 9: timestamp_col -- TIMESTAMP + schema->addField(Slice("timestamp_col"), ValueType::TIMESTAMP); + + std::string encoded; + // Single byte header (Schema version is 0, no offset) + encoded.append(1, 0x00); + // Col 0 + encoded.append(1, 0x01); + + const char* str1 = "Hello World!"; + const char* str2 = "Welcome to the future!"; + + // Col 1 + uint8_t buf[10]; + size_t len = folly::encodeVarint(strlen(str1), buf); + encoded.append(reinterpret_cast(buf), len); + encoded.append(str1, strlen(str1)); + + // Col 2 + len = folly::encodeVarint(100, buf); + encoded.append(reinterpret_cast(buf), len); + + // Col 3 + len = folly::encodeVarint(0xFFFFFFFFFFFFFFFFL, buf); + encoded.append(reinterpret_cast(buf), len); + + // Col 4 + int64_t id = 0x8877665544332211L; + encoded.append(reinterpret_cast(&id), sizeof(int64_t)); + + // Col 5 + len = folly::encodeVarint(strlen(str2), buf); + encoded.append(reinterpret_cast(buf), len); + encoded.append(str2, strlen(str2)); + + // Col 6 + encoded.append(1, 0x00); + + // Col 7 + float pi = 3.1415926; + encoded.append(reinterpret_cast(&pi), sizeof(float)); + + // Col 8 + double e = 2.71828182845904523536028747135266249775724709369995; + encoded.append(reinterpret_cast(&e), sizeof(double)); + + // Col 9 + len = folly::encodeVarint(1551331827, buf); + encoded.append(reinterpret_cast(buf), len); + + // Now let's read it + auto reader = RowReader::getRowReader(Slice(encoded), schema); + + // Header info + EXPECT_EQ(0, reader->schemaVer()); + EXPECT_EQ(1, reader->blockOffsets_.size()); + EXPECT_EQ(0, reader->blockOffsets_[0].first); + EXPECT_EQ(1, reader->headerLen_); + + bool bVal; + int32_t i32Val; + int64_t i64Val; + Slice sVal; + float fVal; + double dVal; + + // Col 0 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getBool(0, bVal)); + EXPECT_TRUE(bVal); + + // Col 1 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getString(1, sVal)); + EXPECT_EQ(str1, sVal.toString()); + + // Col 2 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getInt(2, i32Val)); + EXPECT_EQ(100, i32Val); + + // Col 3 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getInt(3, i32Val)); + EXPECT_EQ(0xFFFFFFFF, i32Val); + i32Val = 0; + + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getInt(3, i64Val)); + EXPECT_EQ(0xFFFFFFFFFFFFFFFFL, i64Val); + + // Col 4 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getVid(4, i64Val)); + EXPECT_EQ(0x8877665544332211L, i64Val); + + // Col 5 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getString(5, sVal)); + EXPECT_EQ(str2, sVal.toString()); + + // Col 6 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getBool(6, bVal)); + EXPECT_FALSE(bVal); + + // Col 7 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getFloat(7, fVal)); + EXPECT_FLOAT_EQ(pi, fVal); + + // Col 8 + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getDouble(8, dVal)); + EXPECT_DOUBLE_EQ(e, dVal); + + // Col 9 + i64Val = 0; + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getInt(9, i64Val)); + EXPECT_EQ(1551331827, i64Val); + + // Col 10 -- non-existing column + EXPECT_EQ(ResultType::E_INDEX_OUT_OF_RANGE, + reader->getBool(10, bVal)); +} + +TEST(RowReader, iterator) { + std::string encoded; + encoded.append(1, 0); + encoded.append(1, 16); + encoded.append(1, 32); + encoded.append(1, 48); + encoded.append(1, 64); + + auto schema = std::make_shared(); + for (int i = 0; i < 64; i++) { + schema->addField(Slice(folly::stringPrintf("Col%02d", i)), + ValueType::INT); + encoded.append(1, i + 1); + } + + auto reader = RowReader::getRowReader(Slice(encoded), schema); + auto it = reader->begin(); + int32_t v1; + int32_t v2; + for (int i = 0; i < 64; i++) { + EXPECT_EQ(ResultType::SUCCEEDED, reader->getInt(i, v1)); + EXPECT_EQ(ResultType::SUCCEEDED, it->getInt(v2)); + EXPECT_EQ(v1, v2); + ++it; + } + + EXPECT_FALSE((bool)it); + EXPECT_EQ(it, reader->end()); +} + +} // namespace codec +} // namespace dataman +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::SetStderrLogging(google::INFO); + + return RUN_ALL_TESTS(); +} + + diff --git a/src/jni/src/datamanlite/test/RowWriterTest.cpp b/src/jni/src/datamanlite/test/RowWriterTest.cpp new file mode 100644 index 00000000000..9a1d0dca27b --- /dev/null +++ b/src/jni/src/datamanlite/test/RowWriterTest.cpp @@ -0,0 +1,103 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "base/Base.h" +#include +#include +#include "datamanlite/RowWriter.h" +#include "datamanlite/RowReader.h" +#include "datamanlite/NebulaSchemaProvider.h" + +namespace nebula { +namespace dataman { +namespace codec { + +TEST(RowWriter, withoutSchema) { + RowWriter writer; + writer << true << 10 << "Hello World!" + << 3.1415926 // By default, this will be a double + << static_cast(3.1415926); + + std::string encoded = writer.encode(); + auto schema = std::make_shared(0); + schema->addField(Slice("c1"), ValueType::BOOL); + schema->addField(Slice("c2"), ValueType::INT); + schema->addField(Slice("c3"), ValueType::STRING); + schema->addField(Slice("c4"), ValueType::DOUBLE); + schema->addField(Slice("c5"), ValueType::FLOAT); + + auto reader = RowReader::getRowReader(Slice(encoded), schema); + + EXPECT_EQ(0x00000000, reader->schemaVer()); + EXPECT_EQ(5, reader->numFields()); + + // Col 0 + bool bVal; + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getBool(0, bVal)); + EXPECT_TRUE(bVal); + + // Col 1 + int32_t iVal; + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getInt(1, iVal)); + EXPECT_EQ(10, iVal); + + // Col 2 + Slice sVal; + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getString(2, sVal)); + EXPECT_EQ("Hello World!", sVal.toString()); + + // Col 3 + double dVal; + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getDouble(3, dVal)); + EXPECT_DOUBLE_EQ(3.1415926, dVal); + + // Col 4 + float fVal; + EXPECT_EQ(ResultType::SUCCEEDED, + reader->getFloat(4, fVal)); + EXPECT_FLOAT_EQ(3.1415926, fVal); +} + + +TEST(RowWriter, offsetsCreation) { + RowWriter writer; + // These fields should create two offsets + for (int i = 0; i < 33; i++) { + writer << i; + } + + std::string encoded = writer.encode(); + auto schema = std::make_shared(0); + for (int i = 0; i < 33; i++) { + schema->addField(Slice(folly::stringPrintf("col%d", i)), ValueType::INT); + } + auto reader = RowReader::getRowReader(Slice(encoded), schema); + + EXPECT_EQ(0x00000000, reader->schemaVer()); + EXPECT_EQ(33, reader->numFields()); + EXPECT_EQ(3, reader->blockOffsets_.size()); + EXPECT_EQ(0, reader->blockOffsets_[0].first); + EXPECT_EQ(16, reader->blockOffsets_[1].first); + EXPECT_EQ(32, reader->blockOffsets_[2].first); +} + + +} // namespace codec +} // namespace dataman +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::SetStderrLogging(google::INFO); + + return RUN_ALL_TESTS(); +} +