kafka+storm+hbase
Use kafka + storm + hbase to compute a WordCount, writing the results into an HBase table laid out as follows (a table-creation sketch follows the list):
(1) Table name: wc
(2) Column family: result
(3) RowKey: the word
(4) Field: count
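Before running anything, the target table needs to exist. A minimal hbase shell sketch for creating it (assuming HBase is already up and the shell is run on a configured node):

hbase shell
create 'wc', 'result'    # table 'wc' with a single column family 'result'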
1. Solution:
(1) Step 1: prepare the kafka, storm, and hbase related jars. The Maven dependencies are as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com</groupId>
    <artifactId>kafkaSpout</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.99.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
            <snapshots><enabled>false</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <repository>
            <id>clojars</id>
            <url>https://clojars.org/repo/</url>
            <snapshots><enabled>true</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <repository>
            <id>scala-tools</id>
            <url>http://scala-tools.org/repo-releases</url>
            <snapshots><enabled>true</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
        <repository>
            <id>conjars</id>
            <url>http://conjars.org/repo/</url>
            <snapshots><enabled>true</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>UTF-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
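Note that the topology class in step (4) also uses HBaseBolt and SimpleHBaseMapper from Storm's storm-hbase external module, which the pom above does not list. A sketch of the extra dependency, with the version assumed to match storm-core:

<!-- assumption: storm-hbase at the same version as storm-core supplies HBaseBolt / SimpleHBaseMapper -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>0.9.3</version>
</dependency>

With the assembly plugin configured above, mvn clean package builds a jar-with-dependencies that bundles these libraries.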
(2) The levelSplit bolt splits each line received from kafka into words and emits each word to the next bolt. The code is as follows:
package com.kafka.spout;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelSplit extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // e.g. "the cow jumped over the moon"
        String words = tuple.getString(0);
        String[] va = words.split(" ");
        for (String word : va) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

(3) The tuples emitted by the levelSplit bolt go to the levelCount bolt, which keeps the word counts and then sends the results on to the hbase bolt. The code is as follows:
package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelCount extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);

        // print the running totals for debugging
        for (Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "----------->" + e.getValue());
        }
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

(4) Set up the kafka and hbase connection settings, wire up the whole topology, and submit it. The code is as follows:
package com.kafka.spout;

import java.util.Map;

import com.google.common.collect.Maps;

// HBaseBolt and SimpleHBaseMapper come from Storm's storm-hbase external module
// (assumed import paths, matching the storm-hbase 0.9.x package layout)
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class StormKafkaTopo {

    public static void main(String[] args) {
        // zookeeper hosts and the kafka topic ("yjd") that the spout reads from
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");

        Config conf = new Config();
        // MessageScheme: custom Scheme in this package (not shown here) that turns the raw kafka message into a tuple
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // map the "word" field to the row key and the "count" field into the "result" column family
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        // hbase bolt writing into table "wc"
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");
        conf.setDebug(true);
        conf.put("hbase.conf", map);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

        if (args != null && args.length > 0) {
            // submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }
    }
}

(5) Produce test data from the kafka console producer, for example:
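A typical console-producer invocation for the topic that the KafkaSpout reads ("yjd"); the broker host and port below are assumptions based on the hostnames used in the topology:

# assumed broker address; adjust to the actual kafka broker host:port
bin/kafka-console-producer.sh --broker-list zeb:9092 --topic yjd
the cow jumped over the moon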
2. Screenshot of the running results:
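Besides the console output, the accumulated counts can be checked in the hbase shell (a sketch, assuming the shell points at the same cluster); each row key is a word and result:count should hold its latest count:

hbase shell
scan 'wc'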
3. Problems encountered:
(1) With everything in place, the topology was submitted and the code was run, and error 1 occurred.
Fix: the dependency versions were inconsistent; after aligning them to the same version, the error was resolved.
(2) Then error 2 occurred.
Fix: the HMaster and HRegionServer processes of hbase had not been started. Starting them resolved the problem.
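In a typical installation these daemons are started from the HBase bin directory, for example (paths assumed):

# start HMaster and the region servers
$HBASE_HOME/bin/start-hbase.sh
# confirm that HMaster and HRegionServer are running
jps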