本地模式运行storm的demo
本例实现的是本地模式运行storm的wordcount demo!
开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行 Storm,提交用于在集群中运行的Topology。
创建工程:demo-storm
目录结构如下:
demo-storm
——src/main/java
————com.youku.demo
————————bolts
————————spouts
——src/test/java
——src/main/resource
————words.txt
WordCounter.java:
package com.youku.demo.bolts;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class WordCounter extends BaseBasicBolt {
Integer id;
String name;
Map<String, Integer> counters;
/**
* At the end of the spout (when the cluster is shutdown
* We will show the word counters
*/
@Override
public void cleanup() {
System.out.println("-- Word Counter ["+name+"-"+id+"] --");
for(Map.Entry<String, Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}
}
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new HashMap<String, Integer>();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
public void execute(Tuple input, BasicOutputCollector collector) {
String str = input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
}
}
WordNormalizer.java:
package com.youku.demo.bolts;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer extends BaseBasicBolt {
public void cleanup() {}
/**
* The bolt will receive the line from the
* words file and process it to Normalize this line
*
* The normalize will be put the words in lower case
* and split the line to get all words in this
*/
public void execute(Tuple input, BasicOutputCollector collector) {
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
}
/**
* The bolt will only emit the field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
WordReader.java:
package com.youku.demo.spouts;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class WordReader extends BaseRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
/**
* The only thing that the methods will do It is emit each
* file line
*/
public void nextTuple() {
/**
* The nextuple it is called forever, so if we have been readed the file
* we will wait and then return
*/
if(completed){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return;
}
String str;
//Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try{
//Read all lines
while((str = reader.readLine()) != null){
/**
* By each line emmit a new value with the line as a their
*/
this.collector.emit(new Values(str),str);
}
}catch(Exception e){
throw new RuntimeException("Error reading tuple",e);
}finally{
completed = true;
}
}
/**
* We will create the file and get the collector object
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
}
this.collector = collector;
}
/**
* Declare the output field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
TopologyMain.java:
package com.youku.demo;
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
}
}
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.youku.demo</groupId> <artifactId>demo-storm</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>demo-storm</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.6</source> <target>1.6</target> <compilerVersion>1.6</compilerVersion> </configuration> </plugin> </plugins> </build> <repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <!-- Storm Dependency --> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.8.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project>
words.txt:
storm test are great is an storm simple application but very powerfull really StOrm is great
运行的时候需要配置参数:src/main/resources/words.txt 指定输入文件
日志输出:
会报好多zookeeper异常,还有最后的日志文件无法删除的异常,目前忽略了,O(∩_∩)O呵呵~
271 [main-SendThread(localhost:2000)] WARN org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect java.net.SocketException: Address family not supported by protocol family: connect at sun.nio.ch.Net.connect(Native Method) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507) at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077) java.io.IOException: Unable to delete file: C:UsersThinkPadAppDataLocalTemp3fbb080f-e585-42e6-8b1b-d6ae024503acversion-2log.1 at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1390) at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044) at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977) at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381) at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044) at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977) at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381) at backtype.storm.util$rmr.invoke(util.clj:413) at backtype.storm.testing$kill_local_storm_cluster.invoke(testing.clj:163) at backtype.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:21) at backtype.storm.LocalCluster.shutdown(Unknown Source) at com.youku.demo.TopologyMain.main(TopologyMain.java:33)


hi,博主可以提供下工程的打包吗?非常感谢:)
新建maven工程,然后按照我的工程结构,把代码拷到相应的目录下就好了,依赖maven,不需要多余的jar包
hi,博主你的例子应该是和这个差不多对吗?https://github.com/storm-book/examples-ch02-getting_started/zipball/master,本地就当作Java程序运行,但是我调了好几次没有调通,所以才想看下博主的工程,如果麻烦就算了吧。。
明天发给你吧,今天没带电脑
没有使用maven模式搞定程序。。。
变相搞定了。。没有使用maven来管理,多谢回复:)
发给你用maven管理的了,maven更简单的
执行了你的demo报java.lang.ClassNotFoundException: org.apache.zookeeper.server.NIOServerCnxn$Factory 我看是zookeeper里面的类,但是我已经依赖了,不知道大家遇到过没
按道理应该报我文章指出的错误才是,这里面应该是连不上zookeeper集群的错,而不是找不到类,如果能出现,日志结果,可以忽略掉!
可能是我本地配置storm时出错了,多谢回复,我再看看
另外再问个问题,当一个bolt有两个源的时候有样例么?从网上看到有那样的拓扑图,一个bolt的源为两个bolt
应该是可以的吧 我也见过你说的那张图,应该是通过grouping机制来实现的,不过现在没时间深入研究了
文章中提到的异常是storm的bug,在0.8.2的版本中还没有修复,是在topology.shutdown的时候报出来的