support hdfs/fds (facebook#26)
foreverneverer authored and acelyc111 committed Oct 12, 2019
1 parent dc91f8a commit ecef9ea
Showing 9 changed files with 187 additions and 20 deletions.
4 changes: 2 additions & 2 deletions build_tools/build_detect_platform
@@ -435,8 +435,8 @@ if test "$USE_HDFS"; then
     echo "JAVA_HOME has to be set for HDFS usage."
     exit 1
   fi
-  HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS"
-  HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64"
+  HDFS_CCFLAGS="$HDFS_CCFLAGS -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -DUSE_HDFS -I$HADOOP_HOME/include"
+  HDFS_LDFLAGS="$HDFS_LDFLAGS -lhdfs -L$JAVA_HOME/jre/lib/amd64 -L$HADOOP_HOME/lib/native"
   HDFS_LDFLAGS="$HDFS_LDFLAGS -L$JAVA_HOME/jre/lib/amd64/server -L$GLIBC_RUNTIME_PATH/lib"
   HDFS_LDFLAGS="$HDFS_LDFLAGS -ldl -lverify -ljava -ljvm"
   COMMON_FLAGS="$COMMON_FLAGS $HDFS_CCFLAGS"
15 changes: 9 additions & 6 deletions env/env_hdfs.cc
@@ -19,6 +19,7 @@
 #include <sstream>
 #include "rocksdb/status.h"
 #include "util/string_util.h"
+#include "util/logging.h"

 #define HDFS_EXISTS 0
 #define HDFS_DOESNT_EXIST -1
@@ -293,7 +294,8 @@ class HdfsLogger : public Logger {
     }
   }

-  virtual void Logv(const char* format, va_list ap) {
+  using Logger::Logv;
+  void Logv(const char* format, va_list ap) override {
     const uint64_t thread_id = (*gettid_)();

     // We try twice: the first time with a fixed-size stack allocated buffer,
@@ -365,7 +367,8 @@

 // Finally, the hdfs environment

-const std::string HdfsEnv::kProto = "hdfs://";
+const std::string HdfsEnv::kHdfsProto = "hdfs://";
+const std::string HdfsEnv::kFdsProto = "fds://";
 const std::string HdfsEnv::pathsep = "/";

 // open a file for sequential reading
@@ -464,10 +467,10 @@ Status HdfsEnv::GetChildren(const std::string& path,
     pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
     if (numEntries >= 0) {
       for(int i = 0; i < numEntries; i++) {
-        char* pathname = pHdfsFileInfo[i].mName;
-        char* filename = std::rindex(pathname, '/');
-        if (filename != nullptr) {
-          result->push_back(filename+1);
+        std::string pathname(pHdfsFileInfo[i].mName);
+        size_t pos = pathname.rfind("/");
+        if (std::string::npos != pos) {
+          result->push_back(pathname.substr(pos + 1));
         }
       }
       if (pHdfsFileInfo != nullptr) {
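The GetChildren() change above replaces the C-style std::rindex call, which returns a null pointer when no '/' is present, with the bounds-checked std::string::rfind. A minimal sketch of the same basename-extraction logic, written in Java purely for illustration (the class and method names are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BasenameSketch {
  // Keep only the component after the last '/', skipping entries with no
  // separator at all -- mirroring the npos check in the patched loop.
  static List<String> basenames(List<String> fullPaths) {
    List<String> result = new ArrayList<>();
    for (String pathname : fullPaths) {
      int pos = pathname.lastIndexOf('/');  // analogous to pathname.rfind("/")
      if (pos != -1) {                      // analogous to std::string::npos != pos
        result.add(pathname.substring(pos + 1));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(basenames(Arrays.asList(
        "hdfs://namenode:8020/db/MANIFEST-000001", "no-separator")));
    // prints: [MANIFEST-000001]
  }
}
```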
29 changes: 18 additions & 11 deletions hdfs/env_hdfs.h
@@ -178,29 +178,36 @@ class HdfsEnv : public Env {
   // object here so that we can use posix timers,
   // posix threads, etc.

-  static const std::string kProto;
+  static const std::string kHdfsProto;
+  static const std::string kFdsProto;
   static const std::string pathsep;

   /**
-   * If the URI is specified of the form hdfs://server:port/path,
-   * then connect to the specified cluster
-   * else connect to default.
+   * If the URI is specified of the form hdfs://server:port/path
+   * or fds://accessId:accessSecret@bucket.endpoint#port then connect
+   * to the specified cluster else connect to default.
    */
   hdfsFS connectToPath(const std::string& uri) {
     if (uri.empty()) {
       return nullptr;
     }
-    if (uri.find(kProto) != 0) {
-      // uri doesn't start with hdfs:// -> use default:0, which is special
+
+    std::vector <std::string> parts;
+    if (uri.find(kHdfsProto) == 0) {
+      const std::string hostport = uri.substr(kHdfsProto.length());
+      split(hostport, ':', parts);
+    } else if (uri.find(kFdsProto) == 0) {
+      split(uri, '#', parts);
+    } else {
+      // uri doesn't start with hdfs:// or fds:// -> use default:0, which is special
       // to libhdfs.
+      fprintf(stderr, "[hdfs]You now access the default hdfs/fds url\n");
       return hdfsConnectNewInstance("default", 0);
     }
-    const std::string hostport = uri.substr(kProto.length());

-    std::vector <std::string> parts;
-    split(hostport, ':', parts);
+    fprintf(stderr, "[hdfs]You now access the hdfs/fds url:%s\n", uri.c_str());
     if (parts.size() != 2) {
-      throw HdfsFatalException("Bad uri for hdfs " + uri);
+      throw HdfsFatalException("Bad uri for hdfs/fds " + uri);
     }
     // parts[0] = hosts, parts[1] = port/xxx/yyy
     std::string host(parts[0]);
@@ -213,7 +220,7 @@ class HdfsEnv : public Env {
     tPort port;
     port = atoi(portStr.c_str());
     if (port == 0) {
-      throw HdfsFatalException("Bad host-port for hdfs " + uri);
+      throw HdfsFatalException("Bad host-port for hdfs/fds " + uri);
     }
     hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
     return fs;
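connectToPath() now accepts two URI shapes: hdfs://server:port/path (the scheme is stripped and the remainder split on ':') and fds://accessId:accessSecret@bucket.endpoint#port (split on '#'). A hedged sketch of how the Java HdfsEnv added later in this commit would be handed each form; the host, port, bucket, and credential values are all placeholders:

```java
import org.rocksdb.HdfsEnv;
import org.rocksdb.RocksDB;

public class ConnectSketch {
  public static void main(String[] args) {
    RocksDB.loadLibrary();
    // hdfs form: "hdfs://" is stripped, then host and port split on ':'.
    try (HdfsEnv hdfsEnv = new HdfsEnv("hdfs://namenode:8020/")) {
      // hand to Options#setEnv(...) before opening a database
    }
    // fds form: split on '#'; the trailing part is treated as the port.
    try (HdfsEnv fdsEnv = new HdfsEnv(
        "fds://accessId:accessSecret@bucket.example-endpoint#80")) {
      // same usage as above
    }
  }
}
```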
1 change: 1 addition & 0 deletions java/CMakeLists.txt
@@ -79,6 +79,7 @@ set(NATIVE_JAVA_CLASSES
   org.rocksdb.FlushOptions
   org.rocksdb.HashLinkedListMemTableConfig
   org.rocksdb.HashSkipListMemTableConfig
+  org.rocksdb.HdfsEnv
   org.rocksdb.IngestExternalFileOptions
   org.rocksdb.Logger
   org.rocksdb.LRUCache
1 change: 1 addition & 0 deletions java/Makefile
@@ -26,6 +26,7 @@ NATIVE_JAVA_CLASSES = org.rocksdb.AbstractCompactionFilter\
     org.rocksdb.IngestExternalFileOptions\
     org.rocksdb.HashLinkedListMemTableConfig\
     org.rocksdb.HashSkipListMemTableConfig\
+    org.rocksdb.HdfsEnv\
     org.rocksdb.Logger\
     org.rocksdb.LRUCache\
     org.rocksdb.MergeOperator\
39 changes: 38 additions & 1 deletion java/rocksjni/env.cc
@@ -6,8 +6,10 @@
 // This file implements the "bridge" between Java and C++ and enables
 // calling c++ rocksdb::Env methods from Java side.

+#include "portal.h"
 #include "include/org_rocksdb_Env.h"
 #include "include/org_rocksdb_RocksEnv.h"
+#include "include/org_rocksdb_HdfsEnv.h"
 #include "include/org_rocksdb_RocksMemEnv.h"
 #include "rocksdb/env.h"

@@ -17,7 +19,7 @@
  * Signature: ()J
  */
 jlong Java_org_rocksdb_Env_getDefaultEnvInternal(
-    JNIEnv* env, jclass jclazz) {
+    JNIEnv*, jclass) {
   return reinterpret_cast<jlong>(rocksdb::Env::Default());
 }

@@ -79,3 +81,38 @@ void Java_org_rocksdb_RocksMemEnv_disposeInternal(
   assert(e != nullptr);
   delete e;
 }
+
+/*
+ * Class:     org_rocksdb_HdfsEnv
+ * Method:    createHdfsEnv
+ * Signature: (Ljava/lang/String;)J
+ */
+jlong Java_org_rocksdb_HdfsEnv_createHdfsEnv(
+    JNIEnv* env, jclass, jstring jfsname) {
+  jboolean has_exception = JNI_FALSE;
+  auto fsname = rocksdb::JniUtil::copyStdString(env, jfsname, &has_exception);
+  if (has_exception == JNI_TRUE) {
+    // exception occurred
+    return 0;
+  }
+  rocksdb::Env* hdfs_env;
+  rocksdb::Status s = rocksdb::NewHdfsEnv(&hdfs_env, fsname);
+  if (!s.ok()) {
+    // error occurred
+    rocksdb::RocksDBExceptionJni::ThrowNew(env, s);
+    return 0;
+  }
+  return reinterpret_cast<jlong>(hdfs_env);
+}
+
+/*
+ * Class:     org_rocksdb_HdfsEnv
+ * Method:    disposeInternal
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_HdfsEnv_disposeInternal(
+    JNIEnv*, jobject, jlong jhandle) {
+  auto* e = reinterpret_cast<rocksdb::Env*>(jhandle);
+  assert(e != nullptr);
+  delete e;
+}
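For orientation, a short hedged sketch of the Java calls that land in the two JNI functions above; the URI is a placeholder and the class name is made up:

```java
import org.rocksdb.HdfsEnv;
import org.rocksdb.RocksDB;

public class JniMappingSketch {
  public static void main(String[] args) {
    RocksDB.loadLibrary();
    // Constructor -> Java_org_rocksdb_HdfsEnv_createHdfsEnv, which calls
    // rocksdb::NewHdfsEnv and raises a RocksDBException if it fails.
    // close()     -> Java_org_rocksdb_HdfsEnv_disposeInternal, which deletes
    // the native Env.
    try (HdfsEnv env = new HdfsEnv("hdfs://namenode:8020/")) {
      // use env while it is open
    }
  }
}
```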
43 changes: 43 additions & 0 deletions java/rocksjni/options.cc
@@ -5787,6 +5787,49 @@ jboolean Java_org_rocksdb_WriteOptions_noSlowdown(
   return reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->no_slowdown;
 }

+/*
+ * Class:     org_rocksdb_WriteOptions
+ * Method:    setLowPri
+ * Signature: (JZ)V
+ */
+void Java_org_rocksdb_WriteOptions_setLowPri(
+    JNIEnv*, jobject, jlong jhandle, jboolean jlow_pri) {
+  reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->low_pri =
+      static_cast<bool>(jlow_pri);
+}
+
+/*
+ * Class:     org_rocksdb_WriteOptions
+ * Method:    lowPri
+ * Signature: (J)Z
+ */
+jboolean Java_org_rocksdb_WriteOptions_lowPri(
+    JNIEnv*, jobject, jlong jhandle) {
+  return reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->low_pri;
+}
+
+/*
+ * Class:     org_rocksdb_WriteOptions
+ * Method:    setGivenDecree
+ * Signature: (JJ)V
+ */
+void Java_org_rocksdb_WriteOptions_setGivenDecree(
+    JNIEnv*, jobject, jlong jhandle, jlong jgiven_decree) {
+  reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->given_decree =
+      static_cast<uint64_t>(jgiven_decree);
+}
+
+/*
+ * Class:     org_rocksdb_WriteOptions
+ * Method:    givenDecree
+ * Signature: (J)J
+ */
+jlong Java_org_rocksdb_WriteOptions_givenDecree(
+    JNIEnv*, jobject, jlong jhandle) {
+  return reinterpret_cast<rocksdb::WriteOptions*>(jhandle)->given_decree;
+}


/////////////////////////////////////////////////////////////////////
// rocksdb::ReadOptions

27 changes: 27 additions & 0 deletions java/src/main/java/org/rocksdb/HdfsEnv.java
@@ -0,0 +1,27 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+package org.rocksdb;
+
+/**
+ * HDFS environment.
+ */
+public class HdfsEnv extends Env {
+
+  /**
+   * <p>Creates a new environment for accessing HDFS.</p>
+   *
+   * <p>The caller must delete the result when it is
+   * no longer needed.</p>
+   *
+   * @param fsName the HDFS as a string in the form "hdfs://hostname:port/"
+   */
+  public HdfsEnv(final String fsName) {
+    super(createHdfsEnv(fsName));
+  }
+
+  private static native long createHdfsEnv(final String fsName);
+  @Override protected final native void disposeInternal(final long handle);
+}
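A hedged end-to-end sketch of the new class, assuming a reachable HDFS namenode; the host, port, and database path below are placeholders:

```java
import org.rocksdb.HdfsEnv;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class HdfsEnvUsageSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (HdfsEnv env = new HdfsEnv("hdfs://namenode:8020/");
         Options options = new Options()
             .setCreateIfMissing(true)
             .setEnv(env);                 // route SST/WAL file I/O through HDFS
         RocksDB db = RocksDB.open(options, "/rocksdb/example-db")) {
      db.put("key".getBytes(), "value".getBytes());
    }
  }
}
```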
48 changes: 48 additions & 0 deletions java/src/main/java/org/rocksdb/WriteOptions.java
@@ -144,6 +144,50 @@ public boolean noSlowdown() {
     return noSlowdown(nativeHandle_);
   }

+  /**
+   * If true, this write request is of lower priority if compaction is
+   * behind. In that case, if {@link #noSlowdown()} == true, the request
+   * will be cancelled immediately with {@link Status.Code#Incomplete}
+   * returned. Otherwise, it will be slowed down. The slowdown value is
+   * determined by RocksDB to guarantee it introduces minimum impacts to
+   * high priority writes.
+   *
+   * Default: false
+   *
+   * @param lowPri true if the write request should be of lower priority than
+   *     compactions which are behind.
+   *
+   * @return the instance of the current WriteOptions.
+   */
+  public WriteOptions setLowPri(final boolean lowPri) {
+    setLowPri(nativeHandle_, lowPri);
+    return this;
+  }
+
+  /**
+   * Returns true if this write request is of lower priority if compaction is
+   * behind.
+   *
+   * See {@link #setLowPri(boolean)}.
+   *
+   * @return true if this write request is of lower priority, false otherwise.
+   */
+  public boolean lowPri() {
+    return lowPri(nativeHandle_);
+  }
+
+  /**
+   * Sets the decree associated with this write request. This option is
+   * specific to this fork and is not part of upstream RocksDB.
+   *
+   * @param givenDecree the decree to attach to the write.
+   *
+   * @return the instance of the current WriteOptions.
+   */
+  public WriteOptions setGivenDecree(final long givenDecree) {
+    setGivenDecree(nativeHandle_, givenDecree);
+    return this;
+  }
+
+  /**
+   * Returns the decree associated with this write request.
+   *
+   * @return the given decree.
+   */
+  public long givenDecree() {
+    return givenDecree(nativeHandle_);
+  }
+
   private native static long newWriteOptions();
   private native void setSync(long handle, boolean flag);
   private native boolean sync(long handle);
@@ -155,5 +199,9 @@ private native void setIgnoreMissingColumnFamilies(final long handle,
   private native void setNoSlowdown(final long handle,
       final boolean noSlowdown);
   private native boolean noSlowdown(final long handle);
+  private native void setLowPri(final long handle, final boolean lowPri);
+  private native boolean lowPri(final long handle);
+  private native void setGivenDecree(final long handle, final long givenDecree);
+  private native long givenDecree(final long handle);
   @Override protected final native void disposeInternal(final long handle);
 }
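A brief hedged sketch exercising both new options together; given_decree is specific to this fork, and the decree value below is an arbitrary example:

```java
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;

public class WriteOptionsSketch {
  public static void main(String[] args) {
    RocksDB.loadLibrary();
    try (WriteOptions writeOpts = new WriteOptions()) {
      writeOpts.setLowPri(true)        // deprioritize behind pending compaction
               .setGivenDecree(42L);   // fork-specific; arbitrary example value
      System.out.println(writeOpts.lowPri());       // true
      System.out.println(writeOpts.givenDecree());  // 42
    }
  }
}
```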
