您现在的位置是:亿华云 > 应用开发
kafka+storm+hbase
亿华云2025-10-04 03:51:58【应用开发】0人已围观
简介:使用kafka+storm+hbase实现WordCount计算。(1)表名:wc (2)列族:result (3)RowKey:word (4)Field:count 1、解决:(1)第一步:首先准备kafka、storm和hbase相关jar包。
使用kafka+storm+hbase实现WordCount(单词计数),计数结果存入HBase,表结构如下:
(1)表名:wc
(2)列族:result
(3)RowKey:word
(4)Field:count
1、解决:
(1)第一步:准备kafka、storm和hbase相关的jar包,Maven依赖(pom.xml)如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Build for the Kafka to Storm to HBase word-count topology. -->
  <modelVersion>4.0.0</modelVersion>
  <groupId>com</groupId>
  <artifactId>kafkaSpout</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- Storm runtime. NOTE(review): Storm's docs recommend <scope>provided</scope> for
         storm-core when the jar is submitted with `storm jar`, so it is not bundled into
         jar-with-dependencies. It is kept at the default scope here so the LocalCluster
         branch of StormKafkaTopo can still run from the IDE. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3</version>
    </dependency>
    <!-- KafkaSpout / SpoutConfig / ZkHosts; keep in step with the storm-core version. -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- HBase client used by the HBase bolt. NOTE(review): 0.99.2 is a pre-1.0 developer
         preview release; confirm it matches the HBase version running on the cluster. -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>0.99.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.5.0</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- tools.jar of the JDK running Maven. In the JDK 7/8 layout ${java.home} is the jre
         directory, so tools.jar sits in ../lib. This replaces a hard-coded Windows path. -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${java.home}/../lib/tools.jar</systemPath>
    </dependency>
  </dependencies>

  <repositories>
    <!-- Maven Central must be reached over HTTPS: plain HTTP is refused by Central and
         Maven 3.8.1+ blocks HTTP repositories by default. -->
    <repository>
      <id>central</id>
      <url>https://repo.maven.apache.org/maven2/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <!-- The scala-tools.org repository entry was removed: that host no longer serves a
         Maven repository and only slowed down or broke dependency resolution. -->
    <!-- NOTE(review): plain-HTTP conjars.org is blocked by Maven 3.8.1+ and the host may be
         offline; drop or replace this entry if resolution fails. -->
    <repository>
      <id>conjars</id>
      <url>http://conjars.org/repo/</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
      <!-- Builds a single jar with all dependencies during `mvn package`. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- The class whose main() builds and submits the topology. -->
              <mainClass>com.kafka.spout.StormKafkaTopo</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
<!-- (2) Next: the data coming from Kafka is split by the LevelSplit bolt and then sent on to the next bolt. The code is as follows: -->
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.kafka.spout; import java.util.regex.Matcher; import java.util.regex.Pattern; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class LevelSplit extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String words = tuple.getString( 0 ).toString(); //the cow jumped over the moon String []va=words.split( " " ); for (String word : va) { collector.emit( new Values(word)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "word" )); } }(3)将levelSplit的Bolt发来的数据到levelCount的Bolt中进行计数处理,服务器租用然后发送到hbase(Bolt)中。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.kafka.spout; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class LevelCount extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { // TODO Auto-generated method stub String word = tuple.getString( 0 ); Integer count = counts.get(word); if (count == null ) count = 0 ; count++; counts.put(word, count); for (Entry<String, Integer> e : counts.entrySet()) { //sum += e.getValue(); System.out.println(e.getKey() + "----------->" +e.getValue()); } collector.emit( new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare( new Fields( "word" , "count" )); } }(4) 准备连接kafka和hbase条件以及设置整个拓扑结构并且提交拓扑。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package com.kafka.spout; import java.util.HashMap; import java.util.Map; import com.google.common.collect.Maps; //import org.apache.storm.guava.collect.Maps; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; public class StormKafkaTopo { public static void main(String[] args) { BrokerHosts brokerHosts = new ZkHosts( "zeb,yjd,ylh" ); SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd" , "/storm" , "kafkaspout" ); Config conf = new Config(); spoutConfig.scheme = new SchemeAsMultiScheme( new MessageScheme()); SimpleHBaseMapper mapper = new SimpleHBaseMapper(); mapper.withColumnFamily( "result" ); mapper.withColumnFields( new Fields( "count" )); mapper.withRowKeyField( "word" ); Map<String, Object> map = Maps.newTreeMap(); map.put( "hbase.rootdir" , "hdfs://zeb:9000/hbase" ); map.put( "hbase.zookeeper.quorum" , "zeb:2181,yjd:2181,ylh:2181" ); // hbase-bolt HBaseBolt hBaseBolt = new HBaseBolt( "wc" , mapper).withConfigKey( "hbase.conf" ); conf.setDebug( true ); conf.put( "hbase.conf" , map); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "spout" , new KafkaSpout(spoutConfig)); builder.setBolt( "split" , new LevelSplit(), 1 ).shuffleGrouping( "spout" ); builder.setBolt( "count" , new LevelCount(), 1 ).fieldsGrouping( "split" , new Fields( "word" )); builder.setBolt( "hbase" , 
hBaseBolt, 1 ).shuffleGrouping( "count" ); if (args != null && args.length > 0 ) { //提交到集群运行 try { StormSubmitter.submitTopology(args[ 0 ], conf, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } } else { //本地模式运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology( "Topotest1121" , conf, builder.createTopology()); Utils.sleep( 1000000 ); cluster.killTopology( "Topotest1121" ); cluster.shutdown(); } } }(5)在kafka端用控制台生产数据,如下:
2、运行结果截图:
3、遇到的问题:
(1)完成上述所有工作后,提交拓扑并运行代码,出现了错误1,如下:
解决:原因是各依赖的版本不一致,将相关依赖的版本统一后,问题成功解决。
(2)发生了错误2,如下:
解决:原因是忘记启动HBase的HMaster和HRegionServer进程,启动后问题成功解决。
http://shenzhen.offcn.com/
服务器托管很赞哦!(58)