Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cpp/doc/HDFS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,31 @@ LD_LIBRARY_PATH), and relies on some environment variables.
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
```

#### Setting $JAVA_HOME automatically on OS X
### Mac Specifics

The installed location of Java on OS X can vary, however the following snippet
will set it automatically for you:

```shell
export JAVA_HOME=$(/usr/libexec/java_home)
```

Homebrew's Hadoop does not have native libs. Apache doesn't build these, so
users must build Hadoop to get the native libs. See this Stack Overflow
answer for details:

http://stackoverflow.com/a/40051353/478288

Be sure to include the path to the native libs in `JAVA_LIBRARY_PATH`:

```shell
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native:$JAVA_LIBRARY_PATH
```

If you get an error about needing to install Java 6, then add *BundledApp* and
*JNI* to the `JVMCapabilities` in `$JAVA_HOME/../Info.plist`. See

https://oliverdowling.com.au/2015/10/09/oracles-jre-8-on-mac-os-x-el-capitan/

https://derflounder.wordpress.com/2015/08/08/modifying-oracles-java-sdk-to-run-java-applications-on-os-x/

16 changes: 15 additions & 1 deletion cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,25 @@ class HdfsClient::HdfsClientImpl {
Status Connect(const HdfsConnectionConfig* config) {
RETURN_NOT_OK(ConnectLibHdfs());

fs_ = hdfsConnectAsUser(config->host.c_str(), config->port, config->user.c_str());
// connect to HDFS with the builder object
hdfsBuilder* builder = hdfsNewBuilder();
if (!config->host.empty()) {
hdfsBuilderSetNameNode(builder, config->host.c_str());
}
hdfsBuilderSetNameNodePort(builder, config->port);
if (!config->user.empty()) {
hdfsBuilderSetUserName(builder, config->user.c_str());
}
if (!config->kerb_ticket.empty()) {
hdfsBuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
}
Copy link
Member

@wesm wesm Oct 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if any of these 3 strings are empty?

fs_ = hdfsBuilderConnect(builder);

if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); }
namenode_host_ = config->host;
port_ = config->port;
user_ = config->user;
kerb_ticket_ = config->kerb_ticket;

return Status::OK();
}
Expand Down Expand Up @@ -425,6 +438,7 @@ class HdfsClient::HdfsClientImpl {
std::string namenode_host_;
std::string user_;
int port_;
std::string kerb_ticket_;

hdfsFS fs_;
};
Expand Down
9 changes: 3 additions & 6 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,16 @@ struct HdfsConnectionConfig {
std::string host;
int port;
std::string user;

// TODO: Kerberos, etc.
std::string kerb_ticket;
};

class ARROW_EXPORT HdfsClient : public FileSystemClient {
public:
~HdfsClient();

// Connect to an HDFS cluster at indicated host, port, and as user
// Connect to an HDFS cluster given a configuration
//
// @param host (in)
// @param port (in)
// @param user (in): user to identify as
// @param config (in): configuration for connecting
// @param fs (out): the created client
// @returns Status
static Status Connect(
Expand Down
87 changes: 64 additions & 23 deletions cpp/src/arrow/io/libhdfs_shim.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,17 @@ static HINSTANCE libjvm_handle = NULL;

// NOTE(wesm): cpplint does not like use of short and other imprecise C types

static hdfsFS (*ptr_hdfsConnectAsUser)(
const char* host, tPort port, const char* user) = NULL;
static hdfsFS (*ptr_hdfsConnect)(const char* host, tPort port) = NULL;
static hdfsBuilder* (*ptr_hdfsNewBuilder)(void) = NULL;
static void (*ptr_hdfsBuilderSetNameNode)(
hdfsBuilder* bld, const char* nn) = NULL;
static void (*ptr_hdfsBuilderSetNameNodePort)(
hdfsBuilder* bld, tPort port) = NULL;
static void (*ptr_hdfsBuilderSetUserName)(
hdfsBuilder* bld, const char* userName) = NULL;
static void (*ptr_hdfsBuilderSetKerbTicketCachePath)(
hdfsBuilder* bld, const char* kerbTicketCachePath) = NULL;
static hdfsFS (*ptr_hdfsBuilderConnect)(hdfsBuilder* bld) = NULL;

static int (*ptr_hdfsDisconnect)(hdfsFS fs) = NULL;

static hdfsFile (*ptr_hdfsOpenFile)(hdfsFS fs, const char* path, int flags,
Expand Down Expand Up @@ -149,18 +157,29 @@ static void* get_symbol(const char* symbol) {
#endif
}

hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char* user) {
return ptr_hdfsConnectAsUser(host, port, user);
hdfsBuilder* hdfsNewBuilder(void) {
return ptr_hdfsNewBuilder();
}

// Returns NULL on failure
hdfsFS hdfsConnect(const char* host, tPort port) {
if (ptr_hdfsConnect) {
return ptr_hdfsConnect(host, port);
} else {
// TODO: error reporting when shim setup fails
return NULL;
}
void hdfsBuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
ptr_hdfsBuilderSetNameNode(bld, nn);
}

void hdfsBuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
ptr_hdfsBuilderSetNameNodePort(bld, port);
}

void hdfsBuilderSetUserName(hdfsBuilder* bld, const char* userName) {
ptr_hdfsBuilderSetUserName(bld, userName);
}

void hdfsBuilderSetKerbTicketCachePath(hdfsBuilder* bld,
const char* kerbTicketCachePath) {
ptr_hdfsBuilderSetKerbTicketCachePath(bld , kerbTicketCachePath);
}

hdfsFS hdfsBuilderConnect(hdfsBuilder* bld) {
return ptr_hdfsBuilderConnect(bld);
}

int hdfsDisconnect(hdfsFS fs) {
Expand Down Expand Up @@ -342,18 +361,36 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
}

static std::vector<fs::path> get_potential_libhdfs_paths() {
std::vector<fs::path> libhdfs_potential_paths = {
// find one in the local directory
fs::path("./libhdfs.so"), fs::path("./hdfs.dll"),
// find a global libhdfs.so
fs::path("libhdfs.so"), fs::path("hdfs.dll"),
std::vector<fs::path> libhdfs_potential_paths;
std::string file_name;

// OS-specific file name
#ifdef __WIN32
file_name = "hdfs.dll";
#elif __APPLE__
file_name = "libhdfs.dylib";
#else
file_name = "libhdfs.so";
#endif

// Common paths
std::vector<fs::path> search_paths = {
fs::path(""),
fs::path(".")
};

// Path from environment variable
const char* hadoop_home = std::getenv("HADOOP_HOME");
if (hadoop_home != nullptr) {
auto path = fs::path(hadoop_home) / "lib/native/libhdfs.so";
libhdfs_potential_paths.push_back(path);
auto path = fs::path(hadoop_home) / "lib/native";
search_paths.push_back(path);
}

// All paths with file name
for (auto& path : search_paths) {
libhdfs_potential_paths.push_back(path / file_name);
}

return libhdfs_potential_paths;
}

Expand All @@ -371,7 +408,7 @@ static std::vector<fs::path> get_potential_libjvm_paths() {
file_name = "jvm.dll";
#elif __APPLE__
search_prefixes = {""};
search_suffixes = {""};
search_suffixes = {"", "/jre/lib/server"};
file_name = "libjvm.dylib";

// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
Expand Down Expand Up @@ -513,8 +550,12 @@ Status ARROW_EXPORT ConnectLibHdfs() {
return Status::IOError("Prior attempt to load libhdfs failed");
}

GET_SYMBOL_REQUIRED(hdfsConnect);
GET_SYMBOL_REQUIRED(hdfsConnectAsUser);
GET_SYMBOL_REQUIRED(hdfsNewBuilder);
GET_SYMBOL_REQUIRED(hdfsBuilderSetNameNode);
GET_SYMBOL_REQUIRED(hdfsBuilderSetNameNodePort);
GET_SYMBOL_REQUIRED(hdfsBuilderSetUserName);
GET_SYMBOL_REQUIRED(hdfsBuilderSetKerbTicketCachePath);
GET_SYMBOL_REQUIRED(hdfsBuilderConnect);
GET_SYMBOL_REQUIRED(hdfsCreateDirectory);
GET_SYMBOL_REQUIRED(hdfsDelete);
GET_SYMBOL_REQUIRED(hdfsDisconnect);
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_io.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
c_string host
int port
c_string user
c_string kerb_ticket

cdef cppclass HdfsPathInfo:
ObjectType kind;
Expand Down
29 changes: 20 additions & 9 deletions python/pyarrow/io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ cdef class HdfsClient:
shared_ptr[CHdfsClient] client

cdef readonly:
object host
int port
object user
bint is_open

def __cinit__(self):
Expand All @@ -301,6 +298,9 @@ cdef class HdfsClient:
self.close()

def close(self):
"""
Disconnect from the HDFS cluster
"""
self._ensure_client()
with nogil:
check_status(self.client.get().Disconnect())
Expand All @@ -313,14 +313,21 @@ cdef class HdfsClient:
raise IOError('HDFS client is closed')

@classmethod
def connect(cls, host, port, user):
def connect(cls, host="default", port=0, user=None, kerb_ticket=None):
"""
Connect to an HDFS cluster. All parameters are optional and should
only be set if the defaults need to be overridden.

Authentication should be automatic if the HDFS cluster uses Kerberos.
However, if a username is specified, then the ticket cache will likely
be required.

Parameters
----------
host :
port :
user :
host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
port : NameNode's port. Set to 0 for default or logical (HA) nodes.
user : Username when connecting to HDFS; None implies login user.
kerb_ticket : Path to Kerberos ticket cache.

Notes
-----
Expand All @@ -335,9 +342,13 @@ cdef class HdfsClient:
HdfsClient out = HdfsClient()
HdfsConnectionConfig conf

conf.host = tobytes(host)
if host is not None:
conf.host = tobytes(host)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is None a valid option?

conf.port = port
conf.user = tobytes(user)
if user is not None:
conf.user = tobytes(user)
if kerb_ticket is not None:
conf.kerb_ticket = tobytes(kerb_ticket)

with nogil:
check_status(CHdfsClient.Connect(&conf, &out.client))
Expand Down