diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 7b4376b04ce4b..d81fa56491795 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -214,7 +214,6 @@ jobs:
strategy:
matrix:
include:
- - flinkProfile: "flink1.13"
- flinkProfile: "flink1.14"
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
@@ -302,15 +301,9 @@ jobs:
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.2'
sparkRuntime: 'spark3.2.3'
- - flinkProfile: 'flink1.13'
- sparkProfile: 'spark3.1'
- sparkRuntime: 'spark3.1.3'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.0'
sparkRuntime: 'spark3.0.2'
- - flinkProfile: 'flink1.13'
- sparkProfile: 'spark2.4'
- sparkRuntime: 'spark2.4.8'
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
@@ -377,15 +370,6 @@ jobs:
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.2'
sparkRuntime: 'spark3.2.3'
- - flinkProfile: 'flink1.13'
- sparkProfile: 'spark3.1'
- sparkRuntime: 'spark3.1.3'
- - flinkProfile: 'flink1.13'
- sparkProfile: 'spark'
- sparkRuntime: 'spark2.4.8'
- - flinkProfile: 'flink1.13'
- sparkProfile: 'spark2.4'
- sparkRuntime: 'spark2.4.8'
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
diff --git a/README.md b/README.md
index 20016f689ad33..aa3920bd131a3 100644
--- a/README.md
+++ b/README.md
@@ -131,8 +131,6 @@ Refer to the table below for building with different Flink and Scala versions.
| `-Dflink1.15` | hudi-flink1.15-bundle | For Flink 1.15 |
| `-Dflink1.14` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.12 |
| `-Dflink1.14 -Dscala-2.11` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.11 |
-| `-Dflink1.13` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.12 |
-| `-Dflink1.13 -Dscala-2.11` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.11 |
For example,
```
@@ -141,9 +139,6 @@ mvn clean package -DskipTests -Dflink1.15
# Build against Flink 1.14.x and Scala 2.11
mvn clean package -DskipTests -Dflink1.14 -Dscala-2.11
-
-# Build against Flink 1.13.x and Scala 2.12
-mvn clean package -DskipTests -Dflink1.13
```
## Running Tests
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 85d185fbc2c5c..21c6d932ef9c2 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -32,7 +32,6 @@ parameters:
- 'hudi-common'
- 'hudi-flink-datasource'
- 'hudi-flink-datasource/hudi-flink'
- - 'hudi-flink-datasource/hudi-flink1.13.x'
- 'hudi-flink-datasource/hudi-flink1.14.x'
- 'hudi-flink-datasource/hudi-flink1.15.x'
- 'hudi-flink-datasource/hudi-flink1.16.x'
@@ -65,7 +64,6 @@ parameters:
- '!hudi-examples/hudi-examples-spark'
- '!hudi-flink-datasource'
- '!hudi-flink-datasource/hudi-flink'
- - '!hudi-flink-datasource/hudi-flink1.13.x'
- '!hudi-flink-datasource/hudi-flink1.14.x'
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
@@ -89,7 +87,6 @@ parameters:
- '!hudi-examples/hudi-examples-spark'
- '!hudi-flink-datasource'
- '!hudi-flink-datasource/hudi-flink'
- - '!hudi-flink-datasource/hudi-flink1.13.x'
- '!hudi-flink-datasource/hudi-flink1.14.x'
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/pom.xml b/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
deleted file mode 100644
index 43cf20b379a42..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/pom.xml
+++ /dev/null
@@ -1,144 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>hudi-flink-datasource</artifactId>
-    <groupId>org.apache.hudi</groupId>
-    <version>1.0.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>hudi-flink1.13.x</artifactId>
-  <version>1.0.0-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <properties>
-    <main.basedir>${project.parent.parent.basedir}</main.basedir>
-  </properties>
-
-  <dependencies>
-    <!-- Logging -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <!-- Hoodie -->
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- Flink -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-parquet_${scala.binary.version}</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-json</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- Test dependencies -->
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
-      <version>${flink1.13.version}</version>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hudi</groupId>
-      <artifactId>hudi-tests-common</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.jacoco</groupId>
-        <artifactId>jacoco-maven-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-            <phase>test-compile</phase>
-          </execution>
-        </executions>
-        <configuration>
-          <skip>false</skip>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorAdapter.java
deleted file mode 100644
index 51c53f368fb9d..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorAdapter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-
-/**
- * Adapter clazz for {@code AbstractStreamOperator}.
- */
-public abstract class AbstractStreamOperatorAdapter<O> extends AbstractStreamOperator<O> {
- @Override
- public void close() throws Exception {
- super.dispose();
- }
-
- public void finish() throws Exception {
- super.close();
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorFactoryAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorFactoryAdapter.java
deleted file mode 100644
index 0ea0968f17585..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/AbstractStreamOperatorFactoryAdapter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
-import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Adapter clazz for {@link AbstractStreamOperatorFactory}.
- */
-public abstract class AbstractStreamOperatorFactoryAdapter<O>
- extends AbstractStreamOperatorFactory<O> implements YieldingOperatorFactory<O> {
- private transient MailboxExecutor mailboxExecutor;
-
- @Override
- public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
- this.mailboxExecutor = mailboxExecutor;
- }
-
- public MailboxExecutorAdapter getMailboxExecutorAdapter() {
- return new MailboxExecutorAdapter(getMailboxExecutor());
- }
-
- /**
- * Provides the mailbox executor iff this factory implements {@link YieldingOperatorFactory}.
- */
- protected MailboxExecutor getMailboxExecutor() {
- return checkNotNull(
- mailboxExecutor, "Factory does not implement %s", YieldingOperatorFactory.class);
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
deleted file mode 100644
index 867395c43f199..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamScanProviderAdapter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.table.connector.source.DataStreamScanProvider;
-
-/**
- * Adapter clazz for {@code DataStreamScanProvider}.
- */
-public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
deleted file mode 100644
index e8eaa3c62d441..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/DataStreamSinkProviderAdapter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
-
-/**
- * Adapter clazz for {@code DataStreamSinkProvider}.
- */
-public interface DataStreamSinkProviderAdapter extends DataStreamSinkProvider {
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
deleted file mode 100644
index 94ed3b5388797..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/HiveCatalogConstants.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
-import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
-import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
-
-/**
- * Constants for Hive Catalog.
- */
-public class HiveCatalogConstants {
-
- // -----------------------------------------------------------------------------------
- // Constants for ALTER DATABASE
- // -----------------------------------------------------------------------------------
- public static final String ALTER_DATABASE_OP = SqlAlterHiveDatabase.ALTER_DATABASE_OP;
-
- public static final String DATABASE_LOCATION_URI = SqlCreateHiveDatabase.DATABASE_LOCATION_URI;
-
- public static final String DATABASE_OWNER_NAME = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
-
- public static final String DATABASE_OWNER_TYPE = SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
-
- public static final String ROLE_OWNER = SqlAlterHiveDatabaseOwner.ROLE_OWNER;
-
- public static final String USER_OWNER = SqlAlterHiveDatabaseOwner.USER_OWNER;
-
- /** Type of ALTER DATABASE operation. */
- public enum AlterHiveDatabaseOp {
- CHANGE_PROPS,
- CHANGE_LOCATION,
- CHANGE_OWNER
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/MailboxExecutorAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/MailboxExecutorAdapter.java
deleted file mode 100644
index 9ae3ca6912f65..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/MailboxExecutorAdapter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.streaming.api.operators.MailboxExecutor;
-import org.apache.flink.util.function.ThrowingRunnable;
-
-/**
- * Adapter clazz for {@link MailboxExecutor}.
- */
-public class MailboxExecutorAdapter {
- private final MailboxExecutor executor;
-
- public MailboxExecutorAdapter(MailboxExecutor executor) {
- this.executor = executor;
- }
-
- public void execute(ThrowingRunnable<? extends Exception> command, String description) {
- this.executor.execute(command, description);
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/MaskingOutputAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/MaskingOutputAdapter.java
deleted file mode 100644
index ea0ba0419214b..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/MaskingOutputAdapter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
-
-/** Adapter class for {@code Output} to handle async compaction/clustering service thread safe issues */
-public class MaskingOutputAdapter<OUT> implements Output<StreamRecord<OUT>> {
-
- private final Output<StreamRecord<OUT>> output;
-
- public MaskingOutputAdapter(Output<StreamRecord<OUT>> output) {
- this.output = output;
- }
-
- @Override
- public void emitWatermark(Watermark watermark) {
- // For thread safe, not to propagate the watermark
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- // For thread safe, not to propagate latency marker
- }
-
- @Override
- public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
- this.output.collect(outputTag, streamRecord);
- }
-
- @Override
- public void collect(StreamRecord<OUT> outStreamRecord) {
- this.output.collect(outStreamRecord);
- }
-
- @Override
- public void close() {
- this.output.close();
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
deleted file mode 100644
index 887833c90e16b..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-
-/**
- * Adapter clazz for {@code OperatorCoordinator}.
- */
-public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/RateLimiterAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/RateLimiterAdapter.java
deleted file mode 100644
index 6d058de89bc55..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/RateLimiterAdapter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
-
-/**
- * Bridge class for shaded guava clazz {@code RateLimiter}.
- */
-public class RateLimiterAdapter {
- private final RateLimiter rateLimiter;
-
- private RateLimiterAdapter(double permitsPerSecond) {
- this.rateLimiter = RateLimiter.create(permitsPerSecond);
- }
-
- public static RateLimiterAdapter create(double permitsPerSecond) {
- return new RateLimiterAdapter(permitsPerSecond);
- }
-
- public void acquire() {
- this.rateLimiter.acquire();
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java
deleted file mode 100644
index a3ee8e6eed174..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
-import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
-import org.apache.flink.table.types.logical.RowType;
-
-/**
- * Adapter clazz for {@code SortCodeGenerator}.
- */
-public class SortCodeGeneratorAdapter extends SortCodeGenerator {
- public SortCodeGeneratorAdapter(TableConfig conf, RowType input, SortSpec sortSpec) {
- super(conf, input, sortSpec);
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
deleted file mode 100644
index cd5c4eb891b06..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelDeleteAdapter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-/**
- * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
- */
-public interface SupportsRowLevelDeleteAdapter {
-
- RowLevelDeleteInfoAdapter applyRowLevelDelete();
-
- /**
- * Adapter clazz for {@code SupportsRowLevelDelete.RowLevelDeleteInfo}.
- */
- interface RowLevelDeleteInfoAdapter {
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
deleted file mode 100644
index 6a62763ec5b7e..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SupportsRowLevelUpdateAdapter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.table.catalog.Column;
-
-import java.util.List;
-
-/**
- * Adapter clazz for {@code org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
- */
-public interface SupportsRowLevelUpdateAdapter {
-
- RowLevelUpdateInfoAdapter applyRowLevelUpdate(List<Column> updatedColumns);
-
- /**
- * Adapter clazz for {@code SupportsRowLevelUpdate.RowLevelUpdateInfo}.
- */
- interface RowLevelUpdateInfoAdapter {
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java
deleted file mode 100644
index 521fd50c8d8ac..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.adapter;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSourceContexts;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
-import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
-import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
-
-/**
- * Adapter utils.
- */
-public class Utils {
- public static SourceFunction.SourceContext<RowData> getSourceContext(
- TimeCharacteristic timeCharacteristic,
- ProcessingTimeService processingTimeService,
- StreamTask<?, ?> streamTask,
- Output<StreamRecord<RowData>> output,
- long watermarkInterval) {
- return StreamSourceContexts.getSourceContext(
- timeCharacteristic,
- processingTimeService,
- new Object(), // no actual locking needed
- streamTask.getStreamStatusMaintainer(),
- output,
- watermarkInterval,
- -1);
- }
-
- public static FactoryUtil.DefaultDynamicTableContext getTableContext(
- ObjectIdentifier tablePath,
- ResolvedCatalogTable catalogTable,
- ReadableConfig conf) {
- return new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
- conf, Thread.currentThread().getContextClassLoader(), false);
- }
-
- public static BinaryExternalSorter getBinaryExternalSorter(
- final Object owner,
- MemoryManager memoryManager,
- long reservedMemorySize,
- IOManager ioManager,
- AbstractRowDataSerializer<RowData> inputSerializer,
- BinaryRowDataSerializer serializer,
- NormalizedKeyComputer normalizedKeyComputer,
- RecordComparator comparator,
- Configuration conf) {
- return new BinaryExternalSorter(owner, memoryManager, reservedMemorySize,
- ioManager, inputSerializer, serializer, normalizedKeyComputer, comparator, conf);
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarArrayData.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarArrayData.java
deleted file mode 100644
index 20c63d26f7492..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarArrayData.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.data;
-
-import org.apache.hudi.table.data.vector.MapColumnVector;
-import org.apache.hudi.table.data.vector.RowColumnVector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.TypedSetters;
-import org.apache.flink.table.data.vector.ArrayColumnVector;
-import org.apache.flink.table.data.vector.BooleanColumnVector;
-import org.apache.flink.table.data.vector.ByteColumnVector;
-import org.apache.flink.table.data.vector.BytesColumnVector;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.DecimalColumnVector;
-import org.apache.flink.table.data.vector.DoubleColumnVector;
-import org.apache.flink.table.data.vector.FloatColumnVector;
-import org.apache.flink.table.data.vector.IntColumnVector;
-import org.apache.flink.table.data.vector.LongColumnVector;
-import org.apache.flink.table.data.vector.ShortColumnVector;
-import org.apache.flink.table.data.vector.TimestampColumnVector;
-
-import java.util.Arrays;
-
-/**
- * Columnar array to support access to vector column data.
- *
- *
- * <p>References {@code org.apache.flink.table.data.ColumnarArrayData} to include FLINK-15390.
- */
-public final class ColumnarArrayData implements ArrayData, TypedSetters {
-
- private final ColumnVector data;
- private final int offset;
- private final int numElements;
-
- public ColumnarArrayData(ColumnVector data, int offset, int numElements) {
- this.data = data;
- this.offset = offset;
- this.numElements = numElements;
- }
-
- @Override
- public int size() {
- return numElements;
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return data.isNullAt(offset + pos);
- }
-
- @Override
- public void setNullAt(int pos) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return ((BooleanColumnVector) data).getBoolean(offset + pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return ((ByteColumnVector) data).getByte(offset + pos);
- }
-
- @Override
- public short getShort(int pos) {
- return ((ShortColumnVector) data).getShort(offset + pos);
- }
-
- @Override
- public int getInt(int pos) {
- return ((IntColumnVector) data).getInt(offset + pos);
- }
-
- @Override
- public long getLong(int pos) {
- return ((LongColumnVector) data).getLong(offset + pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return ((FloatColumnVector) data).getFloat(offset + pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return ((DoubleColumnVector) data).getDouble(offset + pos);
- }
-
- @Override
- public StringData getString(int pos) {
- BytesColumnVector.Bytes byteArray = getByteArray(pos);
- return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
- }
-
- @Override
- public DecimalData getDecimal(int pos, int precision, int scale) {
- return ((DecimalColumnVector) data).getDecimal(offset + pos, precision, scale);
- }
-
- @Override
- public TimestampData getTimestamp(int pos, int precision) {
- return ((TimestampColumnVector) data).getTimestamp(offset + pos, precision);
- }
-
- @Override
- public <T> RawValueData<T> getRawValue(int pos) {
- throw new UnsupportedOperationException("RawValueData is not supported.");
- }
-
- @Override
- public byte[] getBinary(int pos) {
- BytesColumnVector.Bytes byteArray = getByteArray(pos);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- return Arrays.copyOfRange(byteArray.data, byteArray.offset, byteArray.len);
- }
- }
-
- @Override
- public ArrayData getArray(int pos) {
- return ((ArrayColumnVector) data).getArray(offset + pos);
- }
-
- @Override
- public MapData getMap(int pos) {
- return ((MapColumnVector) data).getMap(offset + pos);
- }
-
- @Override
- public RowData getRow(int pos, int numFields) {
- return ((RowColumnVector) data).getRow(offset + pos);
- }
-
- @Override
- public void setBoolean(int pos, boolean value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setByte(int pos, byte value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setShort(int pos, short value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setInt(int pos, int value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setLong(int pos, long value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setFloat(int pos, float value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDouble(int pos, double value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDecimal(int pos, DecimalData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setTimestamp(int pos, TimestampData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public boolean[] toBooleanArray() {
- boolean[] res = new boolean[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getBoolean(i);
- }
- return res;
- }
-
- @Override
- public byte[] toByteArray() {
- byte[] res = new byte[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getByte(i);
- }
- return res;
- }
-
- @Override
- public short[] toShortArray() {
- short[] res = new short[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getShort(i);
- }
- return res;
- }
-
- @Override
- public int[] toIntArray() {
- int[] res = new int[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getInt(i);
- }
- return res;
- }
-
- @Override
- public long[] toLongArray() {
- long[] res = new long[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getLong(i);
- }
- return res;
- }
-
- @Override
- public float[] toFloatArray() {
- float[] res = new float[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getFloat(i);
- }
- return res;
- }
-
- @Override
- public double[] toDoubleArray() {
- double[] res = new double[numElements];
- for (int i = 0; i < numElements; i++) {
- res[i] = getDouble(i);
- }
- return res;
- }
-
- private BytesColumnVector.Bytes getByteArray(int pos) {
- return ((BytesColumnVector) data).getBytes(offset + pos);
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarMapData.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarMapData.java
deleted file mode 100644
index bba462f404b35..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarMapData.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.data;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.vector.ColumnVector;
-
-/**
- * Columnar map to support access to vector column data.
- *
- * <p>Referenced from flink 1.14.0 {@code org.apache.flink.table.data.ColumnarMapData}.
- */
-public final class ColumnarMapData implements MapData {
-
- private final ColumnVector keyColumnVector;
- private final ColumnVector valueColumnVector;
- private final int offset;
- private final int numElements;
-
- public ColumnarMapData(
- ColumnVector keyColumnVector,
- ColumnVector valueColumnVector,
- int offset,
- int numElements) {
- this.keyColumnVector = keyColumnVector;
- this.valueColumnVector = valueColumnVector;
- this.offset = offset;
- this.numElements = numElements;
- }
-
- @Override
- public int size() {
- return numElements;
- }
-
- @Override
- public ArrayData keyArray() {
- return new ColumnarArrayData(keyColumnVector, offset, numElements);
- }
-
- @Override
- public ArrayData valueArray() {
- return new ColumnarArrayData(valueColumnVector, offset, numElements);
- }
-
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- "ColumnarMapData do not support equals, please compare fields one by one!");
- }
-
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- "ColumnarMapData do not support hashCode, please hash fields one by one!");
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarRowData.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarRowData.java
deleted file mode 100644
index 9a95035b27038..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/ColumnarRowData.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.data;
-
-import org.apache.hudi.table.data.vector.VectorizedColumnBatch;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.binary.TypedSetters;
-import org.apache.flink.table.data.vector.BytesColumnVector.Bytes;
-import org.apache.flink.types.RowKind;
-
-/**
- * Columnar row to support access to vector column data.
- * It is a row view in {@link VectorizedColumnBatch}.
- *
- * <p>References {@code org.apache.flink.table.data.ColumnarRowData} to include FLINK-15390.
- */
-public final class ColumnarRowData implements RowData, TypedSetters {
-
- private RowKind rowKind = RowKind.INSERT;
- private VectorizedColumnBatch vectorizedColumnBatch;
- private int rowId;
-
- public ColumnarRowData() {
- }
-
- public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch) {
- this(vectorizedColumnBatch, 0);
- }
-
- public ColumnarRowData(VectorizedColumnBatch vectorizedColumnBatch, int rowId) {
- this.vectorizedColumnBatch = vectorizedColumnBatch;
- this.rowId = rowId;
- }
-
- public void setVectorizedColumnBatch(VectorizedColumnBatch vectorizedColumnBatch) {
- this.vectorizedColumnBatch = vectorizedColumnBatch;
- this.rowId = 0;
- }
-
- public void setRowId(int rowId) {
- this.rowId = rowId;
- }
-
- @Override
- public RowKind getRowKind() {
- return rowKind;
- }
-
- @Override
- public void setRowKind(RowKind kind) {
- this.rowKind = kind;
- }
-
- @Override
- public int getArity() {
- return vectorizedColumnBatch.getArity();
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return vectorizedColumnBatch.isNullAt(rowId, pos);
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return vectorizedColumnBatch.getBoolean(rowId, pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return vectorizedColumnBatch.getByte(rowId, pos);
- }
-
- @Override
- public short getShort(int pos) {
- return vectorizedColumnBatch.getShort(rowId, pos);
- }
-
- @Override
- public int getInt(int pos) {
- return vectorizedColumnBatch.getInt(rowId, pos);
- }
-
- @Override
- public long getLong(int pos) {
- return vectorizedColumnBatch.getLong(rowId, pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return vectorizedColumnBatch.getFloat(rowId, pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return vectorizedColumnBatch.getDouble(rowId, pos);
- }
-
- @Override
- public StringData getString(int pos) {
- Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
- return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
- }
-
- @Override
- public DecimalData getDecimal(int pos, int precision, int scale) {
- return vectorizedColumnBatch.getDecimal(rowId, pos, precision, scale);
- }
-
- @Override
- public TimestampData getTimestamp(int pos, int precision) {
- return vectorizedColumnBatch.getTimestamp(rowId, pos, precision);
- }
-
- @Override
- public <T> RawValueData<T> getRawValue(int pos) {
- throw new UnsupportedOperationException("RawValueData is not supported.");
- }
-
- @Override
- public byte[] getBinary(int pos) {
- Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- byte[] ret = new byte[byteArray.len];
- System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len);
- return ret;
- }
- }
-
- @Override
- public RowData getRow(int pos, int numFields) {
- return vectorizedColumnBatch.getRow(rowId, pos);
- }
-
- @Override
- public ArrayData getArray(int pos) {
- return vectorizedColumnBatch.getArray(rowId, pos);
- }
-
- @Override
- public MapData getMap(int pos) {
- return vectorizedColumnBatch.getMap(rowId, pos);
- }
-
- @Override
- public void setNullAt(int pos) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setBoolean(int pos, boolean value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setByte(int pos, byte value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setShort(int pos, short value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setInt(int pos, int value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setLong(int pos, long value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setFloat(int pos, float value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDouble(int pos, double value) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setDecimal(int pos, DecimalData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public void setTimestamp(int pos, TimestampData value, int precision) {
- throw new UnsupportedOperationException("Not support the operation!");
- }
-
- @Override
- public boolean equals(Object o) {
- throw new UnsupportedOperationException(
- "ColumnarRowData do not support equals, please compare fields one by one!");
- }
-
- @Override
- public int hashCode() {
- throw new UnsupportedOperationException(
- "ColumnarRowData do not support hashCode, please hash fields one by one!");
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/MapColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/MapColumnVector.java
deleted file mode 100644
index 6bdf8782f4d3e..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/MapColumnVector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.data.vector;
-
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.vector.ColumnVector;
-
-/**
- * Map column vector.
- */
-public interface MapColumnVector extends ColumnVector {
- MapData getMap(int i);
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/RowColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/RowColumnVector.java
deleted file mode 100644
index bd0e9bbe7de72..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/RowColumnVector.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.data.vector;
-
-import org.apache.hudi.table.data.ColumnarRowData;
-
-import org.apache.flink.table.data.vector.ColumnVector;
-
-/**
- * Row column vector.
- */
-public interface RowColumnVector extends ColumnVector {
- ColumnarRowData getRow(int i);
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/VectorizedColumnBatch.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/VectorizedColumnBatch.java
deleted file mode 100644
index bccaec8fdcadf..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/data/vector/VectorizedColumnBatch.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.data.vector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.vector.ArrayColumnVector;
-import org.apache.flink.table.data.vector.BooleanColumnVector;
-import org.apache.flink.table.data.vector.ByteColumnVector;
-import org.apache.flink.table.data.vector.BytesColumnVector;
-import org.apache.flink.table.data.vector.BytesColumnVector.Bytes;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.DecimalColumnVector;
-import org.apache.flink.table.data.vector.DoubleColumnVector;
-import org.apache.flink.table.data.vector.FloatColumnVector;
-import org.apache.flink.table.data.vector.IntColumnVector;
-import org.apache.flink.table.data.vector.LongColumnVector;
-import org.apache.flink.table.data.vector.ShortColumnVector;
-import org.apache.flink.table.data.vector.TimestampColumnVector;
-
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the unit
- * of query execution, organized to minimize the cost per row.
- *
- * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive VectorizedRowBatch.
- *
- * <p>References {@code org.apache.flink.table.data.vector.VectorizedColumnBatch} to include FLINK-15390.
- */
-public class VectorizedColumnBatch implements Serializable {
- private static final long serialVersionUID = 8180323238728166155L;
-
- /**
- * This number is carefully chosen to minimize overhead and typically allows one
- * VectorizedColumnBatch to fit in cache.
- */
- public static final int DEFAULT_SIZE = 2048;
-
- private int numRows;
- public final ColumnVector[] columns;
-
- public VectorizedColumnBatch(ColumnVector[] vectors) {
- this.columns = vectors;
- }
-
- public void setNumRows(int numRows) {
- this.numRows = numRows;
- }
-
- public int getNumRows() {
- return numRows;
- }
-
- public int getArity() {
- return columns.length;
- }
-
- public boolean isNullAt(int rowId, int colId) {
- return columns[colId].isNullAt(rowId);
- }
-
- public boolean getBoolean(int rowId, int colId) {
- return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
- }
-
- public byte getByte(int rowId, int colId) {
- return ((ByteColumnVector) columns[colId]).getByte(rowId);
- }
-
- public short getShort(int rowId, int colId) {
- return ((ShortColumnVector) columns[colId]).getShort(rowId);
- }
-
- public int getInt(int rowId, int colId) {
- return ((IntColumnVector) columns[colId]).getInt(rowId);
- }
-
- public long getLong(int rowId, int colId) {
- return ((LongColumnVector) columns[colId]).getLong(rowId);
- }
-
- public float getFloat(int rowId, int colId) {
- return ((FloatColumnVector) columns[colId]).getFloat(rowId);
- }
-
- public double getDouble(int rowId, int colId) {
- return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
- }
-
- public Bytes getByteArray(int rowId, int colId) {
- return ((BytesColumnVector) columns[colId]).getBytes(rowId);
- }
-
- private byte[] getBytes(int rowId, int colId) {
- Bytes byteArray = getByteArray(rowId, colId);
- if (byteArray.len == byteArray.data.length) {
- return byteArray.data;
- } else {
- return byteArray.getBytes();
- }
- }
-
- public String getString(int rowId, int colId) {
- Bytes byteArray = getByteArray(rowId, colId);
- return new String(byteArray.data, byteArray.offset, byteArray.len, StandardCharsets.UTF_8);
- }
-
- public DecimalData getDecimal(int rowId, int colId, int precision, int scale) {
- return ((DecimalColumnVector) (columns[colId])).getDecimal(rowId, precision, scale);
- }
-
- public TimestampData getTimestamp(int rowId, int colId, int precision) {
- return ((TimestampColumnVector) (columns[colId])).getTimestamp(rowId, precision);
- }
-
- public ArrayData getArray(int rowId, int colId) {
- return ((ArrayColumnVector) columns[colId]).getArray(rowId);
- }
-
- public RowData getRow(int rowId, int colId) {
- return ((RowColumnVector) columns[colId]).getRow(rowId);
- }
-
- public MapData getMap(int rowId, int colId) {
- return ((MapColumnVector) columns[colId]).getMap(rowId);
- }
-}
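
For reviewers unfamiliar with the columnar read path being removed, here is a minimal illustrative sketch of how a batch from the deleted 1.13.x `VectorizedColumnBatch` above could be consumed. The two-column layout (INT at index 0, VARCHAR at index 1) is an assumption for the example, not something taken from Hudi code:

```java
// Illustrative only: consuming a populated batch from the removed 1.13.x reader.
// Assumes column 0 is INT and column 1 is VARCHAR (hypothetical layout).
static void printBatch(VectorizedColumnBatch batch) {
  for (int row = 0; row < batch.getNumRows(); row++) {
    int id = batch.isNullAt(row, 0) ? -1 : batch.getInt(row, 0);
    String name = batch.isNullAt(row, 1) ? null : batch.getString(row, 1);
    System.out.println(id + " -> " + name);
  }
}
```

Per-row access stays cheap because each getter is a single lookup into the corresponding column vector.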
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
deleted file mode 100644
index ac9ca59d574d0..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow;
-
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.data.vector.VectorizedColumnBatch;
-import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
-import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
-import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
-import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
-import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.ByteColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.BytesColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.DoubleColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.FloatColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
-import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
-import org.apache.flink.table.data.vector.heap.HeapByteVector;
-import org.apache.flink.table.data.vector.heap.HeapBytesVector;
-import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
-import org.apache.flink.table.data.vector.heap.HeapFloatVector;
-import org.apache.flink.table.data.vector.heap.HeapIntVector;
-import org.apache.flink.table.data.vector.heap.HeapLongVector;
-import org.apache.flink.table.data.vector.heap.HeapShortVector;
-import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
-import org.apache.flink.table.data.vector.writable.WritableColumnVector;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.MapType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.ParquetRuntimeException;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.filter.UnboundRecordFilter;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.InvalidSchemaException;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static org.apache.parquet.Preconditions.checkArgument;
-
-/**
- * Util for generating {@link ParquetColumnarRowSplitReader}.
- *
- * <p>NOTE: reference from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modify to support INT64
- * based TIMESTAMP_MILLIS as ConvertedType, should remove when Flink supports that.
- */
-public class ParquetSplitReaderUtil {
-
- /**
- * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
- */
- public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
- boolean utcTimestamp,
- boolean caseSensitive,
- Configuration conf,
- String[] fullFieldNames,
- DataType[] fullFieldTypes,
-      Map<String, Object> partitionSpec,
- int[] selectedFields,
- int batchSize,
- Path path,
- long splitStart,
- long splitLength,
- FilterPredicate filterPredicate,
- UnboundRecordFilter recordFilter) throws IOException {
-    List<String> selNonPartNames = Arrays.stream(selectedFields)
- .mapToObj(i -> fullFieldNames[i])
- .filter(n -> !partitionSpec.containsKey(n))
- .collect(Collectors.toList());
-
- int[] selParquetFields = Arrays.stream(selectedFields)
- .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
- .toArray();
-
- ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
- // create and initialize the row batch
- ColumnVector[] vectors = new ColumnVector[selectedFields.length];
- for (int i = 0; i < vectors.length; i++) {
- String name = fullFieldNames[selectedFields[i]];
- LogicalType type = fullFieldTypes[selectedFields[i]].getLogicalType();
- vectors[i] = createVector(readVectors, selNonPartNames, name, type, partitionSpec, batchSize);
- }
- return new VectorizedColumnBatch(vectors);
- };
-
- return new ParquetColumnarRowSplitReader(
- utcTimestamp,
- caseSensitive,
- conf,
- Arrays.stream(selParquetFields)
- .mapToObj(i -> fullFieldTypes[i].getLogicalType())
- .toArray(LogicalType[]::new),
- selNonPartNames.toArray(new String[0]),
- gen,
- batchSize,
- new org.apache.hadoop.fs.Path(path.toUri()),
- splitStart,
- splitLength,
- filterPredicate,
- recordFilter);
- }
-
- private static ColumnVector createVector(
- ColumnVector[] readVectors,
-      List<String> selNonPartNames,
- String name,
- LogicalType type,
-      Map<String, Object> partitionSpec,
- int batchSize) {
- if (partitionSpec.containsKey(name)) {
- return createVectorFromConstant(type, partitionSpec.get(name), batchSize);
- }
- ColumnVector readVector = readVectors[selNonPartNames.indexOf(name)];
- if (readVector == null) {
- // when the read vector is null, use a constant null vector instead
- readVector = createVectorFromConstant(type, null, batchSize);
- }
- return readVector;
- }
-
- private static ColumnVector createVectorFromConstant(
- LogicalType type,
- Object value,
- int batchSize) {
- switch (type.getTypeRoot()) {
- case CHAR:
- case VARCHAR:
- case BINARY:
- case VARBINARY:
- HeapBytesVector bsv = new HeapBytesVector(batchSize);
- if (value == null) {
- bsv.fillWithNulls();
- } else {
- bsv.fill(value instanceof byte[]
- ? (byte[]) value
- : getUTF8Bytes(value.toString()));
- }
- return bsv;
- case BOOLEAN:
- HeapBooleanVector bv = new HeapBooleanVector(batchSize);
- if (value == null) {
- bv.fillWithNulls();
- } else {
- bv.fill((boolean) value);
- }
- return bv;
- case TINYINT:
- HeapByteVector byteVector = new HeapByteVector(batchSize);
- if (value == null) {
- byteVector.fillWithNulls();
- } else {
- byteVector.fill(((Number) value).byteValue());
- }
- return byteVector;
- case SMALLINT:
- HeapShortVector sv = new HeapShortVector(batchSize);
- if (value == null) {
- sv.fillWithNulls();
- } else {
- sv.fill(((Number) value).shortValue());
- }
- return sv;
- case INTEGER:
- HeapIntVector iv = new HeapIntVector(batchSize);
- if (value == null) {
- iv.fillWithNulls();
- } else {
- iv.fill(((Number) value).intValue());
- }
- return iv;
- case BIGINT:
- HeapLongVector lv = new HeapLongVector(batchSize);
- if (value == null) {
- lv.fillWithNulls();
- } else {
- lv.fill(((Number) value).longValue());
- }
- return lv;
- case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- DecimalData decimal = value == null
- ? null
- : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
- ColumnVector internalVector = createVectorFromConstant(
- new VarBinaryType(),
- decimal == null ? null : decimal.toUnscaledBytes(),
- batchSize);
- return new ParquetDecimalVector(internalVector);
- case FLOAT:
- HeapFloatVector fv = new HeapFloatVector(batchSize);
- if (value == null) {
- fv.fillWithNulls();
- } else {
- fv.fill(((Number) value).floatValue());
- }
- return fv;
- case DOUBLE:
- HeapDoubleVector dv = new HeapDoubleVector(batchSize);
- if (value == null) {
- dv.fillWithNulls();
- } else {
- dv.fill(((Number) value).doubleValue());
- }
- return dv;
- case DATE:
- if (value instanceof LocalDate) {
- value = Date.valueOf((LocalDate) value);
- }
- return createVectorFromConstant(
- new IntType(),
- value == null ? null : dateToInternal((Date) value),
- batchSize);
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- HeapTimestampVector tv = new HeapTimestampVector(batchSize);
- if (value == null) {
- tv.fillWithNulls();
- } else {
- tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value));
- }
- return tv;
- case ARRAY:
- HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
- if (value == null) {
- arrayVector.fillWithNulls();
- return arrayVector;
- } else {
- throw new UnsupportedOperationException("Unsupported create array with default value.");
- }
- case MAP:
- HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
- if (value == null) {
- mapVector.fillWithNulls();
- return mapVector;
- } else {
- throw new UnsupportedOperationException("Unsupported create map with default value.");
- }
- case ROW:
- HeapRowColumnVector rowVector = new HeapRowColumnVector(batchSize);
- if (value == null) {
- rowVector.fillWithNulls();
- return rowVector;
- } else {
- throw new UnsupportedOperationException("Unsupported create row with default value.");
- }
- default:
- throw new UnsupportedOperationException("Unsupported type: " + type);
- }
- }
-
-  private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
-    List<ColumnDescriptor> filtered = new ArrayList<>();
- for (ColumnDescriptor descriptor : columns) {
- if (depth >= descriptor.getPath().length) {
- throw new InvalidSchemaException("Expect depth " + depth + " for schema: " + descriptor);
- }
- if (type.getName().equals(descriptor.getPath()[depth])) {
- filtered.add(descriptor);
- }
- }
- ValidationUtils.checkState(filtered.size() > 0, "Corrupted Parquet schema");
- return filtered;
- }
-
- public static ColumnReader createColumnReader(
- boolean utcTimestamp,
- LogicalType fieldType,
- Type physicalType,
-      List<ColumnDescriptor> descriptors,
- PageReadStore pages) throws IOException {
- return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
- pages, 0);
- }
-
- private static ColumnReader createColumnReader(
- boolean utcTimestamp,
- LogicalType fieldType,
- Type physicalType,
-      List<ColumnDescriptor> columns,
- PageReadStore pages,
- int depth) throws IOException {
-    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
- ColumnDescriptor descriptor = descriptors.get(0);
- PageReader pageReader = pages.getPageReader(descriptor);
- switch (fieldType.getTypeRoot()) {
- case BOOLEAN:
- return new BooleanColumnReader(descriptor, pageReader);
- case TINYINT:
- return new ByteColumnReader(descriptor, pageReader);
- case DOUBLE:
- return new DoubleColumnReader(descriptor, pageReader);
- case FLOAT:
- return new FloatColumnReader(descriptor, pageReader);
- case INTEGER:
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- return new IntColumnReader(descriptor, pageReader);
- case BIGINT:
- return new LongColumnReader(descriptor, pageReader);
- case SMALLINT:
- return new ShortColumnReader(descriptor, pageReader);
- case CHAR:
- case VARCHAR:
- case BINARY:
- case VARBINARY:
- return new BytesColumnReader(descriptor, pageReader);
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
- case INT64:
- int precision = fieldType instanceof TimestampType
- ? ((TimestampType) fieldType).getPrecision()
- : ((LocalZonedTimestampType) fieldType).getPrecision();
- return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
- case INT96:
- return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
- default:
- throw new AssertionError();
- }
- case DECIMAL:
- switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
- case INT32:
- return new IntColumnReader(descriptor, pageReader);
- case INT64:
- return new LongColumnReader(descriptor, pageReader);
- case BINARY:
- return new BytesColumnReader(descriptor, pageReader);
- case FIXED_LEN_BYTE_ARRAY:
- return new FixedLenBytesColumnReader(
- descriptor, pageReader);
- default:
- throw new AssertionError();
- }
- case ARRAY:
- return new ArrayColumnReader(
- descriptor,
- pageReader,
- utcTimestamp,
- descriptor.getPrimitiveType(),
- fieldType);
- case MAP:
- MapType mapType = (MapType) fieldType;
- ArrayColumnReader keyReader =
- new ArrayColumnReader(
- descriptor,
- pageReader,
- utcTimestamp,
- descriptor.getPrimitiveType(),
- new ArrayType(mapType.getKeyType()));
- ArrayColumnReader valueReader =
- new ArrayColumnReader(
- descriptors.get(1),
- pages.getPageReader(descriptors.get(1)),
- utcTimestamp,
- descriptors.get(1).getPrimitiveType(),
- new ArrayType(mapType.getValueType()));
- return new MapColumnReader(keyReader, valueReader, fieldType);
- case ROW:
- RowType rowType = (RowType) fieldType;
- GroupType groupType = physicalType.asGroupType();
-        List<ColumnReader> fieldReaders = new ArrayList<>();
- for (int i = 0; i < rowType.getFieldCount(); i++) {
- // schema evolution: read the parquet file with a new extended field name.
- int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
- if (fieldIndex < 0) {
- fieldReaders.add(new EmptyColumnReader());
- } else {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(fieldIndex),
- descriptors,
- pages,
- depth + 1));
- }
- }
- return new RowColumnReader(fieldReaders);
- default:
- throw new UnsupportedOperationException(fieldType + " is not supported now.");
- }
- }
-
- public static WritableColumnVector createWritableColumnVector(
- int batchSize,
- LogicalType fieldType,
- Type physicalType,
-      List<ColumnDescriptor> descriptors) {
- return createWritableColumnVector(batchSize, fieldType, physicalType, descriptors, 0);
- }
-
- private static WritableColumnVector createWritableColumnVector(
- int batchSize,
- LogicalType fieldType,
- Type physicalType,
-      List<ColumnDescriptor> columns,
- int depth) {
-    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
- PrimitiveType primitiveType = descriptors.get(0).getPrimitiveType();
- PrimitiveType.PrimitiveTypeName typeName = primitiveType.getPrimitiveTypeName();
- switch (fieldType.getTypeRoot()) {
- case BOOLEAN:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
- "Unexpected type: %s", typeName);
- return new HeapBooleanVector(batchSize);
- case TINYINT:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT32,
- "Unexpected type: %s", typeName);
- return new HeapByteVector(batchSize);
- case DOUBLE:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
- "Unexpected type: %s", typeName);
- return new HeapDoubleVector(batchSize);
- case FLOAT:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
- "Unexpected type: %s", typeName);
- return new HeapFloatVector(batchSize);
- case INTEGER:
- case DATE:
- case TIME_WITHOUT_TIME_ZONE:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT32,
- "Unexpected type: %s", typeName);
- return new HeapIntVector(batchSize);
- case BIGINT:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT64,
- "Unexpected type: %s", typeName);
- return new HeapLongVector(batchSize);
- case SMALLINT:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT32,
- "Unexpected type: %s", typeName);
- return new HeapShortVector(batchSize);
- case CHAR:
- case VARCHAR:
- case BINARY:
- case VARBINARY:
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.BINARY,
- "Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- checkArgument(primitiveType.getOriginalType() != OriginalType.TIME_MICROS,
- "TIME_MICROS original type is not ");
- return new HeapTimestampVector(batchSize);
- case DECIMAL:
- checkArgument(
- (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
- && primitiveType.getOriginalType() == OriginalType.DECIMAL,
- "Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
- case ARRAY:
- ArrayType arrayType = (ArrayType) fieldType;
- return new HeapArrayVector(
- batchSize,
- createWritableColumnVector(
- batchSize,
- arrayType.getElementType(),
- physicalType,
- descriptors,
- depth));
- case MAP:
- MapType mapType = (MapType) fieldType;
- GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
- // the map column has three level paths.
- return new HeapMapColumnVector(
- batchSize,
- createWritableColumnVector(
- batchSize,
- mapType.getKeyType(),
- repeatedType.getType(0),
- descriptors,
- depth + 2),
- createWritableColumnVector(
- batchSize,
- mapType.getValueType(),
- repeatedType.getType(1),
- descriptors,
- depth + 2));
- case ROW:
- RowType rowType = (RowType) fieldType;
- GroupType groupType = physicalType.asGroupType();
- WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
- for (int i = 0; i < columnVectors.length; i++) {
- // schema evolution: read the file with a new extended field name.
- int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
- if (fieldIndex < 0) {
- columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
- } else {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(fieldIndex),
- descriptors,
- depth + 1);
- }
- }
- return new HeapRowColumnVector(batchSize, columnVectors);
- default:
- throw new UnsupportedOperationException(fieldType + " is not supported now.");
- }
- }
-
- /**
- * Returns the field index with given physical row type {@code groupType} and field name {@code fieldName}.
- *
- * @return The physical field index or -1 if the field does not exist
- */
- private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
- // get index from fileSchema type, else, return -1
- return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
- }
-}
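
As context for the deleted `createVectorFromConstant` path: partition columns never come from Parquet pages, so the reader fills one heap vector with the partition value for the whole batch. A rough sketch of the CHAR/VARCHAR branch follows; the column value and the use of `StandardCharsets` instead of Hudi's `getUTF8Bytes` helper are assumptions for illustration:

```java
// Illustrative only: a VARCHAR partition column materialized as a constant vector,
// mirroring the CHAR/VARCHAR branch of the removed createVectorFromConstant.
static HeapBytesVector constantStringVector(String partitionValue, int batchSize) {
  HeapBytesVector vector = new HeapBytesVector(batchSize);
  if (partitionValue == null) {
    vector.fillWithNulls();                                        // all-null column
  } else {
    vector.fill(partitionValue.getBytes(StandardCharsets.UTF_8));  // same bytes for every row
  }
  return vector;
}
```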
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
deleted file mode 100644
index 6d31d26b8d978..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.hudi.table.data.ColumnarArrayData;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.vector.ArrayColumnVector;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
-import org.apache.flink.table.data.vector.writable.WritableColumnVector;
-
-/**
- * This class represents a nullable heap array column vector.
- */
-public class HeapArrayVector extends AbstractHeapVector
- implements WritableColumnVector, ArrayColumnVector {
-
- public long[] offsets;
- public long[] lengths;
- public ColumnVector child;
- private int size;
-
- public HeapArrayVector(int len) {
- super(len);
- offsets = new long[len];
- lengths = new long[len];
- }
-
- public HeapArrayVector(int len, ColumnVector vector) {
- super(len);
- offsets = new long[len];
- lengths = new long[len];
- this.child = vector;
- }
-
- public int getSize() {
- return size;
- }
-
- public void setSize(int size) {
- this.size = size;
- }
-
- public int getLen() {
- return this.isNull.length;
- }
-
- @Override
- public ArrayData getArray(int i) {
- long offset = offsets[i];
- long length = lengths[i];
- return new ColumnarArrayData(child, (int) offset, (int) length);
- }
-}
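
The deleted `HeapArrayVector` stores all element values flattened in a single child vector; each row's offset/length pair selects its slice. A small sketch is below; the element values are made up, and populating the child via Flink 1.13's `WritableIntVector#setInt` is an assumption of this example:

```java
// Illustrative only: rows [[10, 20, 30], [40]] flattened into one child vector.
HeapIntVector child = new HeapIntVector(4);
child.setInt(0, 10);
child.setInt(1, 20);
child.setInt(2, 30);
child.setInt(3, 40);

HeapArrayVector arrays = new HeapArrayVector(2, child);
arrays.offsets = new long[] {0L, 3L};      // where each row's slice starts
arrays.lengths = new long[] {3L, 1L};      // how many elements each row holds
ArrayData secondRow = arrays.getArray(1);  // view over child positions [3, 4)
```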
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
deleted file mode 100644
index cf39fc981624a..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.hudi.table.data.ColumnarMapData;
-import org.apache.hudi.table.data.vector.MapColumnVector;
-
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
-import org.apache.flink.table.data.vector.writable.WritableColumnVector;
-
-/**
- * This class represents a nullable heap map column vector.
- */
-public class HeapMapColumnVector extends AbstractHeapVector
- implements WritableColumnVector, MapColumnVector {
-
- private long[] offsets;
- private long[] lengths;
- private int size;
- private ColumnVector keys;
- private ColumnVector values;
-
- public HeapMapColumnVector(int len, ColumnVector keys, ColumnVector values) {
- super(len);
- size = 0;
- offsets = new long[len];
- lengths = new long[len];
- this.keys = keys;
- this.values = values;
- }
-
- public void setOffsets(long[] offsets) {
- this.offsets = offsets;
- }
-
- public void setLengths(long[] lengths) {
- this.lengths = lengths;
- }
-
- public void setKeys(ColumnVector keys) {
- this.keys = keys;
- }
-
- public void setValues(ColumnVector values) {
- this.values = values;
- }
-
- public int getSize() {
- return size;
- }
-
- public void setSize(int size) {
- this.size = size;
- }
-
- @Override
- public MapData getMap(int i) {
- long offset = offsets[i];
- long length = lengths[i];
- return new ColumnarMapData(keys, values, (int) offset, (int) length);
- }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
deleted file mode 100644
index 03da9205d313e..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.hudi.table.data.ColumnarRowData;
-import org.apache.hudi.table.data.vector.RowColumnVector;
-import org.apache.hudi.table.data.vector.VectorizedColumnBatch;
-
-import org.apache.flink.table.data.vector.heap.AbstractHeapVector;
-import org.apache.flink.table.data.vector.writable.WritableColumnVector;
-
-/**
- * This class represents a nullable heap row column vector.
- */
-public class HeapRowColumnVector extends AbstractHeapVector
- implements WritableColumnVector, RowColumnVector {
-
- public WritableColumnVector[] vectors;
-
- public HeapRowColumnVector(int len, WritableColumnVector... vectors) {
- super(len);
- this.vectors = vectors;
- }
-
- @Override
- public ColumnarRowData getRow(int i) {
- ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
- columnarRowData.setRowId(i);
- return columnarRowData;
- }
-
- @Override
- public void reset() {
- super.reset();
- for (WritableColumnVector vector : vectors) {
- vector.reset();
- }
- }
-}
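
The deleted `HeapRowColumnVector` models a ROW column as one writable vector per nested field and answers `getRow(i)` with a row view positioned at batch row `i`. A sketch with hypothetical field types:

```java
// Illustrative only: a ROW<id INT, name VARCHAR> column backed by two field vectors.
HeapIntVector idField = new HeapIntVector(1024);
HeapBytesVector nameField = new HeapBytesVector(1024);
HeapRowColumnVector rowColumn = new HeapRowColumnVector(1024, idField, nameField);

// The returned view reads idField/nameField lazily at position 7.
ColumnarRowData nestedRow = rowColumn.getRow(7);
```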
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
deleted file mode 100644
index a2f6d5b0cd74c..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/ParquetDecimalVector.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.vector.BytesColumnVector;
-import org.apache.flink.table.data.vector.ColumnVector;
-import org.apache.flink.table.data.vector.DecimalColumnVector;
-
-/**
- * Parquet write decimal as int32 and int64 and binary, this class wrap the real vector to
- * provide {@link DecimalColumnVector} interface.
- *
- * <p>Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.ParquetDecimalVector}
- * because it is not public.
- */
-public class ParquetDecimalVector implements DecimalColumnVector {
-
- public final ColumnVector vector;
-
- public ParquetDecimalVector(ColumnVector vector) {
- this.vector = vector;
- }
-
- @Override
- public DecimalData getDecimal(int i, int precision, int scale) {
- return DecimalData.fromUnscaledBytes(
- ((BytesColumnVector) vector).getBytes(i).getBytes(),
- precision,
- scale);
- }
-
- @Override
- public boolean isNullAt(int i) {
- return vector.isNullAt(i);
- }
-}
-
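
For context on the deleted `ParquetDecimalVector`: Parquet stores these decimals as two's-complement, big-endian unscaled bytes, and the wrapper rebuilds `DecimalData` from them per row. A sketch of the same conversion; the precision, scale, and value are assumptions:

```java
// Illustrative only: DECIMAL(10, 2) round trip through unscaled bytes,
// the same conversion ParquetDecimalVector#getDecimal applies per row.
BigDecimal source = new BigDecimal("12345.67");
byte[] unscaled = source.unscaledValue().toByteArray();   // what the file stores
DecimalData decoded = DecimalData.fromUnscaledBytes(unscaled, 10, 2);
```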
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
deleted file mode 100644
index 07416a371715c..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/AbstractColumnReader.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector.reader;
-
-import org.apache.flink.formats.parquet.vector.ParquetDictionary;
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.vector.writable.WritableColumnVector;
-import org.apache.flink.table.data.vector.writable.WritableIntVector;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.schema.PrimitiveType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-
-/**
- * Abstract {@link ColumnReader}.
- * See {@link org.apache.parquet.column.impl.ColumnReaderImpl},
- * part of the code is referred from Apache Spark and Apache Parquet.
- *
- * <p>Note: Reference Flink release 1.11.2 {@link org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader}
- * because some of the package scope methods.
- */
-public abstract class AbstractColumnReader<V extends WritableColumnVector>
-    implements ColumnReader<V> {
-
- private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.class);
-
- private final PageReader pageReader;
-
- /**
- * The dictionary, if this column has dictionary encoding.
- */
- protected final Dictionary dictionary;
-
- /**
- * Maximum definition level for this column.
- */
- protected final int maxDefLevel;
-
- protected final ColumnDescriptor descriptor;
-
- /**
- * Total number of values read.
- */
- private long valuesRead;
-
- /**
- * value that indicates the end of the current page. That is, if valuesRead ==
- * endOfPageValueCount, we are at the end of the page.
- */
- private long endOfPageValueCount;
-
- /**
- * If true, the current page is dictionary encoded.
- */
- private boolean isCurrentPageDictionaryEncoded;
-
- /**
- * Total values in the current page.
- */
- private int pageValueCount;
-
- /*
- * Input streams:
- * 1.Run length encoder to encode every data, so we have run length stream to get
- * run length information.
- * 2.Data maybe is real data, maybe is dictionary ids which need be decode to real
- * data from Dictionary.
- *
- * Run length stream ------> Data stream
- * |
- * ------> Dictionary ids stream
- */
-
- /**
- * Run length decoder for data and dictionary.
- */
- protected RunLengthDecoder runLenDecoder;
-
- /**
- * Data input stream.
- */
- ByteBufferInputStream dataInputStream;
-
- /**
- * Dictionary decoder to wrap dictionary ids input stream.
- */
- private RunLengthDecoder dictionaryIdsDecoder;
-
- public AbstractColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader) throws IOException {
- this.descriptor = descriptor;
- this.pageReader = pageReader;
- this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-
- DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
- if (dictionaryPage != null) {
- try {
- this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
- this.isCurrentPageDictionaryEncoded = true;
- } catch (IOException e) {
- throw new IOException("could not decode the dictionary for " + descriptor, e);
- }
- } else {
- this.dictionary = null;
- this.isCurrentPageDictionaryEncoded = false;
- }
- /*
- * Total number of values in this column (in this row group).
- */
- long totalValueCount = pageReader.getTotalValueCount();
- if (totalValueCount == 0) {
- throw new IOException("totalValueCount == 0");
- }
- }
-
- protected void checkTypeName(PrimitiveType.PrimitiveTypeName expectedName) {
- PrimitiveType.PrimitiveTypeName actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
- Preconditions.checkArgument(
- actualName == expectedName,
- "Expected type name: %s, actual type name: %s",
- expectedName,
- actualName);
- }
-
- /**
- * Reads `total` values from this columnReader into column.
- */
- @Override
- public final void readToVector(int readNumber, V vector) throws IOException {
- int rowId = 0;
- WritableIntVector dictionaryIds = null;
- if (dictionary != null) {
- dictionaryIds = vector.reserveDictionaryIds(readNumber);
- }
- while (readNumber > 0) {
- // Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- DataPage page = pageReader.readPage();
- if (page instanceof DataPageV1) {
- readPageV1((DataPageV1) page);
- } else if (page instanceof DataPageV2) {
- readPageV2((DataPageV2) page);
- } else {
- throw new RuntimeException("Unsupported page type: " + page.getClass());
- }
- leftInPage = (int) (endOfPageValueCount - valuesRead);
- }
- int num = Math.min(readNumber, leftInPage);
- if (isCurrentPageDictionaryEncoded) {
- // Read and decode dictionary ids.
- runLenDecoder.readDictionaryIds(
- num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
-
- if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
- // Column vector supports lazy decoding of dictionary values so just set the dictionary.
- // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
- // non-dictionary encoded values have already been added).
- vector.setDictionary(new ParquetDictionary(dictionary));
- } else {
- readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
- }
- } else {
- if (vector.hasDictionary() && rowId != 0) {
- // This batch already has dictionary encoded values but this new page is not. The batch
- // does not support a mix of dictionary and not so we will decode the dictionary.
- readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
- }
- vector.setDictionary(null);
- readBatch(rowId, num, vector);
- }
-
- valuesRead += num;
- rowId += num;
- readNumber -= num;
- }
- }
-
- private void readPageV1(DataPageV1 page) throws IOException {
- this.pageValueCount = page.getValueCount();
- ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-
- // Initialize the decoders.
- if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
- throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
- }
- int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.runLenDecoder = new RunLengthDecoder(bitWidth);
- try {
- BytesInput bytes = page.getBytes();
- ByteBufferInputStream in = bytes.toInputStream();
- rlReader.initFromPage(pageValueCount, in);
- this.runLenDecoder.initFromStream(pageValueCount, in);
- prepareNewPage(page.getValueEncoding(), in);
- } catch (IOException e) {
- throw new IOException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private void readPageV2(DataPageV2 page) throws IOException {
- this.pageValueCount = page.getValueCount();
-
- int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- // do not read the length from the stream. v2 pages handle dividing the page bytes.
- this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
- this.runLenDecoder.initFromStream(
- this.pageValueCount, page.getDefinitionLevels().toInputStream());
- try {
- prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
- } catch (IOException e) {
- throw new IOException("could not read page " + page + " in col " + descriptor, e);
- }
- }
-
- private void prepareNewPage(
- Encoding dataEncoding,
- ByteBufferInputStream in) throws IOException {
- this.endOfPageValueCount = valuesRead + pageValueCount;
- if (dataEncoding.usesDictionary()) {
- if (dictionary == null) {
- throw new IOException("Could not read page in col "
- + descriptor
- + " as the dictionary was missing for encoding "
- + dataEncoding);
- }
- @SuppressWarnings("deprecation")
- Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
- if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
- throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
- }
- this.dataInputStream = null;
- this.dictionaryIdsDecoder = new RunLengthDecoder();
- try {
- this.dictionaryIdsDecoder.initFromStream(pageValueCount, in);
- } catch (IOException e) {
- throw new IOException("could not read dictionary in col " + descriptor, e);
- }
- this.isCurrentPageDictionaryEncoded = true;
- } else {
- if (dataEncoding != Encoding.PLAIN) {
- throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
- }
- this.dictionaryIdsDecoder = null;
- LOG.debug("init from page at offset {} for length {}", in.position(), in.available());
- this.dataInputStream = in.remainingStream();
- this.isCurrentPageDictionaryEncoded = false;
- }
-
- afterReadPage();
- }
-
- final ByteBuffer readDataBuffer(int length) {
- try {
- return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
- } catch (IOException e) {
- throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
- }
- }
-
- /**
- * After read a page, we may need some initialization.
- */
- protected void afterReadPage() {
- }
-
- /**
- * Support lazy dictionary ids decode. See more in {@link ParquetDictionary}.
- * If return false, we will decode all the data first.
- */
- protected boolean supportLazyDecode() {
- return true;
- }
-
- /**
- * Read batch from {@link #runLenDecoder} and {@link #dataInputStream}.
- */
- protected abstract void readBatch(int rowId, int num, V column);
-
- /**
- * Decode dictionary ids to data.
- * From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}.
- */
- protected abstract void readBatchFromDictionaryIds(
- int rowId,
- int num,
- V column,
- WritableIntVector dictionaryIds);
-}
-
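
The trickiest part of the deleted `AbstractColumnReader` is deciding, per dictionary-encoded page, whether the raw dictionary ids can be kept for lazy decoding or must be materialized immediately. A hypothetical helper restating the condition used in `readToVector`:

```java
// Illustrative only: keeping the dictionary (lazy decode) is safe only if the
// vector already carries a dictionary, or nothing has been written yet
// (rowId == 0) and the concrete reader supports lazy decoding.
static boolean canKeepDictionary(boolean vectorHasDictionary,
                                 int rowId,
                                 boolean readerSupportsLazyDecode) {
  return vectorHasDictionary || (rowId == 0 && readerSupportsLazyDecode);
}
```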
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
deleted file mode 100644
index 67dbb74902605..0000000000000
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector.reader;
-
-import org.apache.hudi.table.data.vector.VectorizedColumnBatch;
-import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
-
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
-import org.apache.flink.table.data.vector.heap.HeapByteVector;
-import org.apache.flink.table.data.vector.heap.HeapBytesVector;
-import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
-import org.apache.flink.table.data.vector.heap.HeapFloatVector;
-import org.apache.flink.table.data.vector.heap.HeapIntVector;
-import org.apache.flink.table.data.vector.heap.HeapLongVector;
-import org.apache.flink.table.data.vector.heap.HeapShortVector;
-import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
-import org.apache.flink.table.data.vector.writable.WritableColumnVector;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Array {@link ColumnReader}.
- */
-public class ArrayColumnReader extends BaseVectorizedColumnReader {
-
- // The value read in last time
- private Object lastValue;
-
- // flag to indicate if there is no data in parquet data page
- private boolean eof = false;
-
- // flag to indicate if it's the first time to read parquet data page with this instance
- boolean isFirstRow = true;
-
- public ArrayColumnReader(
- ColumnDescriptor descriptor,
- PageReader pageReader,
- boolean isUtcTimestamp,
- Type type,
- LogicalType logicalType)
- throws IOException {
- super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
- }
-
- @Override
- public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
- HeapArrayVector lcv = (HeapArrayVector) vector;
- // before readBatch, initial the size of offsets & lengths as the default value,
- // the actual size will be assigned in setChildrenInfo() after reading complete.
- lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
- lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
- // Because the length of ListColumnVector.child can't be known now,
- // the valueList will save all data for ListColumnVector temporary.
- List