Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhiwubigdata committed Apr 23, 2020
1 parent e889b34 commit dd36eae
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 0 deletions.
43 changes: 43 additions & 0 deletions src/main/java/org/myorg/quickstart/Table05/Item.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.myorg.quickstart.Table05;

import java.io.Serializable;

/**
* Created by wangchangye on 2020/4/23.
*/
public class Item implements Serializable{
private String name;
private Integer id;

public Item() {
}

public Item(String name, Integer id) {
this.name = name;
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

@Override
public String toString() {
return "Item{" +
"name='" + name + '\'' +
", id=" + id +
'}';
}
}
115 changes: 115 additions & 0 deletions src/main/java/org/myorg/quickstart/Table05/MyStreamingSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.myorg.quickstart.Table05;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class MyStreamingSource implements SourceFunction<Item> {

private boolean isRunning = true;

/**
* 重写run方法产生一个源源不断的数据发送源
* @param ctx
* @throws Exception
*/
public void run(SourceContext<Item> ctx) throws Exception {
while(isRunning){
Item item = generateItem();
ctx.collect(item);

//每秒产生一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}

//随机产生一条商品数据
private Item generateItem(){
int i = new Random().nextInt(100);
ArrayList<String> list = new ArrayList();
list.add("HAT");
list.add("TIE");
list.add("SHOE");
Item item = new Item();
item.setName(list.get(new Random().nextInt(3)));
item.setId(i);
return item;
}
}


class StreamingDemo {
public static void main(String[] args) throws Exception {

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
@Override
public Item map(Item item) throws Exception {
return item;
}
});

DataStream<Item> evenSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("even");

DataStream<Item> oddSelect = source.split(new OutputSelector<Item>() {
@Override
public Iterable<String> select(Item value) {
List<String> output = new ArrayList<>();
if (value.getId() % 2 == 0) {
output.add("even");
} else {
output.add("odd");
}
return output;
}
}).select("odd");


bsTableEnv.createTemporaryView("evenTable", evenSelect, "name,id");
bsTableEnv.createTemporaryView("oddTable", oddSelect, "name,id");

Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");

queryTable.printSchema();

bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer,String,Integer,String>>(){})).print();

bsEnv.execute("streaming sql job");
}

}
43 changes: 43 additions & 0 deletions src/main/java/org/myorg/quickstart/Table05/ResultItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.myorg.quickstart.Table05;

/**
* Created by wangchangye on 2020/4/23.
*/
public class ResultItem {
private String aname;
private Integer aid;
private String bname;
private Integer bid;

public String getAname() {
return aname;
}

public void setAname(String aname) {
this.aname = aname;
}

public Integer getAid() {
return aid;
}

public void setAid(Integer aid) {
this.aid = aid;
}

public String getBname() {
return bname;
}

public void setBname(String bname) {
this.bname = bname;
}

public Integer getBid() {
return bid;
}

public void setBid(Integer bid) {
this.bid = bid;
}
}

0 comments on commit dd36eae

Please sign in to comment.