[HUDI-7072] Remove support for Flink 1.13 #10052

Merged · 3 commits · Nov 19, 2023
11 changes: 5 additions & 6 deletions .github/workflows/bot.yml
@@ -221,7 +221,6 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: "flink1.13"
- flinkProfile: "flink1.14"
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
@@ -315,13 +314,13 @@ jobs:
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.2'
sparkRuntime: 'spark3.2.3'
- flinkProfile: 'flink1.13'
@danny0405 (Contributor), Nov 17, 2023:
We have flink1.14.6 and spark 3.1.3 docker image now, can just upgrade to flink1.14.

Author:
Done

- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.1'
sparkRuntime: 'spark3.1.3'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.0'
sparkRuntime: 'spark3.0.2'
- flinkProfile: 'flink1.13'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark2.4'
Contributor:
We have flink1.14.6 and spark 2.4.8 docker image now, can just upgrade to flink1.14.

Author:
Done

sparkRuntime: 'spark2.4.8'
steps:
@@ -390,13 +389,13 @@ jobs:
- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.2'
sparkRuntime: 'spark3.2.3'
- flinkProfile: 'flink1.13'
Contributor:
We have flink1.14.6 and spark 3.1.3 docker image now, can just upgrade to flink1.14.

Author:
Done

- flinkProfile: 'flink1.14'
sparkProfile: 'spark3.1'
sparkRuntime: 'spark3.1.3'
- flinkProfile: 'flink1.13'
Contributor:
We have flink1.14.6 and spark 2.4.8 docker image now, can just upgrade to flink1.14.

Author:
Done

- flinkProfile: 'flink1.14'
sparkProfile: 'spark'
sparkRuntime: 'spark2.4.8'
- flinkProfile: 'flink1.13'
- flinkProfile: 'flink1.14'
sparkProfile: 'spark2.4'
sparkRuntime: 'spark2.4.8'
steps:
5 changes: 0 additions & 5 deletions README.md
@@ -132,8 +132,6 @@ Refer to the table below for building with different Flink and Scala versions.
| `-Dflink1.15` | hudi-flink1.15-bundle | For Flink 1.15 |
| `-Dflink1.14` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.12 |
| `-Dflink1.14 -Dscala-2.11` | hudi-flink1.14-bundle | For Flink 1.14 and Scala 2.11 |
| `-Dflink1.13` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.12 |
| `-Dflink1.13 -Dscala-2.11` | hudi-flink1.13-bundle | For Flink 1.13 and Scala 2.11 |

For example,
```
@@ -142,9 +140,6 @@
mvn clean package -DskipTests -Dflink1.15
# Build against Flink 1.14.x and Scala 2.11
mvn clean package -DskipTests -Dflink1.14 -Dscala-2.11
# Build against Flink 1.13.x and Scala 2.12
mvn clean package -DskipTests -Dflink1.13
```

## Running Tests
3 changes: 0 additions & 3 deletions azure-pipelines-20230430.yml
@@ -32,7 +32,6 @@ parameters:
- 'hudi-common'
- 'hudi-flink-datasource'
- 'hudi-flink-datasource/hudi-flink'
- 'hudi-flink-datasource/hudi-flink1.13.x'
- 'hudi-flink-datasource/hudi-flink1.14.x'
- 'hudi-flink-datasource/hudi-flink1.15.x'
- 'hudi-flink-datasource/hudi-flink1.16.x'
@@ -65,7 +64,6 @@ parameters:
- '!hudi-examples/hudi-examples-spark'
- '!hudi-flink-datasource'
- '!hudi-flink-datasource/hudi-flink'
- '!hudi-flink-datasource/hudi-flink1.13.x'
- '!hudi-flink-datasource/hudi-flink1.14.x'
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
@@ -89,7 +87,6 @@ parameters:
- '!hudi-examples/hudi-examples-spark'
- '!hudi-flink-datasource'
- '!hudi-flink-datasource/hudi-flink'
- '!hudi-flink-datasource/hudi-flink1.13.x'
- '!hudi-flink-datasource/hudi-flink1.14.x'
- '!hudi-flink-datasource/hudi-flink1.15.x'
- '!hudi-flink-datasource/hudi-flink1.16.x'
RowDataToHoodieFunctionWithRateLimit.java
@@ -18,14 +18,16 @@

package org.apache.hudi.sink.transform;

import org.apache.hudi.adapter.RateLimiterAdapter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.RateLimiter;
import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import java.util.concurrent.TimeUnit;

/**
* Function that transforms RowData to a HoodieRecord with RateLimit.
*/
@@ -39,7 +41,7 @@ public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends H
/**
* Rate limit per second per task.
*/
private transient RateLimiterAdapter rateLimiter;
private transient RateLimiter rateLimiter;

public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration config) {
super(rowType, config);
@@ -50,12 +52,12 @@ public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration confi
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.rateLimiter =
RateLimiterAdapter.create(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
RateLimiter.create((int) totalLimit / getRuntimeContext().getNumberOfParallelSubtasks(), TimeUnit.SECONDS);
}

@Override
public O map(I i) throws Exception {
rateLimiter.acquire();
rateLimiter.acquire(1);
return super.map(i);
}
}
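
For context on the change above: the Flink-version-specific RateLimiterAdapter is replaced with Hudi's own org.apache.hudi.common.util.RateLimiter, since the adapter indirection is no longer needed once Flink 1.13 support is gone. A minimal sketch of the resulting call pattern, using only the two calls visible in the diff, create(int, TimeUnit) and acquire(int); the budget and parallelism values below are hypothetical:

```java
import org.apache.hudi.common.util.RateLimiter;

import java.util.concurrent.TimeUnit;

public class RateLimitSketch {
  public static void main(String[] args) {
    long totalLimit = 10_000L; // hypothetical job-wide records/second budget
    int parallelism = 4;       // stands in for getNumberOfParallelSubtasks()

    // Each subtask takes an equal share of the global budget, as in open().
    RateLimiter limiter =
        RateLimiter.create((int) (totalLimit / parallelism), TimeUnit.SECONDS);

    for (int i = 0; i < 5; i++) {
      limiter.acquire(1); // blocks until a permit is available, as in map()
      // ... transform one RowData into a HoodieRecord here ...
    }
  }
}
```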
StreamReadOperator.java
@@ -18,25 +18,28 @@

package org.apache.hudi.source;

import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
import org.apache.hudi.adapter.MailboxExecutorAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -60,7 +63,7 @@
* This architecture allows the separation of split reading from processing the checkpoint barriers,
* thus removing any potential back-pressure.
*/
public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
public class StreamReadOperator extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {

private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
@@ -70,7 +73,7 @@ public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
// It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only
// splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time
// for exhausting all scheduled split reading tasks.
private final MailboxExecutorAdapter executor;
private final MailboxExecutor executor;

private MergeOnReadInputFormat format;

@@ -89,7 +92,7 @@ public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
private transient FlinkStreamReadMetrics readMetrics;

private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService,
MailboxExecutorAdapter mailboxExecutor) {
MailboxExecutor mailboxExecutor) {
this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
this.processingTimeService = timeService;
this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
@@ -119,10 +122,9 @@ public void initializeState(StateInitializationContext context) throws Exception
}
}

this.sourceContext = Utils.getSourceContext(
this.sourceContext = getSourceContext(
getOperatorConfig().getTimeCharacteristic(),
getProcessingTimeService(),
getContainingTask(),
output,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());

@@ -247,8 +249,8 @@ private enum SplitState {
IDLE, RUNNING
}

private static class OperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData>
implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>, YieldingOperatorFactory<RowData> {

private final MergeOnReadInputFormat format;

@@ -259,7 +261,7 @@ private OperatorFactory(MergeOnReadInputFormat format) {
@SuppressWarnings("unchecked")
@Override
public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutorAdapter());
StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutor());
operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
return (O) operator;
}
@@ -269,4 +271,19 @@ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classL
return StreamReadOperator.class;
}
}

private static <O> SourceFunction.SourceContext<O> getSourceContext(
TimeCharacteristic timeCharacteristic,
ProcessingTimeService processingTimeService,
Output<StreamRecord<O>> output,
long watermarkInterval) {
return StreamSourceContexts.getSourceContext(
timeCharacteristic,
processingTimeService,
new Object(), // no actual locking needed
output,
watermarkInterval,
-1,
true);
}
}
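
The class comment above explains the mailbox pattern this operator relies on: split reads are enqueued on the operator's mailbox executor so checkpoint barriers can be processed between any two reads. With Flink 1.13 gone, the operator uses org.apache.flink.api.common.operators.MailboxExecutor directly instead of MailboxExecutorAdapter. A sketch of that scheduling pattern, assuming only MailboxExecutor.execute(action, description) from Flink's API; the class and method names here are illustrative, not from the PR:

```java
import org.apache.flink.api.common.operators.MailboxExecutor;

// Illustrative helper: enqueue split reads as mailbox actions so the task
// thread can interleave them with checkpoint barriers, rather than blocking
// inside one long-running read loop.
class SplitMailboxSketch {
  private final MailboxExecutor executor;

  SplitMailboxSketch(MailboxExecutor executor) {
    this.executor = executor;
  }

  void scheduleRead(Runnable readOneSplit) {
    // The action runs on the task thread between other mails; a checkpoint
    // can be triggered between any two scheduled reads.
    executor.execute(readOneSplit::run, "read one MergeOnReadInputSplit");
  }
}
```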
CollectorOutput.java
@@ -18,12 +18,11 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.OutputAdapter;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
Expand All @@ -32,7 +31,7 @@
/**
* Collecting {@link Output} for {@link StreamRecord}.
*/
public class CollectorOutput<T> implements OutputAdapter<StreamRecord<T>> {
public class CollectorOutput<T> implements Output<StreamRecord<T>> {

private final List<T> records;

@@ -68,4 +67,9 @@ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
public void close() {
this.records.clear();
}

@Override
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
// no operation
}
}
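
Why the OutputAdapter shim can be dropped here: Output gained emitWatermarkStatus(WatermarkStatus) in Flink 1.14 (WatermarkStatus replaced the older StreamStatus), so a collector targeting 1.14+ can implement Output directly. A self-contained, simplified analogue of CollectorOutput under that contract; this is a sketch for illustration, not the class from the PR:

```java
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;

// Minimal collector implementing the full post-1.14 Output contract.
class SimpleCollectorOutput<T> implements Output<StreamRecord<T>> {
  private final List<T> records = new ArrayList<>();

  @Override
  public void collect(StreamRecord<T> record) {
    records.add(record.getValue()); // collect only the record payload
  }

  @Override
  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // side outputs are ignored in this sketch
  }

  @Override
  public void emitWatermark(Watermark mark) {
    // ignored: this sketch only cares about records
  }

  @Override
  public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
    // no operation, mirroring the override added in the PR
  }

  @Override
  public void emitLatencyMarker(LatencyMarker latencyMarker) {
    // ignored
  }

  @Override
  public void close() {
    records.clear();
  }
}
```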
MockStateInitializationContext.java
@@ -17,17 +17,18 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.StateInitializationContextAdapter;

import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;

import java.util.OptionalLong;

/**
* A {@link FunctionInitializationContext} for testing purpose.
*/
public class MockStateInitializationContext implements StateInitializationContextAdapter {
public class MockStateInitializationContext implements StateInitializationContext {

private final MockOperatorStateStore operatorStateStore;

@@ -59,4 +60,9 @@ public Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs() {
public Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs() {
return null;
}

@Override
public OptionalLong getRestoredCheckpointId() {
return OptionalLong.empty();
}
}
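
The getRestoredCheckpointId() override added above is what the StateInitializationContextAdapter indirection used to paper over: the method is available on StateInitializationContext from Flink 1.14 onward, and the mock satisfies it by reporting no restored checkpoint. A small sketch of how such a value is typically consumed; the branching below is illustrative, not taken from Hudi:

```java
import java.util.OptionalLong;

public class RestoredCheckpointSketch {
  // Describe whether an operator is restoring state or starting fresh.
  static String describe(OptionalLong restoredCheckpointId) {
    return restoredCheckpointId.isPresent()
        ? "restoring from checkpoint " + restoredCheckpointId.getAsLong()
        : "fresh start, no prior checkpoint"; // what the mock's empty() means
  }

  public static void main(String[] args) {
    System.out.println(describe(OptionalLong.empty())); // the mock's behavior
    System.out.println(describe(OptionalLong.of(42L))); // hypothetical restore
  }
}
```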
MockStreamingRuntimeContext.java
@@ -17,10 +17,10 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.StreamingRuntimeContextAdapter;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -37,7 +37,7 @@
*
* <p>NOTE: Adapted from Apache Flink, the MockStreamOperator is modified to support MapState.
*/
public class MockStreamingRuntimeContext extends StreamingRuntimeContextAdapter {
public class MockStreamingRuntimeContext extends StreamingRuntimeContext {

private final boolean isCheckpointingEnabled;

@@ -128,4 +128,9 @@ public KeyedStateStore getKeyedStateStore() {
return mockOperatorStateStore;
}
}

@Override
public OperatorMetricGroup getMetricGroup() {
return UnregisteredMetricsGroup.createOperatorMetricGroup();
}
}
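
On the getMetricGroup() override: UnregisteredMetricsGroup.createOperatorMetricGroup(), used in the diff, returns an operator metric group whose metrics are accepted but never registered with any reporter, which is exactly what a mock runtime context needs. A minimal sketch of that behavior; the counter name is made up for illustration:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

public class MetricGroupSketch {
  public static void main(String[] args) {
    OperatorMetricGroup group = UnregisteredMetricsGroup.createOperatorMetricGroup();

    // Metrics can be created and updated as usual, but nothing is reported.
    Counter counter = group.counter("records-in");
    counter.inc();
    System.out.println("counter value: " + counter.getCount());
  }
}
```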
@@ -18,7 +18,6 @@

package org.apache.hudi.table;

import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -32,6 +31,7 @@
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestSQL;
import org.apache.hudi.utils.TestTableEnvs;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;

@@ -18,7 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.utils.TestTableEnvs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
TestTableEnvs.java (moved from package org.apache.hudi.adapter to org.apache.hudi.utils)
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hudi.adapter;
package org.apache.hudi.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;