Skip to content

Commit b377440

Browse files
authored
[hive][clone] support clone file format including options (#5575)
1 parent 064db80 commit b377440

File tree

3 files changed

+101
-16
lines changed

3 files changed

+101
-16
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@
4343
import java.util.HashSet;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.Objects;
4647

48+
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
4749
import static org.apache.paimon.utils.Preconditions.checkNotNull;
4850
import static org.apache.paimon.utils.Preconditions.checkState;
4951

@@ -136,6 +138,13 @@ private void checkCompatible(Schema sourceSchema, FileStoreTable existedTable) {
136138
"Can not clone data to existed paimon table which bucket is not -1. Existed paimon table is "
137139
+ existedTable.name());
138140

141+
// check format
142+
checkState(
143+
Objects.equals(
144+
sourceSchema.options().get(FILE_FORMAT.key()),
145+
existedTable.coreOptions().formatType()),
146+
"source table format is not compatible with existed paimon table format.");
147+
139148
// check partition keys
140149
List<String> sourcePartitionFields = sourceSchema.partitionKeys();
141150
List<String> existedPartitionFields = existedSchema.partitionKeys();

paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveCloneUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,12 @@
4949
import java.util.HashMap;
5050
import java.util.List;
5151
import java.util.Map;
52+
import java.util.Objects;
5253
import java.util.function.Predicate;
5354
import java.util.stream.Collectors;
5455

56+
import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
57+
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
5558
import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
5659

5760
/** Utils for cloning Hive table to Paimon table. */
@@ -122,6 +125,20 @@ public static Schema hiveTableToPaimonSchema(HiveCatalog hiveCatalog, Identifier
122125
paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
123126
}
124127

128+
String format = parseFormat(hiveTable);
129+
paimonOptions.put(FILE_FORMAT.key(), format);
130+
Map<String, String> formatOptions = getIdentifierPrefixOptions(format, hiveTableOptions);
131+
Map<String, String> sdFormatOptions =
132+
getIdentifierPrefixOptions(
133+
format, hiveTable.getSd().getSerdeInfo().getParameters());
134+
formatOptions.putAll(sdFormatOptions);
135+
paimonOptions.putAll(formatOptions);
136+
137+
String compression = parseCompression(hiveTable, format, formatOptions);
138+
if (compression != null) {
139+
paimonOptions.put(FILE_COMPRESSION.key(), compression);
140+
}
141+
125142
Schema.Builder schemaBuilder =
126143
Schema.newBuilder()
127144
.comment(hiveTableOptions.get("comment"))
@@ -226,4 +243,40 @@ public static String parseFormat(Partition partition) {
226243
}
227244
return format;
228245
}
246+
247+
private static String parseCompression(StorageDescriptor storageDescriptor) {
248+
Map<String, String> serdeParams = storageDescriptor.getSerdeInfo().getParameters();
249+
if (serdeParams.containsKey("compression")) {
250+
return serdeParams.get("compression");
251+
}
252+
return null;
253+
}
254+
255+
private static String parseCompression(
256+
Table table, String format, Map<String, String> formatOptions) {
257+
String compression = null;
258+
if (Objects.equals(format, "avro")) {
259+
compression = formatOptions.getOrDefault("avro.codec", parseCompression(table.getSd()));
260+
} else if (Objects.equals(format, "parquet")) {
261+
compression =
262+
formatOptions.getOrDefault(
263+
"parquet.compression", parseCompression(table.getSd()));
264+
} else if (Objects.equals(format, "orc")) {
265+
compression =
266+
formatOptions.getOrDefault("orc.compress", parseCompression(table.getSd()));
267+
}
268+
return compression;
269+
}
270+
271+
public static Map<String, String> getIdentifierPrefixOptions(
272+
String formatIdentifier, Map<String, String> options) {
273+
Map<String, String> result = new HashMap<>();
274+
String prefix = formatIdentifier.toLowerCase() + ".";
275+
for (String key : options.keySet()) {
276+
if (key.toLowerCase().startsWith(prefix)) {
277+
result.put(prefix + key.substring(prefix.length()), options.get(key));
278+
}
279+
}
280+
return result;
281+
}
229282
}

paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.ArrayList;
4444
import java.util.Arrays;
4545
import java.util.List;
46+
import java.util.Objects;
4647
import java.util.Random;
4748
import java.util.concurrent.ThreadLocalRandom;
4849

@@ -407,7 +408,7 @@ public void testCloneWithExistedTable() throws Exception {
407408
tEnv.executeSql("CREATE DATABASE test");
408409
// create a paimon table with the same name
409410
int ddlIndex = ThreadLocalRandom.current().nextInt(0, 5);
410-
tEnv.executeSql(ddls()[ddlIndex]);
411+
tEnv.executeSql(ddls(format)[ddlIndex]);
411412

412413
List<String> args =
413414
new ArrayList<>(
@@ -428,7 +429,7 @@ public void testCloneWithExistedTable() throws Exception {
428429
"--target_catalog_conf",
429430
"warehouse=" + warehouse));
430431

431-
if (ddlIndex < 3) {
432+
if (ddlIndex < 4) {
432433
assertThatThrownBy(() -> createAction(CloneAction.class, args).run())
433434
.rootCause()
434435
.hasMessageContaining(exceptionMsg()[ddlIndex]);
@@ -499,31 +500,48 @@ public void testCloneWithNotExistedDatabase() throws Exception {
499500
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
500501
}
501502

502-
private String[] ddls() {
503+
private String[] ddls(String format) {
503504
// has primary key
504505
String ddl0 =
505506
"CREATE TABLE test.test_table (id string, id2 int, id3 int, PRIMARY KEY (id, id2, id3) NOT ENFORCED) "
506-
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
507+
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
508+
+ format
509+
+ "');";
507510
// has different partition keys
508511
String ddl1 =
509512
"CREATE TABLE test.test_table (id string, id2 int, id3 int) "
510-
+ "PARTITIONED BY (id, id3) with ('bucket' = '-1');";
513+
+ "PARTITIONED BY (id, id3) with ('bucket' = '-1', 'file.format' = '"
514+
+ format
515+
+ "');";
511516
// size of fields is different
512517
String ddl2 =
513518
"CREATE TABLE test.test_table (id2 int, id3 int) "
514-
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
515-
// normal
519+
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
520+
+ format
521+
+ "');";
522+
523+
// different format
516524
String ddl3 =
525+
"CREATE TABLE test.test_table (id2 int, id3 int) "
526+
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
527+
+ randomFormat(format)
528+
+ "');";
529+
530+
// normal
531+
String ddl4 =
517532
"CREATE TABLE test.test_table (id string, id2 int, id3 int) "
518-
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1');";
519-
return new String[] {ddl0, ddl1, ddl2, ddl3};
533+
+ "PARTITIONED BY (id2, id3) with ('bucket' = '-1', 'file.format' = '"
534+
+ format
535+
+ "');";
536+
return new String[] {ddl0, ddl1, ddl2, ddl3, ddl4};
520537
}
521538

522539
private String[] exceptionMsg() {
523540
return new String[] {
524541
"Can not clone data to existed paimon table which has primary keys",
525542
"source table partition keys is not compatible with existed paimon table partition keys.",
526-
"source table partition keys is not compatible with existed paimon table partition keys."
543+
"source table partition keys is not compatible with existed paimon table partition keys.",
544+
"source table format is not compatible with existed paimon table format."
527545
};
528546
}
529547

@@ -549,13 +567,18 @@ private static String data(int i) {
549567
private String randomFormat() {
550568
ThreadLocalRandom random = ThreadLocalRandom.current();
551569
int i = random.nextInt(3);
552-
if (i == 0) {
553-
return "orc";
554-
} else if (i == 1) {
555-
return "parquet";
556-
} else {
557-
return "avro";
570+
String[] formats = new String[] {"orc", "parquet", "avro"};
571+
return formats[i];
572+
}
573+
574+
private String randomFormat(String excludedFormat) {
575+
ThreadLocalRandom random = ThreadLocalRandom.current();
576+
int i = random.nextInt(3);
577+
String[] formats = new String[] {"orc", "parquet", "avro"};
578+
if (Objects.equals(excludedFormat, formats[i])) {
579+
return formats[(i + 1) % 3];
558580
}
581+
return formats[i];
559582
}
560583

561584
protected FileStoreTable paimonTable(

0 commit comments

Comments
 (0)