Storm学习笔记(1)Hello WordCount - 单机模式

2018-06-18 03:15:31来源:未知 阅读 ()

新老客户大回馈,云服务器低至5折

古人云,纸上得来终觉浅,绝知此事要躬行。翻译过来,就是学东西哪有不踩坑的。

因为工作原因要折腾Storm,环境和第一个例子折腾了好久,搞完了回头看,吐血的简单。

 

Storm有两种模式,单机和集群。入门当然选单机。

1、安装JDK,配置Eclipse环境

2、建立一个Maven工程,在pom.xml加上这段

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>1.1.2</version>
     <scope>compile</scope>
</dependency>

3、通过Maven建立项目和下载依赖包。

其实,所需要的storm-core-1.1.2.jar可以从官网下载的storm包里面的lib目录中找到。

Java在下不熟悉,也就不多说了。

4、参考官方或者各种教程的word-count例子编个代码。

5、在Eclipse里面run起来就可以了。

什么Storm, Zookeeper,其实在这个单机入门例子里面,都是不需要的!

就这么简单。

 

具体代码来说,官方提供的storm-starter例子中,WordCountTopology.java挺适合入门的。

只是里面有个坑:

官方采用了python作为句子分割blot的实现,但是如果环境不具备的话,一跑就会出错。

就是这段:

public static class SplitSentence extends ShellBolt implements IRichBolt {

  public SplitSentence() {
     super("python", "splitsentence.py");
   }

// 其余部分略

 

可以用这个类来替代:

 1 public static class SplitSentence extends BaseBasicBolt{  
 2      @Override  
 3      public void execute(Tuple tuple, BasicOutputCollector collector){  
 4          // 接收到一个句子  
 5          String sentence = tuple.getString(0);  
 6          // 把句子切割为单词  
 7          StringTokenizer iter = new StringTokenizer(sentence);  
 8          // 发送每一个单词  
 9          while(iter.hasMoreElements()){  
10              collector.emit(new Values(iter.nextToken()));  
11          }  
12      }  
13        
14      @Override  
15      public void declareOutputFields(OutputFieldsDeclarer declarer){  
16          // 定义一个字段  
17          declarer.declare(new Fields("word"));  
18      }  
19      
20      @Override
21      public Map<String, Object> getComponentConfiguration() {
22        return null;
23      }
24 } 
View Code

 

Run起来以后,在Eclipse的Console窗口里面可以看到运行的详情。

 

完整代码如下:

  1 package storm.blueprints;
  2 
  3 import org.apache.storm.spout.SpoutOutputCollector;
  4 import org.apache.storm.task.TopologyContext;
  5 import org.apache.storm.topology.OutputFieldsDeclarer;
  6 import org.apache.storm.topology.base.BaseRichSpout;
  7 import org.apache.storm.tuple.Fields;
  8 import org.apache.storm.tuple.Values;
  9 
 10 import org.apache.storm.utils.Utils;
 11 import org.slf4j.Logger;
 12 import org.slf4j.LoggerFactory;
 13 
 14 import org.apache.storm.Config;  
 15 import org.apache.storm.LocalCluster;  
 16 import org.apache.storm.StormSubmitter;  
 17 import org.apache.storm.task.ShellBolt;  
 18    
 19 import org.apache.storm.topology.BasicOutputCollector;  
 20 import org.apache.storm.topology.IRichBolt;  
 21 import org.apache.storm.topology.TopologyBuilder;  
 22 import org.apache.storm.topology.base.BaseBasicBolt;  
 23    
 24 import org.apache.storm.tuple.Tuple;  
 25 import java.util.HashMap;
 26 import java.util.Map;
 27 
 28 
 29 import java.util.*;
 30 
 31 public class HelloWordCount 
 32 {
 33      public static class RandomSentenceSpout extends BaseRichSpout {
 34            private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);
 35 
 36           SpoutOutputCollector _collector;
 37            Random _rand;
 38 
 39 
 40            @Override
 41            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 42              _collector = collector;
 43              _rand = new Random();
 44            }
 45 
 46           @Override
 47            public void nextTuple() {
 48              Utils.waitForMillis(100);//(100);
 49              String[] sentences = new String[]{
 50                      sentence("the cow jumped over the moon"),
 51                      sentence("an apple a day keeps the doctor away"),
 52                      sentence("four score and seven years ago"),
 53                      sentence("snow white and the seven dwarfs"),
 54                      sentence("i am at two with nature")};
 55              final String sentence = sentences[_rand.nextInt(sentences.length)];
 56 
 57             LOG.debug("Emitting tuple: {}", sentence);
 58 
 59             _collector.emit(new Values(sentence));
 60              
 61              System.out.println("***" + sentence);
 62            }
 63 
 64           protected String sentence(String input) {
 65              return input;
 66            }
 67 
 68           @Override
 69            public void ack(Object id) {
 70            }
 71 
 72           @Override
 73            public void fail(Object id) {
 74            }
 75 
 76           @Override
 77            public void declareOutputFields(OutputFieldsDeclarer declarer) {
 78              declarer.declare(new Fields("sentence"));
 79            }
 80      }
 81      
 82        
 83      // 定义个Bolt,用于将句子切分为单词  
 84      public static class SplitSentence extends BaseBasicBolt{  
 85          @Override  
 86          public void execute(Tuple tuple, BasicOutputCollector collector){  
 87              // 接收到一个句子  
 88              String sentence = tuple.getString(0);  
 89              // 把句子切割为单词  
 90              StringTokenizer iter = new StringTokenizer(sentence);  
 91              // 发送每一个单词  
 92              while(iter.hasMoreElements()){  
 93                  collector.emit(new Values(iter.nextToken()));  
 94              }  
 95          }  
 96            
 97          @Override  
 98          public void declareOutputFields(OutputFieldsDeclarer declarer){  
 99              // 定义一个字段  
100              declarer.declare(new Fields("word"));  
101          }  
102          
103          @Override
104          public Map<String, Object> getComponentConfiguration() {
105            return null;
106          }
107      }  
108        
109      // 定义一个Bolt,用于单词计数  
110      public static class WordCount extends BaseBasicBolt {  
111          Map<String, Integer> counts = new HashMap<String, Integer>();  
112            
113          @Override  
114          public void execute(Tuple tuple, BasicOutputCollector collector){  
115              String word = tuple.getString(0);
116              Integer count = counts.get(word);
117              if (count == null)
118                count = 0;
119              count++;
120              counts.put(word, count);
121              
122              System.out.println(word +"  "+count);
123          }  
124            
125          @Override  
126          public void declareOutputFields(OutputFieldsDeclarer declarer){  
127              // 定义两个字段word和count  
128              declarer.declare(new Fields("word","count"));  
129          }  
130      }  
131      public static void main(String[] args) throws Exception   
132      {  
133          System.out.println("main");
134          // 创建一个拓扑  
135          TopologyBuilder builder = new TopologyBuilder();  
136          // 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5  
137          builder.setSpout("Spout", new RandomSentenceSpout(), 5);  
138          // 设置slot——“split”,并行度为8,它的数据来源是spout的  
139          builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout");  
140          // 设置slot——“count”,你并行度为12,它的数据来源是split的word字段  
141          builder.setBolt("count", new WordCount(), 12).globalGrouping("split");//, new Fields("word"));  
142            
143          Config conf = new Config();  
144                
145              // 本地集群  
146              LocalCluster cluster = new LocalCluster();  
147                
148              System.out.println("LocalCluster");
149              
150              // 提交拓扑(该拓扑的名字叫word-count)  
151              cluster.submitTopology("word-count", conf, builder.createTopology() );  
152                
153              System.out.println("submitTopology");           
154              
155              Utils.waitForSeconds(10);
156              cluster.killTopology("word-count");
157              cluster.shutdown();
158              }  
159      }  
160      
161      public static class Utils {
162 
163         public static void waitForSeconds(int seconds) {
164              try {
165                  Thread.sleep(seconds * 1000);
166              } catch (InterruptedException e) {
167              }
168          }
169 
170         public static void waitForMillis(long milliseconds) {
171              try {
172                  Thread.sleep(milliseconds);
173              } catch (InterruptedException e) {
174              }
175          }
176      }
177 }
View Code

 

请使用手机"扫一扫"x

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:撸一撸Spring Cloud Ribbon的原理

下一篇:[Java] File类 递归 获取目录下所有文件/文件夹