A Storm-Kafka integration example: reading from Kafka and writing to Kafka
Published: 2019-06-19



                                                      by 小闪电

0 Preface

  Storm's main job is streaming, real-time computation: it processes a continuous flow of data very quickly. Most data, however, does not arrive at a steady rate; it comes in bursts. Plain batch processing is a poor fit for that pattern, so Kafka is introduced as a message queue in front of Storm; the two work well together and give a stable streaming pipeline. Below is a simple example that reads data from Kafka and writes it back to Kafka, which is enough to show how Storm and Kafka interact.

 

1 Program diagram

  In essence, Storm's KafkaSpout acts as a Kafka consumer, and the KafkaBolt acts as a Kafka producer.

  The diagram is as follows (figure omitted): Kafka topic testfromkafka → KafkaSpout (consumer) → ToKafkaBolt (producer) → Kafka topic testTokafka.

2 pom.xml

  Create a Maven project and add the external dependencies for Storm, Kafka, and ZooKeeper. The pom.xml is as follows:

  

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.tony</groupId>
  <artifactId>storm-example</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.3</version>
    </dependency>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.5.0</version>
      <exclusions>
        <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
        <exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
      <exclusions>
        <exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion>
        <exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>central</id>
      <url>http://repo1.maven.org/maven2/</url>
      <snapshots><enabled>false</enabled></snapshots>
      <releases><enabled>true</enabled></releases>
    </repository>
    <repository>
      <id>clojars</id>
      <url>https://clojars.org/repo/</url>
      <snapshots><enabled>true</enabled></snapshots>
      <releases><enabled>true</enabled></releases>
    </repository>
    <repository>
      <id>scala-tools</id>
      <url>http://scala-tools.org/repo-releases</url>
      <snapshots><enabled>true</enabled></snapshots>
      <releases><enabled>true</enabled></releases>
    </repository>
    <repository>
      <id>conjars</id>
      <url>http://conjars.org/repo/</url>
      <snapshots><enabled>true</enabled></snapshots>
      <releases><enabled>true</enabled></releases>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
          <showDeprecation>true</showDeprecation>
          <showWarnings>true</showWarnings>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals><goal>single</goal></goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

3 The KafkaSpout's consumption logic: a MessageScheme class that declares two output fields, key and message, so tuples can be dispatched directly to the KafkaBolt. The code is as follows:

package com.tony.storm_kafka.util;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/*
 * author: hi
 */
public class MessageScheme implements Scheme {

    @Override
    public List<Object> deserialize(byte[] arg0) {
        try {
            String msg = new String(arg0, "UTF-8");
            String msg_0 = "hello";
            // emit a fixed key and the raw message text
            return new Values(msg_0, msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("key", "message");
    }
}

4 The topology main class: configure Kafka and submit the topology to Storm. The KafkaSpout's ZooKeeper hosts can be configured either dynamically or statically; prefer the dynamic, self-discovering form (a short comparison sketch follows the code below).

package org.tony.storm_kafka.common;

import java.util.Arrays;
import java.util.Properties;

import org.tony.storm_kafka.bolt.ToKafkaBolt;

import com.tony.storm_kafka.util.MessageScheme;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaState;

public class KafkaBoltTestTopology {

    // KafkaSpout configuration parameters
    public static String kafka_zk_port = null;
    public static String topic = null;
    public static String kafka_zk_rootpath = null;
    public static BrokerHosts brokerHosts;
    public static String spout_name = "spout";
    public static String kafka_consume_from_start = null;

    // simple debugging bolt that prints the message field of each tuple
    public static class PrinterBolt extends BaseBasicBolt {

        private static final long serialVersionUID = 9114512339402566580L;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println("-----" + (tuple.getValue(1)).toString());
        }
    }

    public StormTopology buildTopology() {
        // KafkaSpout configuration
        kafka_consume_from_start = "true";
        kafka_zk_rootpath = "/kafka08";
        String spout_id = spout_name;
        brokerHosts = new ZkHosts("192.168.201.190:2191,192.168.201.191:2191,192.168.201.192:2191",
                kafka_zk_rootpath + "/brokers");
        kafka_zk_port = "2191";

        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "testfromkafka", kafka_zk_rootpath, spout_id);
        spoutConf.scheme = new SchemeAsMultiScheme(new MessageScheme());
        spoutConf.zkPort = Integer.parseInt(kafka_zk_port);
        spoutConf.zkRoot = kafka_zk_rootpath;
        spoutConf.zkServers = Arrays.asList(new String[] {"10.9.201.190", "10.9.201.191", "10.9.201.192"});

        // whether to read the topic from the first message onwards
        if (kafka_consume_from_start == null) {
            kafka_consume_from_start = "false";
        }
        boolean kafka_consume_from_start_b = Boolean.valueOf(kafka_consume_from_start);
        System.out.println("kafka_consume_from_start: " + kafka_consume_from_start_b);
        spoutConf.forceFromStart = kafka_consume_from_start_b;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConf));
        builder.setBolt("forwardToKafka", new ToKafkaBolt<String, String>()).shuffleGrouping("spout");
        return builder.createTopology();
    }

    public static void main(String[] args) {
        KafkaBoltTestTopology kafkaBoltTestTopology = new KafkaBoltTestTopology();
        StormTopology stormTopology = kafkaBoltTestTopology.buildTopology();

        Config conf = new Config();
        // Kafka producer settings consumed by the ToKafkaBolt
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092");
        props.put("producer.type", "async");
        props.put("request.required.acks", "0"); // 0, -1, 1
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        conf.put("topic", "testTokafka");

        if (args.length > 0) {
            // cluster submit
            try {
                StormSubmitter.submitTopology("kafkaboltTest", conf, stormTopology);
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            new LocalCluster().submitTopology("kafkaboltTest", conf, stormTopology);
        }
    }
}
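
As noted above, ZkHosts lets the spout discover brokers and partition leaders dynamically through ZooKeeper, while a static broker list is also possible. A minimal comparison sketch (ZooKeeper addresses and root path taken from the topology above; the broker port 9092 and single partition are assumptions):

import storm.kafka.Broker;
import storm.kafka.BrokerHosts;
import storm.kafka.StaticHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.GlobalPartitionInformation;

public class BrokerHostsExample {

    // Dynamic: broker/partition metadata is read from ZooKeeper under
    // <root>/brokers, so leader changes are picked up automatically (preferred).
    public static BrokerHosts dynamicHosts() {
        return new ZkHosts("192.168.201.190:2191,192.168.201.191:2191,192.168.201.192:2191",
                "/kafka08/brokers");
    }

    // Static: each partition is pinned to a fixed broker; any broker change
    // means rebuilding and resubmitting the topology.
    public static BrokerHosts staticHosts() {
        GlobalPartitionInformation partitions = new GlobalPartitionInformation();
        partitions.addPartition(0, new Broker("192.168.201.190", 9092)); // assumed broker port
        return new StaticHosts(partitions);
    }
}

Either BrokerHosts instance can be passed to the SpoutConfig constructor shown above.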

5 Example results. The data in the testfromkafka topic can be generated continuously by a separate class (a sketch of such a producer is given after the screenshots below).

  Data in topic testfromkafka (screenshot omitted)

  Data in topic testTokafka (screenshot omitted)
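
As mentioned above, testfromkafka can be fed by a separate helper class. A minimal sketch using the same Kafka 0.8 producer API as the bolt below (the class name and message format are made up for illustration; the broker address is reused from the topology config):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestDataProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.10.43.150:9092"); // assumed: same broker as the topology
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // keep feeding the topic that the KafkaSpout reads from
        int i = 0;
        while (true) {
            producer.send(new KeyedMessage<String, String>("testfromkafka", "message-" + i++));
            Thread.sleep(1000);
        }
    }
}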

6 Finally, the ToKafkaBolt. It extends the basic BaseRichBolt class, mainly overriding execute, and adds acking. The code is as follows:

package org.tony.storm_kafka.bolt;

import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.mapper.TupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import storm.kafka.bolt.selector.KafkaTopicSelector;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

/*
 * author: yue
 */
public class ToKafkaBolt<K, V> extends BaseRichBolt {

    private static final Logger Log = LoggerFactory.getLogger(ToKafkaBolt.class);

    public static final String TOPIC = "topic";
    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";

    private Producer<K, V> producer;
    private OutputCollector collector;
    private TupleToKafkaMapper<K, V> mapper;
    private KafkaTopicSelector topicSelector;

    public ToKafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
        this.mapper = mapper;
        return this;
    }

    public ToKafkaBolt<K, V> withTopicSelector(KafkaTopicSelector topicSelector) {
        this.topicSelector = topicSelector;
        return this;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // fall back to the defaults: field names "key"/"message" and the topic from the topology config
        if (mapper == null) {
            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
        }
        if (topicSelector == null) {
            this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
        }
        // build the producer from the properties put into the topology config
        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
        Properties properties = new Properties();
        properties.putAll(configMap);
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<K, V>(config);
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        K key = null;
        V message = null;
        String topic = null;
        try {
            key = mapper.getKeyFromTuple(input);
            message = mapper.getMessageFromTuple(input);
            topic = topicSelector.getTopic(input);
            if (topic != null) {
                producer.send(new KeyedMessage<K, V>(topic, message));
            } else {
                Log.warn("skipping key = " + key + ", topic selector returned null.");
            }
        } catch (Exception e) {
            Log.error("Could not send message with key = " + key
                    + " and value = " + message + " to topic = " + topic, e);
        } finally {
            // ack in every case so the tuple is not replayed
            collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
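
The withTupleToKafkaMapper and withTopicSelector setters defined above are not used by the topology in section 4 (it relies on the defaults), but they let the field mapping and target topic be set explicitly. A usage sketch, assuming the generic reconstruction of ToKafkaBolt above:

import org.tony.storm_kafka.bolt.ToKafkaBolt;

import backtype.storm.topology.TopologyBuilder;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;

public class ToKafkaBoltWiringExample {
    public static void wire(TopologyBuilder builder) {
        // map the "key"/"message" fields emitted by MessageScheme and always write to testTokafka
        ToKafkaBolt<String, String> bolt = new ToKafkaBolt<String, String>()
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"))
                .withTopicSelector(new DefaultTopicSelector("testTokafka"));
        builder.setBolt("forwardToKafka", bolt).shuffleGrouping("spout");
    }
}

Note that execute acks the tuple in a finally block, so a failed send is logged but not replayed; if at-least-once delivery into Kafka is required, ack only on success and call collector.fail on the error path instead.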

 



Reposted from: https://www.cnblogs.com/yueyanyu/p/5563873.html
