
Commit b1d1633

Matthew Topol authored and xhochy committed
ARROW-2661: [Python] Adding the ability to programmatically pass hdfs configuration key/value pairs via pyarrow
https://issues.apache.org/jira/browse/ARROW-2661

Both the JNI-based libhdfs and libhdfs3 support `hdfsBuilderConfSetStr`, so we can use it to pass arbitrary configuration values for an HDFS connection, similar to how https://hdfs3.readthedocs.io/en/latest/hdfs.html supports passing them. I've added a parameter called `extra_conf` to facilitate this in pyarrow, for example:

```python
import pyarrow

conf = {
    "dfs.nameservices": "nameservice1",
    "dfs.ha.namenodes.nameservice1": "namenode113,namenode188",
    "dfs.namenode.rpc-address.nameservice1.namenode113": "hostname_of_server1:8020",
    "dfs.namenode.rpc-address.nameservice1.namenode188": "hostname_of_server2:8020",
    "dfs.namenode.http-address.nameservice1.namenode113": "hostname_of_server1:50070",
    "dfs.namenode.http-address.nameservice1.namenode188": "hostname_of_server2:50070",
    "hadoop.security.authentication": "kerberos",
}

hdfs = pyarrow.hdfs.connect(host='nameservice1', driver='libhdfs3',
                            extra_conf=conf)
```

Author: Matthew Topol <mtopol@factset.com>

Closes #2097 from zeroshade/configs and squashes the following commits:

047dd4b <Matthew Topol> forgot to use make format to fix the order of includes. oops
d27e3c3 <Matthew Topol> switching to unordered_map
858b44b <Matthew Topol> missed a flake8 spot
77eeae0 <Matthew Topol> Adding the ability to programmatically pass hdfs configuration key/value pairs in the C++ and via pyarrow
1 parent b9b4376 commit b1d1633

File tree

7 files changed: +32 −7 lines changed


cpp/src/arrow/io/hdfs-internal.cc

Lines changed: 5 additions & 0 deletions

```diff
@@ -318,6 +318,10 @@ hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) {
   return this->hdfsBuilderConnect(bld);
 }
 
+int LibHdfsShim::BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val) {
+  return this->hdfsBuilderConfSetStr(bld, key, val);
+}
+
 int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); }
 
 hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
@@ -495,6 +499,7 @@ Status LibHdfsShim::GetRequiredSymbols() {
   GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName);
   GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath);
   GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance);
+  GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr);
   GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect);
   GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory);
   GET_SYMBOL_REQUIRED(this, hdfsDelete);
```

cpp/src/arrow/io/hdfs-internal.h

Lines changed: 4 additions & 0 deletions

```diff
@@ -53,6 +53,7 @@ struct LibHdfsShim {
                                               const char* kerbTicketCachePath);
   void (*hdfsBuilderSetForceNewInstance)(hdfsBuilder* bld);
   hdfsFS (*hdfsBuilderConnect)(hdfsBuilder* bld);
+  int (*hdfsBuilderConfSetStr)(hdfsBuilder* bld, const char* key, const char* val);
 
   int (*hdfsDisconnect)(hdfsFS fs);
 
@@ -97,6 +98,7 @@ struct LibHdfsShim {
     this->hdfsBuilderSetUserName = nullptr;
     this->hdfsBuilderSetKerbTicketCachePath = nullptr;
     this->hdfsBuilderSetForceNewInstance = nullptr;
+    this->hdfsBuilderConfSetStr = nullptr;
     this->hdfsBuilderConnect = nullptr;
     this->hdfsDisconnect = nullptr;
     this->hdfsOpenFile = nullptr;
@@ -142,6 +144,8 @@ struct LibHdfsShim {
 
   void BuilderSetForceNewInstance(hdfsBuilder* bld);
 
+  int BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val);
+
   hdfsFS BuilderConnect(hdfsBuilder* bld);
 
   int Disconnect(hdfsFS fs);
```

cpp/src/arrow/io/hdfs.cc

Lines changed: 6 additions & 0 deletions

```diff
@@ -335,6 +335,12 @@ class HadoopFileSystem::HadoopFileSystemImpl {
     if (!config->kerb_ticket.empty()) {
       driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
     }
+
+    for (auto& kv : config->extra_conf) {
+      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
+      CHECK_FAILURE(ret, "confsetstr");
+    }
+
     driver_->BuilderSetForceNewInstance(builder);
     fs_ = driver_->BuilderConnect(builder);
```
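The loop above is where `extra_conf` takes effect: each key/value pair is handed to `hdfsBuilderConfSetStr` before the builder connects, just as if it had been set in `hdfs-site.xml`. A minimal sketch of the resulting user-facing behavior, assuming a reachable HA nameservice named `nameservice1` (the hostname is a placeholder):

```python
import pyarrow

# Each extra_conf entry is forwarded to hdfsBuilderConfSetStr by the
# C++ loop above, overriding the value hdfs-site.xml would provide.
fs = pyarrow.hdfs.connect(host='nameservice1', driver='libhdfs3',
                          extra_conf={'dfs.nameservices': 'nameservice1'})
```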

cpp/src/arrow/io/hdfs.h

Lines changed: 2 additions & 0 deletions

```diff
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "arrow/io/interfaces.h"
@@ -63,6 +64,7 @@ struct HdfsConnectionConfig {
   int port;
   std::string user;
   std::string kerb_ticket;
+  std::unordered_map<std::string, std::string> extra_conf;
   HdfsDriver driver;
 };
```

python/pyarrow/hdfs.py

Lines changed: 7 additions & 5 deletions

```diff
@@ -30,15 +30,16 @@ class HadoopFileSystem(lib.HadoopFileSystem, FileSystem):
     """
 
     def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
-                 driver='libhdfs'):
+                 driver='libhdfs', extra_conf=None):
         if driver == 'libhdfs':
             _maybe_set_hadoop_classpath()
 
-        self._connect(host, port, user, kerb_ticket, driver)
+        self._connect(host, port, user, kerb_ticket, driver, extra_conf)
 
     def __reduce__(self):
         return (HadoopFileSystem, (self.host, self.port, self.user,
-                                   self.kerb_ticket, self.driver))
+                                   self.kerb_ticket, self.driver,
+                                   self.extra_conf))
 
     def _isfilestore(self):
         """
@@ -149,7 +150,7 @@ def _libhdfs_walk_files_dirs(top_path, contents):
 
 
 def connect(host="default", port=0, user=None, kerb_ticket=None,
-            driver='libhdfs'):
+            driver='libhdfs', extra_conf=None):
     """
     Connect to an HDFS cluster. All parameters are optional and should
     only be set if the defaults need to be overridden.
@@ -178,5 +179,6 @@ def connect(host="default", port=0, user=None, kerb_ticket=None,
     filesystem : HadoopFileSystem
     """
     fs = HadoopFileSystem(host=host, port=port, user=user,
-                          kerb_ticket=kerb_ticket, driver=driver)
+                          kerb_ticket=kerb_ticket, driver=driver,
+                          extra_conf=extra_conf)
     return fs
```
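The `__reduce__` change matters for pickling: unpickling re-invokes the constructor with the stored arguments, so `extra_conf` now survives a round trip. A quick sketch, assuming a cluster you can actually connect to:

```python
import pickle

import pyarrow

fs = pyarrow.hdfs.connect(host='default',
                          extra_conf={'dfs.replication': '2'})

# Unpickling calls HadoopFileSystem(...) again with the saved
# arguments, including extra_conf, and reconnects.
fs2 = pickle.loads(pickle.dumps(fs))
assert fs2.extra_conf == fs.extra_conf
```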

python/pyarrow/includes/libarrow.pxd

Lines changed: 1 addition & 1 deletion

```diff
@@ -19,7 +19,6 @@
 
 from pyarrow.includes.common cimport *
 
-
 cdef extern from "arrow/util/key_value_metadata.h" namespace "arrow" nogil:
     cdef cppclass CKeyValueMetadata" arrow::KeyValueMetadata":
         CKeyValueMetadata()
@@ -656,6 +655,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
         int port
         c_string user
         c_string kerb_ticket
+        unordered_map[c_string, c_string] extra_conf
         HdfsDriver driver
 
     cdef cppclass HdfsPathInfo:
```

python/pyarrow/io-hdfs.pxi

Lines changed: 7 additions & 1 deletion

```diff
@@ -64,8 +64,9 @@ cdef class HadoopFileSystem:
         str kerb_ticket
         str driver
         int port
+        dict extra_conf
 
-    def _connect(self, host, port, user, kerb_ticket, driver):
+    def _connect(self, host, port, user, kerb_ticket, driver, extra_conf):
         cdef HdfsConnectionConfig conf
 
         if host is not None:
@@ -95,6 +96,11 @@ cdef class HadoopFileSystem:
             raise ValueError("unknown driver: %r" % driver)
         self.driver = driver
 
+        if extra_conf is not None and isinstance(extra_conf, dict):
+            conf.extra_conf = {tobytes(k): tobytes(v)
+                               for k, v in extra_conf.items()}
+        self.extra_conf = extra_conf
+
         with nogil:
             check_status(CHadoopFileSystem.Connect(&conf, &self.client))
         self.is_open = True
```
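Note the guard in `_connect` above: `extra_conf` is forwarded only when it is a `dict`, and every key and value passes through `tobytes` on its way into the C++ `HdfsConnectionConfig`. A hypothetical pure-Python mirror of that conversion (`_encode_extra_conf` and `as_bytes` are illustrative names, not part of pyarrow):

```python
def _encode_extra_conf(extra_conf):
    # Mirrors the Cython path: None or a non-dict extra_conf is
    # silently skipped; str keys/values are encoded to bytes,
    # as tobytes does, before reaching HdfsConnectionConfig.
    if extra_conf is None or not isinstance(extra_conf, dict):
        return None

    def as_bytes(s):
        return s.encode('utf-8') if isinstance(s, str) else s

    return {as_bytes(k): as_bytes(v) for k, v in extra_conf.items()}
```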
