Skip to content

Commit a1c0948

Browse files
author
yuzelin
committed
T
1 parent 31abdd2 commit a1c0948

File tree

2 files changed

+70
-32
lines changed

2 files changed

+70
-32
lines changed

paimon_python_java/gateway_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def _get_hadoop_classpath(env):
103103
return env[constants.PYPAIMON_HADOOP_CLASSPATH]
104104

105105
if 'HADOOP_CLASSPATH' in env:
106-
return None
106+
return env['HADOOP_CLASSPATH']
107107
else:
108108
raise EnvironmentError(f"You haven't set '{constants.PYPAIMON_HADOOP_CLASSPATH}', \
109109
and 'HADOOP_CLASSPATH' is also not set. Ensure one of them is set.")

paimon_python_java/tests/test_preicates.py

Lines changed: 69 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -54,44 +54,52 @@ def setUpClass(cls):
5454

5555
catalog = Catalog.create({'warehouse': cls.warehouse})
5656
catalog.create_database('default', False)
57-
58-
pa_schema = pa.schema([
59-
('f0', pa.int64()),
60-
('f1', pa.string()),
61-
])
62-
catalog.create_table('default.test_append',
63-
Schema(pa_schema, options={'file.format': _random_format()}),
64-
False)
65-
catalog.create_table('default.test_pk',
66-
Schema(pa_schema, primary_keys=['f0'],
67-
options={'bucket': '1', 'file.format': _random_format()}),
68-
False)
57+
cls.catalog = catalog
6958

7059
df = pd.DataFrame({
7160
'f0': [1, 2, 3, 4, 5],
7261
'f1': ['abc', 'abbc', 'bc', 'd', None],
7362
})
63+
cls.df = df
7464

75-
append_table = catalog.get_table('default.test_append')
76-
write_builder = append_table.new_batch_write_builder()
77-
write = write_builder.new_write()
78-
commit = write_builder.new_commit()
79-
write.write_pandas(df)
80-
commit.commit(write.prepare_commit())
81-
write.close()
82-
commit.close()
83-
84-
pk_table = catalog.get_table('default.test_pk')
85-
write_builder = pk_table.new_batch_write_builder()
86-
write = write_builder.new_write()
87-
commit = write_builder.new_commit()
88-
write.write_pandas(df)
89-
commit.commit(write.prepare_commit())
90-
write.close()
91-
commit.close()
65+
def _init(self):
66+
pa_schema = pa.schema([
67+
('f0', pa.int64()),
68+
('f1', pa.string()),
69+
])
9270

93-
cls.catalog = catalog
94-
cls.df = df
71+
try:
72+
self.catalog.get_table('default.test_append')
73+
except Exception:
74+
self.catalog.create_table(
75+
'default.test_append',
76+
Schema(pa_schema, options={'file.format': _random_format()}),
77+
True)
78+
append_table = self.catalog.get_table('default.test_append')
79+
write_builder = append_table.new_batch_write_builder()
80+
write = write_builder.new_write()
81+
commit = write_builder.new_commit()
82+
write.write_pandas(self.df)
83+
commit.commit(write.prepare_commit())
84+
write.close()
85+
commit.close()
86+
87+
try:
88+
self.catalog.get_table('default.test_pk')
89+
except Exception:
90+
self.catalog.create_table(
91+
'default.test_pk',
92+
Schema(pa_schema, primary_keys=['f0'],
93+
options={'bucket': '1', 'file.format': _random_format()}),
94+
False)
95+
pk_table = self.catalog.get_table('default.test_pk')
96+
write_builder = pk_table.new_batch_write_builder()
97+
write = write_builder.new_write()
98+
commit = write_builder.new_commit()
99+
write.write_pandas(self.df)
100+
commit.commit(write.prepare_commit())
101+
write.close()
102+
commit.close()
95103

96104
@classmethod
97105
def tearDownClass(cls):
@@ -102,6 +110,7 @@ def tearDownClass(cls):
102110
shutil.rmtree(cls.warehouse)
103111

104112
def testWrongFieldName(self):
113+
self._init()
105114
table = self.catalog.get_table('default.test_append')
106115
predicate_builder = table.new_read_builder().new_predicate_builder()
107116
with self.assertRaises(ValueError) as e:
@@ -215,168 +224,196 @@ def testAllFieldTypesWithEqual(self):
215224
_check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
216225

217226
def testEqualPk(self):
227+
self._init()
218228
table = self.catalog.get_table('default.test_pk')
219229
predicate_builder = table.new_read_builder().new_predicate_builder()
220230
predicate = predicate_builder.equal('f0', 1)
221231
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0]])
222232

223233
def testNotEqualAppend(self):
234+
self._init()
224235
table = self.catalog.get_table('default.test_append')
225236
predicate_builder = table.new_read_builder().new_predicate_builder()
226237
predicate = predicate_builder.not_equal('f0', 1)
227238
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
228239

229240
def testNotEqualPk(self):
241+
self._init()
230242
table = self.catalog.get_table('default.test_pk')
231243
predicate_builder = table.new_read_builder().new_predicate_builder()
232244
predicate = predicate_builder.not_equal('f0', 1)
233245
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
234246

235247
def testLessThanAppend(self):
248+
self._init()
236249
table = self.catalog.get_table('default.test_append')
237250
predicate_builder = table.new_read_builder().new_predicate_builder()
238251
predicate = predicate_builder.less_than('f0', 3)
239252
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
240253

241254
def testLessThanPk(self):
255+
self._init()
242256
table = self.catalog.get_table('default.test_pk')
243257
predicate_builder = table.new_read_builder().new_predicate_builder()
244258
predicate = predicate_builder.less_than('f0', 3)
245259
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
246260

247261
def testLessOrEqualAppend(self):
262+
self._init()
248263
table = self.catalog.get_table('default.test_append')
249264
predicate_builder = table.new_read_builder().new_predicate_builder()
250265
predicate = predicate_builder.less_or_equal('f0', 3)
251266
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
252267

253268
def testLessOrEqualPk(self):
269+
self._init()
254270
table = self.catalog.get_table('default.test_pk')
255271
predicate_builder = table.new_read_builder().new_predicate_builder()
256272
predicate = predicate_builder.less_or_equal('f0', 3)
257273
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
258274

259275
def testGreaterThanAppend(self):
276+
self._init()
260277
table = self.catalog.get_table('default.test_append')
261278
predicate_builder = table.new_read_builder().new_predicate_builder()
262279
predicate = predicate_builder.greater_than('f0', 3)
263280
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
264281

265282
def testGreaterThanPk(self):
283+
self._init()
266284
table = self.catalog.get_table('default.test_pk')
267285
predicate_builder = table.new_read_builder().new_predicate_builder()
268286
predicate = predicate_builder.greater_than('f0', 3)
269287
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
270288

271289
def testGreaterOrEqualAppend(self):
290+
self._init()
272291
table = self.catalog.get_table('default.test_append')
273292
predicate_builder = table.new_read_builder().new_predicate_builder()
274293
predicate = predicate_builder.greater_or_equal('f0', 3)
275294
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
276295

277296
def testGreaterOrEqualPk(self):
297+
self._init()
278298
table = self.catalog.get_table('default.test_pk')
279299
predicate_builder = table.new_read_builder().new_predicate_builder()
280300
predicate = predicate_builder.greater_or_equal('f0', 3)
281301
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
282302

283303
def testIsNullAppend(self):
304+
self._init()
284305
table = self.catalog.get_table('default.test_append')
285306
predicate_builder = table.new_read_builder().new_predicate_builder()
286307
predicate = predicate_builder.is_null('f1')
287308
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
288309

289310
def testIsNullPk(self):
311+
self._init()
290312
table = self.catalog.get_table('default.test_pk')
291313
predicate_builder = table.new_read_builder().new_predicate_builder()
292314
predicate = predicate_builder.is_null('f1')
293315
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
294316

295317
def testIsNotNullAppend(self):
318+
self._init()
296319
table = self.catalog.get_table('default.test_append')
297320
predicate_builder = table.new_read_builder().new_predicate_builder()
298321
predicate = predicate_builder.is_not_null('f1')
299322
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
300323

301324
def testIsNotNullPk(self):
325+
self._init()
302326
table = self.catalog.get_table('default.test_pk')
303327
predicate_builder = table.new_read_builder().new_predicate_builder()
304328
predicate = predicate_builder.is_not_null('f1')
305329
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
306330

307331
def testStartswithAppend(self):
332+
self._init()
308333
table = self.catalog.get_table('default.test_append')
309334
predicate_builder = table.new_read_builder().new_predicate_builder()
310335
predicate = predicate_builder.startswith('f1', 'ab')
311336
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
312337

313338
def testStartswithPk(self):
339+
self._init()
314340
table = self.catalog.get_table('default.test_pk')
315341
predicate_builder = table.new_read_builder().new_predicate_builder()
316342
predicate = predicate_builder.startswith('f1', 'ab')
317343
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
318344

319345
def testEndswithAppend(self):
346+
self._init()
320347
table = self.catalog.get_table('default.test_append')
321348
predicate_builder = table.new_read_builder().new_predicate_builder()
322349
predicate = predicate_builder.endswith('f1', 'bc')
323350
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
324351

325352
def testEndswithPk(self):
353+
self._init()
326354
table = self.catalog.get_table('default.test_pk')
327355
predicate_builder = table.new_read_builder().new_predicate_builder()
328356
predicate = predicate_builder.endswith('f1', 'bc')
329357
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
330358

331359
def testContainsAppend(self):
360+
self._init()
332361
table = self.catalog.get_table('default.test_append')
333362
predicate_builder = table.new_read_builder().new_predicate_builder()
334363
predicate = predicate_builder.contains('f1', 'bb')
335364
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
336365

337366
def testContainsPk(self):
367+
self._init()
338368
table = self.catalog.get_table('default.test_pk')
339369
predicate_builder = table.new_read_builder().new_predicate_builder()
340370
predicate = predicate_builder.contains('f1', 'bb')
341371
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
342372

343373
def testIsInAppend(self):
374+
self._init()
344375
table = self.catalog.get_table('default.test_append')
345376
predicate_builder = table.new_read_builder().new_predicate_builder()
346377
predicate = predicate_builder.is_in('f0', [1, 2])
347378
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
348379

349380
def testIsInPk(self):
381+
self._init()
350382
table = self.catalog.get_table('default.test_pk')
351383
predicate_builder = table.new_read_builder().new_predicate_builder()
352384
predicate = predicate_builder.is_in('f1', ['abc', 'd'])
353385
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[0, 3]])
354386

355387
def testIsNotInAppend(self):
388+
self._init()
356389
table = self.catalog.get_table('default.test_append')
357390
predicate_builder = table.new_read_builder().new_predicate_builder()
358391
predicate = predicate_builder.is_not_in('f0', [1, 2])
359392
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
360393

361394
def testIsNotInPk(self):
395+
self._init()
362396
table = self.catalog.get_table('default.test_pk')
363397
predicate_builder = table.new_read_builder().new_predicate_builder()
364398
predicate = predicate_builder.is_not_in('f1', ['abc', 'd'])
365399
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2])
366400

367401
def testBetweenAppend(self):
402+
self._init()
368403
table = self.catalog.get_table('default.test_append')
369404
predicate_builder = table.new_read_builder().new_predicate_builder()
370405
predicate = predicate_builder.between('f0', 1, 3)
371406
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
372407

373408
def testBetweenPk(self):
409+
self._init()
374410
table = self.catalog.get_table('default.test_pk')
375411
predicate_builder = table.new_read_builder().new_predicate_builder()
376412
predicate = predicate_builder.between('f0', 1, 3)
377413
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
378414

379415
def testAndPredicates(self):
416+
self._init()
380417
table = self.catalog.get_table('default.test_append')
381418
predicate_builder = table.new_read_builder().new_predicate_builder()
382419
predicate1 = predicate_builder.greater_than('f0', 1)
@@ -385,6 +422,7 @@ def testAndPredicates(self):
385422
_check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[1]])
386423

387424
def testOrPredicates(self):
425+
self._init()
388426
table = self.catalog.get_table('default.test_append')
389427
predicate_builder = table.new_read_builder().new_predicate_builder()
390428
predicate1 = predicate_builder.greater_than('f0', 3)

0 commit comments

Comments
 (0)