This repository has been archived by the owner on Jun 16, 2023. It is now read-only.
Application examples
longdafeng edited this page Oct 21, 2014
This page shows how to implement a JStorm example quickly.
The simplest JStorm example is divided into four steps:
// All custom topology configuration is placed in this map.
Map conf = new HashMap();

// Create the topology builder.
TopologyBuilder builder = new TopologyBuilder();

// Get the spout parallelism setting.
int spoutParal = get("spout.parallel", 1);

// Create the spout. SequenceTopologyDef.SEQUENCE_SPOUT_NAME is the spout name
// (the name must not contain spaces) and new SequenceSpout() is the spout object.
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
        new SequenceSpout(), spoutParal);

// Get the bolt parallelism setting.
int boltParal = get("bolt.parallel", 1);

// Create the bolt. SequenceTopologyDef.TOTAL_BOLT_NAME is the bolt name,
// new TotalCount() is the bolt object, and boltParal is the bolt parallelism.
// shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME) makes the bolt receive
// the data of SequenceTopologyDef.SEQUENCE_SPOUT_NAME by shuffle: each spout task
// sends its tuples to randomly chosen tasks of the next bolt.
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
        boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

// Set the number of ackers.
int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);

// Set the number of workers the topology uses.
int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);

// Set the cluster mode to distributed so the topology can run on a JStorm cluster.
conf.put(Config.STORM_CLUSTER_MODE, "distributed");

// Submit the topology.
StormSubmitter.submitTopology(streamName, conf, builder.createTopology());
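The snippet above calls get(...) without defining it. A minimal sketch of such a helper follows, assuming it looks a key up in the configuration map and falls back to the second argument when the key is missing. The class name ConfReader and the explicit conf parameter are hypothetical and not part of JStorm.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfReader {

    /**
     * Returns conf[key] as an int, or defaultValue when the key is absent
     * or its value cannot be parsed as an integer.
     */
    public static int get(Map<String, Object> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<String, Object>();
        conf.put("spout.parallel", "4");
        System.out.println(get(conf, "spout.parallel", 1)); // 4
        System.out.println(get(conf, "bolt.parallel", 1));  // 1 (key missing, default used)
    }
}
```

Falling back to the default on an unparsable value is a design choice of this sketch; failing fast with an exception may be preferable when a misconfigured topology should not start.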
IRichSpout is the simplest spout interface:
IRichSpout {
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Note:
- A spout object must implement the Serializable interface, so every field of the spout must be serializable. Mark any field that is not serializable as transient.
- A spout can have a constructor, but the constructor is executed only once, when the topology is submitted and the spout object is created. Initialization done there therefore happens before any task is assigned to a worker, and its result is carried to every task: when you submit a topology, the spout object is serialized into a file, the file is delivered to the supervisors, and the spout object is deserialized from the file when a worker starts.
- open is called once the worker containing the task has started.
- close is called when the task begins to shut down.
- activate is called after the topology is activated.
- deactivate is called after the topology is deactivated.
- nextTuple is the core function of a spout; all data should be emitted from this function.
- ack is called after the message has been fully processed by the whole topology. See Acking Framework for details.
- fail is called when processing of the message fails or times out. See Acking Framework for details.
- declareOutputFields defines the meaning of each field of the tuples emitted by the spout.
- getComponentConfiguration is the component configuration interface; a spout can supply its own configuration through it.
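The serialization notes above can be illustrated with plain Java serialization. This is a sketch only: DemoSpout is a hypothetical stand-in that does not implement the real IRichSpout, its no-argument open() merely plays the role of the spout's open, and roundTrip imitates the submit, deliver, and deserialize path in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Random;

public class SerializationDemo {

    /** Hypothetical stand-in for a spout; not the real IRichSpout. */
    public static class DemoSpout implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Counts constructor runs in this JVM (static, so never serialized). */
        public static int constructorCalls = 0;

        private final String topic;       // serializable: travels with the object
        private transient Random random;  // skipped by serialization: null afterwards

        public DemoSpout(String topic) {
            constructorCalls++;
            this.topic = topic;
            this.random = new Random();
        }

        /** Plays the role of open(): rebuild transient state inside the task. */
        public void open() {
            this.random = new Random();
        }

        public String getTopic() { return topic; }

        public boolean hasRandom() { return random != null; }
    }

    /** Serializes and deserializes the object, imitating submission and worker start. */
    public static DemoSpout roundTrip(DemoSpout spout) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(spout);
        out.close();
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        try {
            return (DemoSpout) in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoSpout submitted = new DemoSpout("orders");
        DemoSpout inWorker = roundTrip(submitted);
        System.out.println(inWorker.getTopic());        // orders
        System.out.println(inWorker.hasRandom());       // false
        inWorker.open();
        System.out.println(inWorker.hasRandom());       // true
        System.out.println(DemoSpout.constructorCalls); // 1
    }
}
```

Deserialization does not run the constructor again, which is why anything held in a transient field has to be rebuilt in open rather than in the constructor.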
IRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Note:
- A bolt object must implement the Serializable interface, so every field of the bolt must be serializable. Mark any field that is not serializable as transient.
- A bolt can have a constructor, but the constructor is called only once, when the topology is submitted and the bolt object is created. Initialization done there therefore happens before any task is assigned to a worker, and its result is carried to every task: when you submit a topology, the bolt object is serialized into a file, the file is delivered to the supervisors, and the bolt object is deserialized from the file when a worker starts.
- prepare performs initialization once the worker containing the task has started.
- cleanup is called when the task begins to shut down.
- execute is the core function of a bolt. After the bolt has processed a message, and emitted a message if needed, it must call collector.ack; if the bolt cannot process the message or hits an error, it must call collector.fail. See Acking Framework for details.
- declareOutputFields defines the meaning of each field of the tuples emitted by the bolt.
- getComponentConfiguration is the component configuration interface; a bolt can supply its own configuration through it.
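The ack-or-fail rule for execute can be sketched without the JStorm libraries. Everything below is a hypothetical stand-in: Collector models only the ack and fail calls of the real collector, tuples are plain strings, and TotalBolt is not the TotalCount bolt from the example above.

```java
import java.util.ArrayList;
import java.util.List;

public class AckFailDemo {

    /** Hypothetical stand-in for the ack/fail part of a bolt's collector. */
    public interface Collector {
        void ack(String tuple);
        void fail(String tuple);
    }

    /** Records every call so the outcome can be inspected. */
    public static class RecordingCollector implements Collector {
        public final List<String> acked = new ArrayList<String>();
        public final List<String> failed = new ArrayList<String>();

        public void ack(String tuple) { acked.add(tuple); }

        public void fail(String tuple) { failed.add(tuple); }
    }

    /** Stand-in bolt that sums integer tuples. */
    public static class TotalBolt {
        private final Collector collector;
        private long total = 0;

        public TotalBolt(Collector collector) {
            this.collector = collector;
        }

        /** Mirrors execute(): ack after success, fail when the tuple cannot be parsed. */
        public void execute(String tuple) {
            try {
                total += Long.parseLong(tuple);
                collector.ack(tuple);
            } catch (NumberFormatException e) {
                collector.fail(tuple);
            }
        }

        public long getTotal() { return total; }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        TotalBolt bolt = new TotalBolt(collector);
        for (String tuple : new String[] {"1", "2", "oops", "4"}) {
            bolt.execute(tuple);
        }
        System.out.println(bolt.getTotal());  // 7
        System.out.println(collector.acked);  // [1, 2, 4]
        System.out.println(collector.failed); // [oops]
    }
}
```

Each tuple ends in exactly one ack or fail call, which is the property the note on execute asks for.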
Configuration in Maven pom.xml
<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-client</artifactId>
    <version>0.9.6.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.jstorm</groupId>
    <artifactId>jstorm-client-extension</artifactId>
    <version>0.9.6.0</version>
    <scope>provided</scope>
</dependency>
If you cannot find the jstorm-client and jstorm-client-extension packages, download the JStorm source code and compile it yourself; see Build-JStorm.
When compiling, you need to package all dependent jars into a single jar.
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>storm.starter.SequenceTopology</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.6</source>
                <target>1.6</target>
            </configuration>
        </plugin>
    </plugins>
</build>
jstorm jar xxxx.jar com.xxxx.xxxx.xx parameter
- xxxx.jar is the packaged jar.
- com.xxxx.xxxx.xx is the entry class of the jar.
- parameter is the argument passed to the topology.
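How parameter is interpreted is up to the entry class. A minimal sketch follows, assuming the argument is the path to a properties file whose entries are copied into the configuration map. TopologyArgs and loadConf are hypothetical names, not JStorm APIs, and the file format is an assumption of this example.

```java
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TopologyArgs {

    /** Copies every key=value pair read from the source into a new conf map. */
    public static Map<String, Object> loadConf(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        Map<String, Object> conf = new HashMap<String, Object>();
        for (String name : props.stringPropertyNames()) {
            conf.put(name, props.getProperty(name));
        }
        return conf;
    }

    public static void main(String[] args) throws IOException {
        // Use the file named by the first argument, or a built-in sample when none is given.
        Reader source = args.length > 0
                ? new FileReader(args[0])
                : new StringReader("spout.parallel=2\nworker.num=4\n");
        try {
            Map<String, Object> conf = loadConf(source);
            System.out.println(conf.get("spout.parallel"));
            System.out.println(conf.get("worker.num"));
        } finally {
            source.close();
        }
    }
}
```

Values loaded this way are strings, so numeric settings such as worker.num still need to be converted before they are used.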