
kafka+storm+hbase



Compute a WordCount with Kafka + Storm + HBase, writing the results to an HBase table laid out as follows (a table-creation sketch is given after the list):

(1) Table name: wc

(2) Column family: result

(3) RowKey: word

(4) Field (column qualifier): count
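
The wc table can be created ahead of time with the HBase client API. Below is a minimal sketch, not the article's own code; the ZooKeeper quorum hosts (zeb/yjd/ylh) repeat the values used later in the topology and are an assumption about the environment.

package com.kafka.spout;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Creates the wc table with the single column family "result" used by the topology.
public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum of the HBase cluster (assumed; adjust to your environment).
        conf.set("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("wc")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wc"));
            desc.addFamily(new HColumnDescriptor("result"));
            admin.createTable(desc);
        }
        admin.close();
    }
}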

1. Solution:

(1) Step 1: prepare the jar packages for Kafka, Storm, and HBase. The Maven dependencies (pom.xml) are as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>kafkaSpout</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.3</version>
    </dependency>
    <!-- storm-hbase provides the HBaseBolt and SimpleHBaseMapper classes used in
         step (4); using the 0.9.3 version to match storm-core is an assumption. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-hbase</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.99.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.5.0</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>central</id>
      <url>http://repo1.maven.org/maven2/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>scala-tools</id>
      <url>http://scala-tools.org/repo-releases</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>conjars</id>
      <url>http://conjars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

(2) Step 2: the data coming from Kafka is split into individual words by the LevelSplit bolt and then emitted to the next bolt. The code is as follows:

package com.kafka.spout;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Splits each incoming line on spaces and emits one tuple per word.
public class LevelSplit extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // e.g. "the cow jumped over the moon"
        String words = tuple.getString(0);
        String[] va = words.split(" ");
        for (String word : va) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

(3) Step 3: the tuples emitted by the LevelSplit bolt are counted in the LevelCount bolt, and the results are then sent on to the HBase bolt. The code is as follows:

package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelCount extends BaseBasicBolt {

    // Running count per word, kept in memory by this bolt instance.
    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);

        // Print the current counts for debugging.
        for (Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "----------->" + e.getValue());
        }

        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

(4) Step 4: set up the connections to Kafka and HBase, wire up the whole topology, and submit it. The code is as follows:

package com.kafka.spout;

import java.util.Map;

import com.google.common.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
// HBaseBolt and SimpleHBaseMapper come from the storm-hbase module; the original
// listing omitted these imports, so the usual storm-hbase 0.9.x package names are assumed.
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

public class StormKafkaTopo {

    public static void main(String[] args) {
        // Kafka spout: read the "yjd" topic, tracking offsets under /storm in ZooKeeper.
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        Config conf = new Config();
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // Map the "word" field to the RowKey and the "count" field to a column
        // in the "result" column family of the wc table.
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        // HBase connection settings handed to the bolt via the "hbase.conf" config key.
        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        // HBase bolt writing to the wc table.
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        conf.setDebug(true);
        conf.put("hbase.conf", map);

        // Wire up the topology: KafkaSpout -> LevelSplit -> LevelCount -> HBaseBolt.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

        if (args != null && args.length > 0) {
            // Submit to the cluster.
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // Run in local mode.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }
    }
}
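
The topology above references a MessageScheme class that the article does not show. Below is a minimal sketch of what it likely looks like: it decodes each Kafka message as a UTF-8 string and emits it as a single field. The field name "line" is an assumption; LevelSplit only reads the tuple by position.

package com.kafka.spout;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Turns every raw Kafka message into a one-field tuple containing the sentence text.
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] bytes) {
        try {
            return new Values(new String(bytes, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public Fields getOutputFields() {
        return new Fields("line");
    }
}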

(5) Step 5: produce test data on the Kafka side with the console producer, typing sentences such as "the cow jumped over the moon" into the topic (yjd) that the spout reads.
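
If it is more convenient to generate the test sentences from code rather than the console, the sketch below uses the old Kafka 0.8 Java producer API. The broker address zeb:9092 is an assumption about the environment.

package com.kafka.spout;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sends a few test sentences to the "yjd" topic read by the KafkaSpout.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "zeb:9092"); // assumed broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("yjd", "the cow jumped over the moon"));
        producer.send(new KeyedMessage<String, String>("yjd", "the man went to the store"));
        producer.close();
    }
}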

2. Run results (shown as a screenshot in the original post):
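
To check the results without the screenshot, the wc table can be scanned with the HBase client. A minimal sketch follows; the quorum hosts are again an assumption about the environment, and the raw bytes of the count column are printed because their encoding depends on how the HBase bolt serialized the field.

package com.kafka.spout;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Prints every row of the wc table so the word counts can be inspected.
public class ScanWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181"); // assumed
        HTable table = new HTable(conf, "wc");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            byte[] value = r.getValue(Bytes.toBytes("result"), Bytes.toBytes("count"));
            System.out.println(Bytes.toString(r.getRow()) + " -> " + Bytes.toStringBinary(value));
        }
        scanner.close();
        table.close();
    }
}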

 

3. Problems encountered:

(1) With everything in place, the topology was submitted and the code was run, and error 1 appeared (shown as a screenshot in the original post).

Solution: the dependency versions needed to be consistent; after aligning the versions in the pom, the error went away.

(2) Error 2 then appeared (also shown as a screenshot in the original post).

Solution: the HMaster and HRegionServer processes of HBase had not been started. After starting them (for example with bin/start-hbase.sh), the problem was resolved.
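
A quick way to catch this kind of problem before submitting the topology is to check that HMaster is reachable. A minimal sketch, with the quorum hosts assumed as above:

package com.kafka.spout;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Fails fast if HMaster is not running, which was the cause of error 2.
public class HBaseCheck {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181"); // assumed
        try {
            HBaseAdmin.checkHBaseAvailable(conf);
            System.out.println("HBase is up and HMaster is reachable");
        } catch (Exception e) {
            System.out.println("HBase is not available: " + e.getMessage());
        }
    }
}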


