[flink][hive] Introduce procedure to migrate table from hive to paimon (
leaves12138 authored Nov 28, 2023
1 parent 34a619f commit c7a17b2
Showing 26 changed files with 1,476 additions and 53 deletions.
106 changes: 106 additions & 0 deletions docs/content/migration/migration-from-hive.md
@@ -0,0 +1,106 @@
---
title: "Migration From Hive"
weight: 1
type: docs
aliases:
- /migration/migration-from-hive.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Hive Table Migration

Apache Hive tables stored in the ORC or Parquet file format can be migrated to Paimon.
Migration is destructive: once a table has been migrated to Paimon, the original Hive table no longer exists. Back up your data if you
still need the original table. The migrated table will be an [unaware-bucket append-only table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}).

With the Paimon Hive catalog, you can use the Migrate Table Procedure and the Migrate File Procedure to fully migrate a table from Hive to Paimon.

* Migrate Table Procedure: use this when the Paimon table does not exist yet. The procedure upgrades the Hive table to a Paimon table. The Hive table disappears once the action is done.
* Migrate File Procedure: use this when the Paimon table already exists. The procedure migrates the files of the Hive table into the Paimon table. **Note that the Hive table also disappears once the action is done.**

Both procedures currently support only the Hive file formats "orc" and "parquet". If any partition of your table uses another format, such as avro, the procedures will fail.
Avro support is planned for the future. For now, make sure every partition of your table is stored as "orc" or "parquet".

<span style="color: red; "> **We highly recommend backing up your Hive table data before migrating, because the migration is not atomic. If it is interrupted, you may lose data.** </span>

## Example for Migration

**Migrate Hive Table**

Command: <br>

***CALL <font color="green">sys.migrate_table</font>(&#39;hive&#39;, &#39;&lt;hive_database&gt;.&lt;hive_tablename&gt;&#39;, &#39;&lt;paimon_tableconf&gt;&#39;);***

**Example**

```sql
CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');

USE CATALOG PAIMON;

CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc');
```
After the call, "hivetable" is fully converted to the Paimon format. Reading or writing the table the old "Hive way" will fail.
You can set table properties during migration with sys.migrate_table('hive', '<database>.<tablename>', '<tableproperties>').
Separate multiple entries in <tableproperties> with ",". For example:

```sql
CALL sys.migrate_table('hive', 'my_db.wait_to_upgrade', 'file.format=orc,read.batch-size=2096,write-only=true');
```

If your Flink version is below 1.17, you can use a Flink action to achieve the same result:
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
migrate_table \
--warehouse <warehouse-path> \
--source-table-type hive \
--source-table-id <database.table-name> \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
```
Example:
```bash
<FLINK_HOME>/bin/flink run ./paimon-flink-action-0.7-SNAPSHOT.jar migrate_table \
--warehouse /path/to/warehouse \
--catalog-conf uri=thrift://localhost:9083 \
--catalog-conf metastore=hive \
--source-table-type hive \
--source-table-id default.hive_or_paimon
```

**Migrate Hive File**

Command: <br>

***CALL <font color="green">sys.migrate_file</font>(&#39;hive&#39;, &#39;&lt;hive_database&gt;.&lt;hive_table_name&gt;&#39;, &#39;&lt;paimon_database&gt;.&lt;paimon_tablename&gt;&#39;);***

**Example**

```sql
CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
USE CATALOG PAIMON;
CALL sys.migrate_file('hive', 'default.hivetable', 'default.paimontable');
```
After the call, "hivetable" disappears, and all of its files are moved into the Paimon table directory and renamed. "paimontable" must have the same
partition keys as "hivetable", and "paimontable" must be in unaware-bucket mode.
Original file line number Diff line number Diff line change
@@ -144,9 +144,9 @@ static void write(
     }

     /**
-     * Creates an accessor for setting the elements of an array writer during runtime.
+     * Creates an accessor for setting the elements of a binary writer during runtime.
      *
-     * @param elementType the element type of the array
+     * @param elementType the element type
      */
     static ValueSetter createValueSetter(DataType elementType) {
         // ordered by type root definition
@@ -208,8 +208,8 @@ static ValueSetter createValueSetter(DataType elementType) {
         }
     }

-    /** Accessor for setting the elements of an array writer during runtime. */
+    /** Accessor for setting the elements of a binary writer during runtime. */
     interface ValueSetter extends Serializable {
-        void setValue(BinaryArrayWriter writer, int pos, Object value);
+        void setValue(BinaryWriter writer, int pos, Object value);
     }
 }
Original file line number Diff line number Diff line change
@@ -20,11 +20,28 @@

 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.Pair;

 import java.io.IOException;

 /** Extracts statistics directly from file. */
 public interface TableStatsExtractor {

     FieldStats[] extract(FileIO fileIO, Path path) throws IOException;
+
+    Pair<FieldStats[], FileInfo> extractWithFileInfo(FileIO fileIO, Path path) throws IOException;
+
+    /** File info fetched from physical file. */
+    class FileInfo {
+
+        private long rowCount;
+
+        public FileInfo(long rowCount) {
+            this.rowCount = rowCount;
+        }
+
+        public long getRowCount() {
+            return rowCount;
+        }
+    }
 }
Original file line number Diff line number Diff line change
@@ -36,8 +36,10 @@ public static List<Map<String, String>> getPartitions(String... partitionStrings

     public static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
         Map<String, String> kvs = new HashMap<>();
-        for (String kvString : keyValues.split(",")) {
-            parseKeyValueString(kvs, kvString);
+        if (!StringUtils.isBlank(keyValues)) {
+            for (String kvString : keyValues.split(",")) {
+                parseKeyValueString(kvs, kvString);
+            }
         }
         return kvs;
     }
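
The blank check in `parseCommaSeparatedKeyValues` matters because `"".split(",")` in Java returns a one-element array holding the empty string, so an unguarded loop would pass an empty entry to the per-entry parser. The following is a minimal, self-contained sketch of that idea. The class name `KeyValueParsing` and the inline `=` splitting are assumptions made for illustration; they stand in for Paimon's `StringUtils.isBlank` and `parseKeyValueString`, whose bodies are not shown in this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the patched helper; not Paimon's actual class. */
class KeyValueParsing {

    /** Parses "k1=v1,k2=v2" into a map; null or blank input yields an empty map. */
    static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
        Map<String, String> kvs = new LinkedHashMap<>();
        // Guard first: "".split(",") would otherwise produce a single empty entry.
        if (keyValues == null || keyValues.trim().isEmpty()) {
            return kvs;
        }
        for (String kvString : keyValues.split(",")) {
            // Split on the first '=' only, so a value may itself contain '='.
            String[] kv = kvString.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Invalid key-value string: " + kvString);
            }
            kvs.put(kv[0].trim(), kv[1].trim());
        }
        return kvs;
    }

    public static void main(String[] args) {
        System.out.println(parseCommaSeparatedKeyValues("file.format=orc,write-only=true"));
        System.out.println(parseCommaSeparatedKeyValues(""));
    }
}
```

With this guard, a call such as `sys.migrate_table('hive', 'default.hivetable', '')` could pass an empty properties string without tripping the parser, which appears to be the motivation for the change.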
Original file line number Diff line number Diff line change
@@ -350,6 +350,10 @@ public void copyTableDefaultOptions(Map<String, String> options) {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }

+    public FileIO fileIO() {
+        return fileIO;
+    }
+
     private String[] tableAndSystemName(Identifier identifier) {
         String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER);
         if (splits.length != 2) {
186 changes: 186 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.migrate;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryWriter;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.TableStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StatsCollectorFactories;
import org.apache.paimon.utils.TypeUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Utilities for constructing file metadata for external files. */
public class FileMetaUtils {

    public static List<DataFileMeta> construct(
            FileIO fileIO,
            String format,
            String location,
            Table paimonTable,
            Predicate<FileStatus> filter,
            Path dir,
            Map<Path, Path> rollback)
            throws IOException {
        List<FileStatus> fileStatuses =
                Arrays.stream(fileIO.listStatus(new Path(location)))
                        .filter(s -> !s.isDir())
                        .filter(filter)
                        .collect(Collectors.toList());

        return fileStatuses.stream()
                .map(
                        status ->
                                constructFileMeta(
                                        format, status, fileIO, paimonTable, dir, rollback))
                .collect(Collectors.toList());
    }

    public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> dataFileMetas) {
        return new CommitMessageImpl(
                partition,
                0,
                new NewFilesIncrement(dataFileMetas, Collections.emptyList()),
                new CompactIncrement(
                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
    }

    // -----------------------------private method---------------------------------------------

    private static DataFileMeta constructFileMeta(
            String format,
            FileStatus fileStatus,
            FileIO fileIO,
            Table table,
            Path dir,
            Map<Path, Path> rollback) {

        try {
            FieldStatsCollector.Factory[] factories =
                    StatsCollectorFactories.createStatsFactories(
                            ((AbstractFileStoreTable) table).coreOptions(),
                            table.rowType().getFieldNames());

            TableStatsExtractor tableStatsExtractor =
                    FileFormat.getFileFormat(
                                    ((AbstractFileStoreTable) table)
                                            .coreOptions()
                                            .toConfiguration(),
                                    format)
                            .createStatsExtractor(table.rowType(), factories)
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Can't get table stats extractor for format "
                                                            + format));
            Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
            return constructFileMeta(
                    newPath.getName(),
                    fileStatus.getLen(),
                    newPath,
                    tableStatsExtractor,
                    fileIO,
                    table);
        } catch (IOException e) {
            throw new RuntimeException("Error when constructing file meta", e);
        }
    }

    private static Path renameFile(
            FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback)
            throws IOException {
        // Append ".<format>" only when the file name does not already carry it.
        String suffix = "." + format;
        String fileName = originPath.getName();
        String newFileName = fileName.endsWith(suffix) ? fileName : fileName + suffix;
        Path newPath = new Path(newDir, newFileName);
        // Record newPath -> originPath so the caller can undo the rename.
        rollback.put(newPath, originPath);
        fileIO.rename(originPath, newPath);
        return newPath;
    }

    private static DataFileMeta constructFileMeta(
            String fileName,
            long fileSize,
            Path path,
            TableStatsExtractor tableStatsExtractor,
            FileIO fileIO,
            Table table)
            throws IOException {
        FieldStatsArraySerializer statsArraySerializer =
                new FieldStatsArraySerializer(table.rowType());

        Pair<FieldStats[], TableStatsExtractor.FileInfo> fileInfo =
                tableStatsExtractor.extractWithFileInfo(fileIO, path);
        BinaryTableStats stats = statsArraySerializer.toBinary(fileInfo.getLeft());

        return DataFileMeta.forAppend(
                fileName,
                fileSize,
                fileInfo.getRight().getRowCount(),
                stats,
                0,
                0,
                ((AbstractFileStoreTable) table).schema().id());
    }

    public static BinaryRow writePartitionValue(
            RowType partitionRowType,
            Map<String, String> partitionValues,
            List<BinaryWriter.ValueSetter> valueSetters) {

        BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);

        List<DataField> fields = partitionRowType.getFields();

        for (int i = 0; i < fields.size(); i++) {
            Object value =
                    TypeUtils.castFromString(
                            partitionValues.get(fields.get(i).name()), fields.get(i).type());
            valueSetters.get(i).setValue(binaryRowWriter, i, value);
        }
        binaryRowWriter.complete();
        return binaryRow;
    }
}
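
The rename step in `FileMetaUtils.renameFile` can be tried out on a local filesystem. The sketch below is a rough analogue that assumes `java.nio.file` in place of Paimon's `FileIO` and `Path`. The class `RenameSketch` and its methods `targetName` and `demo` are hypothetical names introduced here. It shows the two things the method does: append `.<format>` only when the suffix is missing, and record a `newPath -> originPath` entry before moving the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical local-filesystem analogue of renameFile; not Paimon code. */
class RenameSketch {

    /** Returns fileName unchanged if it already ends with ".<format>", else appends it. */
    static String targetName(String fileName, String format) {
        String suffix = "." + format;
        return fileName.endsWith(suffix) ? fileName : fileName + suffix;
    }

    /** Moves origin into newDir under its target name and records newPath -> origin. */
    static Path renameFile(Path origin, Path newDir, String format, Map<Path, Path> rollback)
            throws IOException {
        Path newPath = newDir.resolve(targetName(origin.getFileName().toString(), format));
        // Recorded before the move, mirroring the order used in the commit.
        rollback.put(newPath, origin);
        Files.move(origin, newPath);
        return newPath;
    }

    /** Runs one rename in temporary directories and describes the recorded mapping. */
    static String demo() {
        try {
            Path src = Files.createTempDirectory("hive");
            Path dst = Files.createTempDirectory("paimon");
            Path file = Files.createFile(src.resolve("000000_0"));
            Map<Path, Path> rollback = new LinkedHashMap<>();
            Path moved = renameFile(file, dst, "orc", rollback);
            return moved.getFileName() + " <- " + rollback.get(moved).getFileName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the entry is recorded before the move, a caller that walks the map after a failure should check that the new path exists before moving it back.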
25 changes: 25 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.migrate;

/** Interface for migrating tables from other data lakes such as Hive, Iceberg and Hudi. */
public interface Migrator {

    void executeMigrate() throws Exception;
}
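
The Hive implementation of `Migrator` is not part of this excerpt, so the following is only a sketch of how an implementation might combine `executeMigrate()` with a rollback map of the kind `FileMetaUtils` fills in. Everything here is an assumption for illustration: the nested `Migrator` is a local copy of the interface so the example compiles on its own, `MoveFilesMigrator` is a hypothetical class, and `java.nio.file` stands in for Paimon's `FileIO`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MigratorSketch {

    /** Local copy of the interface shape, so the sketch compiles standalone. */
    interface Migrator {
        void executeMigrate() throws Exception;
    }

    /** Hypothetical migrator: moves files one by one and undoes the moves if a step fails. */
    static class MoveFilesMigrator implements Migrator {
        private final List<Path> sources;
        private final Path targetDir;

        MoveFilesMigrator(List<Path> sources, Path targetDir) {
            this.sources = sources;
            this.targetDir = targetDir;
        }

        @Override
        public void executeMigrate() throws Exception {
            Map<Path, Path> rollback = new LinkedHashMap<>();
            try {
                for (Path source : sources) {
                    Path target = targetDir.resolve(source.getFileName());
                    Files.move(source, target);
                    rollback.put(target, source);
                }
            } catch (Exception e) {
                // Best-effort undo: move every file that was already migrated back.
                for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
                    if (Files.exists(entry.getKey())) {
                        Files.move(entry.getKey(), entry.getValue());
                    }
                }
                throw e;
            }
        }
    }

    /** Returns true if a migration that fails halfway leaves the first file in place. */
    static boolean failedMigrationRestoresFiles() {
        try {
            Path src = Files.createTempDirectory("hive");
            Path dst = Files.createTempDirectory("paimon");
            Path present = Files.createFile(src.resolve("part-0.orc"));
            Path missing = src.resolve("part-1.orc"); // never created, so its move fails
            Migrator migrator = new MoveFilesMigrator(Arrays.asList(present, missing), dst);
            try {
                migrator.executeMigrate();
                return false; // the migration was expected to fail
            } catch (Exception expected) {
                return Files.exists(present) && !Files.exists(dst.resolve("part-0.orc"));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failedMigrationRestoresFiles());
    }
}
```

A rollback like this only helps when the process survives long enough to run it. If the job is killed mid-migration the map is lost, which is consistent with the documentation's advice to back up the Hive table first.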