Skip to content

Commit ef51008

Browse files
authored
Merge pull request #297 from elenacliu/hopping
[ISSUE #304] add 2 window examples
2 parents 4bd58c8 + cbf6a92 commit ef51008

File tree

2 files changed

+129
-0
lines changed

2 files changed

+129
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.apache.rocketmq.streams.examples.pojo;
2+
3+
public class Order {
4+
private String type; // drink, food,
5+
private Integer price; // order price
6+
private String customer; // customer name
7+
8+
public Order() {
9+
10+
}
11+
12+
public Order(String type, Integer price, String customer) {
13+
this.type = type;
14+
this.price = price;
15+
this.customer = customer;
16+
}
17+
18+
public String getType() {
19+
return type;
20+
}
21+
22+
public String getCustomer() {
23+
return customer;
24+
}
25+
26+
public Integer getPrice() {
27+
return price;
28+
}
29+
30+
public void setType(String type) {
31+
this.type = type;
32+
}
33+
34+
public void setCustomer(String customer) {
35+
this.customer = customer;
36+
}
37+
38+
public void setPrice(Integer price) {
39+
this.price = price;
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return "Order{" + "type=" + type + ", price=" + price + ", customer=" + customer + "}";
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package org.apache.rocketmq.streams.examples.window;
2+
3+
import java.util.Properties;
4+
5+
import org.apache.rocketmq.common.MixAll;
6+
import org.apache.rocketmq.streams.core.RocketMQStream;
7+
import org.apache.rocketmq.streams.core.function.SelectAction;
8+
import org.apache.rocketmq.streams.core.metadata.StreamConfig;
9+
import org.apache.rocketmq.streams.core.rstream.StreamBuilder;
10+
import org.apache.rocketmq.streams.core.topology.TopologyBuilder;
11+
import org.apache.rocketmq.streams.core.util.Pair;
12+
import org.apache.rocketmq.streams.core.window.Time;
13+
import org.apache.rocketmq.streams.core.window.WindowBuilder;
14+
import org.apache.rocketmq.streams.examples.pojo.Order;
15+
16+
import com.alibaba.fastjson.JSON;
17+
18+
public class WindowOrderCount {
19+
public static void main(String[] args) {
20+
StreamBuilder builder = getOrder2();
21+
22+
TopologyBuilder topologyBuilder = builder.build();
23+
Properties properties = new Properties();
24+
properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
25+
properties.put(StreamConfig.ALLOW_LATENESS_MILLISECOND, 2000);
26+
27+
28+
RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);
29+
30+
Runtime.getRuntime().addShutdownHook(new Thread("ordercount-shutdown-hook") {
31+
@Override
32+
public void run() {
33+
rocketMQStream.stop();
34+
}
35+
});
36+
37+
rocketMQStream.start();
38+
}
39+
40+
private static StreamBuilder getOrder1() {
41+
/**
42+
* Get the count of drink/food orders in last 30 seconds every 10 seconds
43+
**/
44+
StreamBuilder builder = new StreamBuilder("windowOrderCount");
45+
46+
builder.source("order", source -> {
47+
Order order = JSON.parseObject(source, Order.class);
48+
System.out.println(order.toString());
49+
return new Pair<>(null, order);
50+
})
51+
.keyBy(Order::getType)
52+
.window(WindowBuilder.slidingWindow(Time.seconds(30), Time.seconds(10)))
53+
.count()
54+
.toRStream()
55+
.print();
56+
57+
return builder;
58+
}
59+
60+
private static StreamBuilder getOrder2() {
61+
/**
62+
* Get how much the customers pay for drink/food every 100 seconds
63+
**/
64+
StreamBuilder builder = new StreamBuilder("windowOrderCount");
65+
builder.source("order", source -> {
66+
Order order = JSON.parseObject(source, Order.class);
67+
System.out.println(order.toString());
68+
return new Pair<>(null, order);
69+
})
70+
.keyBy(new SelectAction<String, Order>() {
71+
@Override
72+
public String select(Order order) {
73+
return order.getCustomer() + "@" + order.getType();
74+
}
75+
})
76+
.window(WindowBuilder.tumblingWindow(Time.seconds(100)))
77+
.sum(Order::getPrice)
78+
.toRStream()
79+
.print();
80+
81+
return builder;
82+
}
83+
}

0 commit comments

Comments
 (0)