diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index c7ddb7cceec..60a4d12bc43 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -435,8 +435,8 @@ if test "$USE_HDFS"; then echo "JAVA_HOME has to be set for HDFS usage." exit 1 fi - HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS" - HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64" + HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include" + HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native" HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib" HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm" COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS" diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc index d98020c76b3..9855745b08d 100644 --- a/env/env_hdfs.cc +++ b/env/env_hdfs.cc @@ -19,6 +19,7 @@ #include #include "rocksdb/status.h" #include "util/string_util.h" +#include "util/logging.h" #define HDFS_EXISTS 0 #define HDFS_DOESNT_EXIST -1 @@ -293,7 +294,8 @@ class HdfsLogger : public Logger { } } - virtual void Logv(const char* format, va_list ap) { + using Logger::Logv; + void Logv(const char* format, va_list ap) override { const uint64_t thread_id = (*gettid_)(); // We try twice: the first time with a fixed-size stack allocated buffer, @@ -365,7 +367,8 @@ class HdfsLogger : public Logger { // Finally, the hdfs environment -const std::string HdfsEnv::kProto = "hdfs://"; +const std::string HdfsEnv::kHdfsProto = "hdfs://"; +const std::string HdfsEnv::kFdsProto = "fds://"; const std::string HdfsEnv::pathsep = "/"; // open a file for sequential reading @@ -464,10 +467,10 @@ Status HdfsEnv::GetChildren(const std::string& path, pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); if (numEntries >= 0) { for(int i = 0; i < numEntries; i++) { - char* pathname = 
pHdfsFileInfo[i].mName; - char* filename = std::rindex(pathname, '/'); - if (filename != nullptr) { - result->push_back(filename+1); + std::string pathname(pHdfsFileInfo[i].mName); + size_t pos = pathname.rfind("/"); + if (std::string::npos != pos) { + result->push_back(pathname.substr(pos + 1)); } } if (pHdfsFileInfo != nullptr) { diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 3a62bc8cb92..f378a3b04ad 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -178,29 +178,36 @@ class HdfsEnv : public Env { // object here so that we can use posix timers, // posix threads, etc. - static const std::string kProto; + static const std::string kHdfsProto; + static const std::string kFdsProto; static const std::string pathsep; /** - * If the URI is specified of the form hdfs://server:port/path, - * then connect to the specified cluster - * else connect to default. + * If the URI is specified of the form hdfs://server:port/path + * or fds://accessId:accessSecret@bucket.endpoint#port then connect + * to the specified cluster else connect to default. */ hdfsFS connectToPath(const std::string& uri) { if (uri.empty()) { return nullptr; } - if (uri.find(kProto) != 0) { - // uri doesn't start with hdfs:// -> use default:0, which is special + + std::vector<std::string> parts; + if (uri.find(kHdfsProto) == 0) { + const std::string hostport = uri.substr(kHdfsProto.length()); + split(hostport, ':', parts); + } else if (uri.find(kFdsProto) == 0) { + split(uri, '#', parts); + } else { + // uri doesn't start with hdfs:// or fds:// -> use default:0, which is special // to libhdfs. 
+ fprintf(stderr, "[hdfs]You now access the default hdfs/fds url\n"); return hdfsConnectNewInstance("default", 0); } - const std::string hostport = uri.substr(kProto.length()); - std::vector<std::string> parts; - split(hostport, ':', parts); + fprintf(stderr, "[hdfs]You now access the hdfs/fds url:%s\n", uri.c_str()); if (parts.size() != 2) { - throw HdfsFatalException("Bad uri for hdfs " + uri); + throw HdfsFatalException("Bad uri for hdfs/fds " + uri); } // parts[0] = hosts, parts[1] = port/xxx/yyy std::string host(parts[0]); @@ -213,7 +220,7 @@ class HdfsEnv : public Env { tPort port; port = atoi(portStr.c_str()); if (port == 0) { - throw HdfsFatalException("Bad host-port for hdfs " + uri); + throw HdfsFatalException("Bad host-port for hdfs/fds " + uri); } hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port); return fs; diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 294d8c61705..e416058eb68 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -79,6 +79,7 @@ set(NATIVE_JAVA_CLASSES org.rocksdb.FlushOptions org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig + org.rocksdb.HdfsEnv org.rocksdb.IngestExternalFileOptions org.rocksdb.Logger org.rocksdb.LRUCache diff --git a/java/Makefile b/java/Makefile index 11c6c807e39..5e22eb6a553 100644 --- a/java/Makefile +++ b/java/Makefile @@ -26,6 +26,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\ org.rocksdb.IngestExternalFileOptions\ org.rocksdb.HashLinkedListMemTableConfig\ org.rocksdb.HashSkipListMemTableConfig\ + org.rocksdb.HdfsEnv\ org.rocksdb.Logger\ org.rocksdb.LRUCache\ org.rocksdb.MergeOperator\ diff --git a/java/rocksjni/env.cc b/java/rocksjni/env.cc index dc949a07fa0..916acc1795b 100644 --- a/java/rocksjni/env.cc +++ b/java/rocksjni/env.cc @@ -6,8 +6,10 @@ // This file implements the "bridge" between Java and C++ and enables // calling c++ rocksdb::Env methods from Java side. 
+#include "portal.h" #include "include/org_rocksdb_Env.h" #include "include/org_rocksdb_RocksEnv.h" +#include "include/org_rocksdb_HdfsEnv.h" #include "include/org_rocksdb_RocksMemEnv.h" #include "rocksdb/env.h" @@ -17,7 +19,7 @@ * Signature: ()J */ jlong Java_org_rocksdb_Env_getDefaultEnvInternal( - JNIEnv* env, jclass jclazz) { + JNIEnv*, jclass) { return reinterpret_cast<jlong>(rocksdb::Env::Default()); } @@ -79,3 +81,38 @@ void Java_org_rocksdb_RocksMemEnv_disposeInternal( assert(e != nullptr); delete e; } + +/* + * Class: org_rocksdb_HdfsEnv + * Method: createHdfsEnv + * Signature: (Ljava/lang/String;)J + */ +jlong Java_org_rocksdb_HdfsEnv_createHdfsEnv( + JNIEnv* env, jclass, jstring jfsname) { + jboolean has_exception = JNI_FALSE; + auto fsname = rocksdb::JniUtil::copyStdString(env, jfsname, &has_exception); + if (has_exception == JNI_TRUE) { + // exception occurred + return 0; + } + rocksdb::Env* hdfs_env; + rocksdb::Status s = rocksdb::NewHdfsEnv(&hdfs_env, fsname); + if (!s.ok()) { + // error occurred + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + return 0; + } + return reinterpret_cast<jlong>(hdfs_env); +} + +/* + * Class: org_rocksdb_HdfsEnv + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_HdfsEnv_disposeInternal( + JNIEnv*, jobject, jlong jhandle) { + auto* e = reinterpret_cast<rocksdb::Env*>(jhandle); + assert(e != nullptr); + delete e; +} diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 45a5de86596..33ed281c615 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -5787,6 +5787,49 @@ jboolean Java_org_rocksdb_WriteOptions_noSlowdown( return reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->no_slowdown; } +/* + * Class: org_rocksdb_WriteOptions + * Method: setLowPri + * Signature: (JZ)V + */ +void Java_org_rocksdb_WriteOptions_setLowPri( + JNIEnv*, jobject, jlong jhandle, jboolean jlow_pri) { + reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->low_pri = + static_cast<bool>(jlow_pri); +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: lowPri + * 
Signature: (J)Z + */ +jboolean Java_org_rocksdb_WriteOptions_lowPri( + JNIEnv*, jobject, jlong jhandle) { + return reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->low_pri; +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: setGivenDecree + * Signature: (JJ)V + */ +void Java_org_rocksdb_WriteOptions_setGivenDecree( + JNIEnv*, jobject, jlong jhandle, jlong jgiven_decree) { + reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->given_decree = + static_cast<uint64_t>(jgiven_decree); // NOTE(review): confirm given_decree's exact integral type +} + +/* + * Class: org_rocksdb_WriteOptions + * Method: givenDecree + * Signature: (J)Z + */ +jboolean Java_org_rocksdb_WriteOptions_givenDecree( + JNIEnv*, jobject, jlong jhandle) { + return reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->given_decree; // NOTE(review): Java declares boolean; verify narrowing is intended +} + + ///////////////////////////////////////////////////////////////////// // rocksdb::ReadOptions diff --git a/java/src/main/java/org/rocksdb/HdfsEnv.java b/java/src/main/java/org/rocksdb/HdfsEnv.java new file mode 100644 index 00000000000..4d8d3bff6fc --- /dev/null +++ b/java/src/main/java/org/rocksdb/HdfsEnv.java @@ -0,0 +1,27 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * HDFS environment. + */ +public class HdfsEnv extends Env { + + /** +

* Creates a new environment that is used for HDFS environment.
+ *
+ * The caller must delete the result when it is
+ * no longer needed.
+ * + * @param fsName the HDFS as a string in the form "hdfs://hostname:port/" + */ + public HdfsEnv(final String fsName) { + super(createHdfsEnv(fsName)); + } + + private static native long createHdfsEnv(final String fsName); + @Override protected final native void disposeInternal(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/WriteOptions.java b/java/src/main/java/org/rocksdb/WriteOptions.java index b9e8ad81c23..fdf82ea6041 100644 --- a/java/src/main/java/org/rocksdb/WriteOptions.java +++ b/java/src/main/java/org/rocksdb/WriteOptions.java @@ -144,6 +144,50 @@ public boolean noSlowdown() { return noSlowdown(nativeHandle_); } + /** + * If true, this write request is of lower priority if compaction is + * behind. In the case that {@link #noSlowdown()} == true, the request + * will be cancelled immediately with {@link Status.Code#Incomplete} returned. + * Otherwise, it will be slowed down. The slowdown value is determined by + * RocksDB to guarantee it introduces minimum impacts to high priority writes. + * + * Default: false + * + * @param lowPri true if the write request should be of lower priority than + * compactions which are behind. + * + * @return the instance of the current WriteOptions. + */ + public WriteOptions setLowPri(final boolean lowPri) { + setLowPri(nativeHandle_, lowPri); + return this; + } + + /** + * Returns true if this write request is of lower priority if compaction is + * behind. + * + * See {@link #setLowPri(boolean)}. + * + * @return true if this write request is of lower priority, false otherwise. 
*/ + public boolean lowPri() { + return lowPri(nativeHandle_); + } + + /** + * Sets the {@code given_decree} value that is forwarded to the native + * write options (the decree associated with this write). + * + * @param givenDecree the decree value for this write. + * + * @return the instance of the current WriteOptions. + */ + public WriteOptions setGivenDecree(final long givenDecree) { + setGivenDecree(nativeHandle_, givenDecree); + return this; + } + + /** + * Returns the {@code given_decree} flag of the native write options. + * + * See {@link #setGivenDecree(long)}. + * + * @return the given decree flag. + */ + public boolean givenDecree() { + return givenDecree(nativeHandle_); + } + private native static long newWriteOptions(); private native void setSync(long handle, boolean flag); private native boolean sync(long handle); @@ -155,5 +199,9 @@ private native void setIgnoreMissingColumnFamilies(final long handle, private native void setNoSlowdown(final long handle, final boolean noSlowdown); private native boolean noSlowdown(final long handle); + private native void setLowPri(final long handle, final boolean lowPri); + private native boolean lowPri(final long handle); + private native void setGivenDecree(final long handle, final long givenDecree); + private native boolean givenDecree(final long handle); @Override protected final native void disposeInternal(final long handle); }