
Commit 77eeae0

Matthew Topol committed:
[ARROW-2661] Add the ability to programmatically pass HDFS configuration key/value pairs in C++ and via pyarrow
Parent: 79a2207 · Commit: 77eeae0

7 files changed (+32 / -7 lines)
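
For orientation, a minimal sketch of what this change enables from Python, assuming a pyarrow build with libhdfs available; the host, port, user, and the Hadoop configuration keys shown are placeholder examples, not values taken from this commit:

import pyarrow as pa

# extra_conf takes string key/value pairs that are forwarded, one by one,
# to hdfsBuilderConfSetStr before the HDFS connection is established.
fs = pa.hdfs.connect(host='namenode', port=8020, user='hadoop',
                     extra_conf={'dfs.replication': '2',
                                 'dfs.client.read.shortcircuit': 'true'})

with fs.open('/tmp/example.csv', 'rb') as f:
    data = f.read()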

cpp/src/arrow/io/hdfs-internal.cc

Lines changed: 5 additions & 0 deletions
@@ -318,6 +318,10 @@ hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) {
   return this->hdfsBuilderConnect(bld);
 }
 
+int LibHdfsShim::BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val) {
+  return this->hdfsBuilderConfSetStr(bld, key, val);
+}
+
 int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); }
 
 hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
@@ -495,6 +499,7 @@ Status LibHdfsShim::GetRequiredSymbols() {
   GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName);
   GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath);
   GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr);
   GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect);
   GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory);
   GET_SYMBOL_REQUIRED(this, hdfsDelete);

cpp/src/arrow/io/hdfs-internal.h

Lines changed: 4 additions & 0 deletions
@@ -53,6 +53,7 @@ struct LibHdfsShim {
                                               const char* kerbTicketCachePath);
   void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld);
   hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld);
+  int (*hdfsBuilderConfSetStr)(hdfsBuilder* bld, const char* key, const char* val);
 
   int (*hdfsDisconnect)(hdfsFS fs);
 
@@ -97,6 +98,7 @@ struct LibHdfsShim {
     this->hdfsBuilderSetUserName = nullptr;
     this->hdfsBuilderSetKerbTicketCachePath = nullptr;
     this->hdfsBuilderSetForceNewInstance = nullptr;
+    this->hdfsBuilderConfSetStr = nullptr;
     this->hdfsBuilderConnect = nullptr;
     this->hdfsDisconnect = nullptr;
     this->hdfsOpenFile = nullptr;
@@ -142,6 +144,8 @@ struct LibHdfsShim {
 
   void BuilderSetForceNewInstance(hdfsBuilder* bld);
 
+  int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val);
+
   hdfsFS BuilderConnect(hdfsBuilder* bld);
 
   int Disconnect(hdfsFS fs);

cpp/src/arrow/io/hdfs.cc

Lines changed: 6 additions & 0 deletions
@@ -335,6 +335,12 @@ class HadoopFileSystem::HadoopFileSystemImpl {
     if (!config->kerb_ticket.empty()) {
       driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
     }
+
+    for (auto& kv : config->extra_conf) {
+      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
+      CHECK_FAILURE(ret, "confsetstr");
+    }
+
     driver_->BuilderSetForceNewInstance(builder);
     fs_ = driver_->BuilderConnect(builder);
 
cpp/src/arrow/io/hdfs.h

Lines changed: 2 additions & 0 deletions
@@ -19,6 +19,7 @@
 #define ARROW_IO_HDFS
 
 #include <cstdint>
+#include <map>
 #include <memory>
 #include <string>
 #include <vector>
@@ -63,6 +64,7 @@ struct HdfsConnectionConfig {
   int port;
   std::string user;
   std::string kerb_ticket;
+  std::map<std::string, std::string> extra_conf;
   HdfsDriver driver;
 };
 

python/pyarrow/hdfs.py

Lines changed: 6 additions & 5 deletions
@@ -30,15 +30,15 @@ class HadoopFileSystem(lib.HadoopFileSystem, FileSystem):
     """
 
     def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
-                 driver='libhdfs'):
+                 driver='libhdfs', extra_conf=None):
         if driver == 'libhdfs':
             _maybe_set_hadoop_classpath()
 
-        self._connect(host, port, user, kerb_ticket, driver)
+        self._connect(host, port, user, kerb_ticket, driver, extra_conf)
 
     def __reduce__(self):
         return (HadoopFileSystem, (self.host, self.port, self.user,
-                                   self.kerb_ticket, self.driver))
+                                   self.kerb_ticket, self.driver, self.extra_conf))
 
     def _isfilestore(self):
         """
@@ -149,7 +149,7 @@ def _libhdfs_walk_files_dirs(top_path, contents):
 
 
 def connect(host="default", port=0, user=None, kerb_ticket=None,
-            driver='libhdfs'):
+            driver='libhdfs', extra_conf=None):
     """
     Connect to an HDFS cluster. All parameters are optional and should
     only be set if the defaults need to be overridden.
@@ -178,5 +178,6 @@ def connect(host="default", port=0, user=None, kerb_ticket=None,
     filesystem : HadoopFileSystem
     """
    fs = HadoopFileSystem(host=host, port=port, user=user,
-                          kerb_ticket=kerb_ticket, driver=driver)
+                          kerb_ticket=kerb_ticket, driver=driver,
+                          extra_conf=extra_conf)
     return fs
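
Because __reduce__ now carries self.extra_conf, a pickled HadoopFileSystem should reconnect with the same extra configuration when unpickled. A rough sketch, assuming a reachable cluster and the same placeholder connection values as above:

import pickle
import pyarrow as pa

fs = pa.hdfs.connect(host='namenode', port=8020,
                     extra_conf={'dfs.replication': '2'})

# Unpickling calls HadoopFileSystem(host, port, user, kerb_ticket, driver,
# extra_conf) again, so the extra key/value pairs survive the round trip.
fs2 = pickle.loads(pickle.dumps(fs))
assert fs2.extra_conf == fs.extra_conf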

python/pyarrow/includes/libarrow.pxd

Lines changed: 2 additions & 1 deletion
@@ -18,7 +18,7 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
-
+from libcpp.map cimport map as cppmap
 
 cdef extern from "arrow/util/key_value_metadata.h" namespace "arrow" nogil:
     cdef cppclass CKeyValueMetadata" arrow::KeyValueMetadata":
@@ -651,6 +651,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         int port
         c_string user
         c_string kerb_ticket
+        cppmap[c_string, c_string] extra_conf
         HdfsDriver driver
 
     cdef cppclass HdfsPathInfo:

python/pyarrow/io-hdfs.pxi

Lines changed: 7 additions & 1 deletion
@@ -64,8 +64,9 @@ cdef class HadoopFileSystem:
         str kerb_ticket
         str driver
         int port
+        dict extra_conf
 
-    def _connect(self, host, port, user, kerb_ticket, driver):
+    def _connect(self, host, port, user, kerb_ticket, driver, extra_conf):
         cdef HdfsConnectionConfig conf
 
         if host is not None:
@@ -95,6 +96,11 @@ cdef class HadoopFileSystem:
             raise ValueError("unknown driver: %r" % driver)
         self.driver = driver
 
+        if extra_conf is not None and isinstance(extra_conf, dict):
+            conf.extra_conf = {tobytes(k): tobytes(v)
+                               for k, v in extra_conf.items()}
+        self.extra_conf = extra_conf
+
         with nogil:
             check_status(CHadoopFileSystem.Connect(&conf, &self.client))
         self.is_open = True
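
The Cython _connect above converts the user-supplied dict into the cppmap[c_string, c_string] field on HdfsConnectionConfig. A pure-Python approximation of that conversion, for illustration only; tobytes is pyarrow's internal helper, and utf-8 encoding of str keys and values is an assumption made here:

def encode_extra_conf(extra_conf):
    # Mirrors the dict comprehension in _connect(): keys and values are
    # handed to the C++ config as byte strings.
    if not isinstance(extra_conf, dict):
        return {}
    return {k.encode('utf-8') if isinstance(k, str) else k:
            v.encode('utf-8') if isinstance(v, str) else v
            for k, v in extra_conf.items()}

print(encode_extra_conf({'dfs.replication': '2'}))
# {b'dfs.replication': b'2'}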
