Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhiwubigdata committed May 5, 2020
1 parent dd36eae commit b7ccc65
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/myorg/quickstart/BatchJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(5);
// get input data
DataSet<String> text = env.fromElements(
"Flink Spark Storm",
Expand All @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception {
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
.sum(1).setParallelism(1);

counts.printToErr();

Expand Down
75 changes: 75 additions & 0 deletions src/main/java/org/myorg/quickstart/CoreConcepts07/BatchJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@

package org.myorg.quickstart.CoreConcepts07;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BatchJob {


public static void main(String[] args) throws Exception {

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// env.setRestartStrategy(RestartStrategies.noRestart());
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
// 3, // 尝试重启的次数
// Time.of(10, TimeUnit.SECONDS) // 延时
// ));

env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 每个时间间隔的最大故障次数
Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
Time.of(5, TimeUnit.SECONDS) // 延时
));






env.registerCachedFile("/Users/wangchangye/WorkSpace/quickstart/distributedcache.txt", "distributedCache");
//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
DataSource<String> data = env.fromElements("Linea", "Lineb", "Linec", "Lined");

DataSet<String> result = data.map(new RichMapFunction<String, String>() {
private ArrayList<String> dataList = new ArrayList<String>();

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2:使用文件
File myFile = getRuntimeContext().getDistributedCache().getFile("distributedCache");
List<String> lines = FileUtils.readLines(myFile);
for (String line : lines) {
this.dataList.add(line);
System.err.println("分布式缓存为:" + line);
}
}

@Override
public String map(String value) throws Exception {
//在这里就可以使用dataList
System.err.println("使用datalist:" + dataList + "------------" +value);
//业务逻辑
return dataList +":" + value;
}
});

result.printToErr();
}
}

0 comments on commit b7ccc65

Please sign in to comment.