现在的位置: 首页 > 综合 > 正文

kafka开发

2018年03月21日 ⁄ 综合 ⁄ 共 6848字 ⁄ 字号 评论关闭
上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。下面我们来搭建kafka的开发环境。
添加依赖

搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。
创建好maven项目后,在pom.xml中添加以下依赖:
               

[html] view
plain
copy

  1. <dependency>  
  2.          <groupId> org.apache.kafka</groupId >  
  3.          <artifactId> kafka_2.10</artifactId >  
  4.          <version> 0.8.0</ version>  
  5. </dependency>  

添加依赖后你会发现有两个jar包的依赖找不到。没关系我都帮你想好了,点击这里下载这两个jar包,解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,比如我的本地仓库是d:\mvn,完成后我的目录结构是这样的:
目录结构
配置程序

首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:
[java] view
plain
copy

  1. package com.sohu.kafkademon;  
  2.   
  3. public interface KafkaProperties  
  4. {  
  5.     final static String zkConnect = "10.22.10.139:2181";  
  6.     final static String groupId = "group1";  
  7.     final static String topic = "topic1";  
  8.     final static String kafkaServerURL = "10.22.10.139";  
  9.     final static int kafkaServerPort = 9092;  
  10.     final static int kafkaProducerBufferSize = 64 * 1024;  
  11.     final static int connectionTimeOut = 20000;  
  12.     final static int reconnectInterval = 10000;  
  13.     final static String topic2 = "topic2";  
  14.     final static String topic3 = "topic3";  
  15.     final static String clientId = "SimpleConsumerDemoClient";  
  16. }  
producer

[java] view
plain
copy

  1. package com.sohu.kafkademon;  
  2.   
  3. import java.util.Properties;  
  4.   
  5. import kafka.producer.KeyedMessage;  
  6. import kafka.producer.ProducerConfig;  
  7.   
  8. /** 
  9.  * @author leicui bourne_cui@163.com 
  10.  */  
  11. public class KafkaProducer extends Thread  
  12. {  
  13.     private final kafka.javaapi.producer.Producer<Integer, String> producer;  
  14.     private final String topic;  
  15.     private final Properties props = new Properties();  
  16.   
  17.     public KafkaProducer(String topic)  
  18.     {  
  19.         props.put("serializer.class""kafka.serializer.StringEncoder");  
  20.         props.put("metadata.broker.list""10.22.10.139:9092");  
  21.         producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));  
  22.         this.topic = topic;  
  23.     }  
  24.   
  25.     @Override  
  26.     public void run() {  
  27.         int messageNo = 1;  
  28.         while (true)  
  29.         {  
  30.             String messageStr = new String("Message_" + messageNo);  
  31.             System.out.println("Send:" + messageStr);  
  32.             producer.send(new KeyedMessage<Integer, String>(topic, messageStr));  
  33.             messageNo++;  
  34.             try {  
  35.                 sleep(3000);  
  36.             } catch (InterruptedException e) {  
  37.                 // TODO Auto-generated catch block  
  38.                 e.printStackTrace();  
  39.             }  
  40.         }  
  41.     }  
  42.   
  43. }  

consumer

[java] view
plain
copy

  1. package com.sohu.kafkademon;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.List;  
  5. import java.util.Map;  
  6. import java.util.Properties;  
  7.   
  8. import kafka.consumer.ConsumerConfig;  
  9. import kafka.consumer.ConsumerIterator;  
  10. import kafka.consumer.KafkaStream;  
  11. import kafka.javaapi.consumer.ConsumerConnector;  
  12.   
  13. /** 
  14.  * @author leicui bourne_cui@163.com 
  15.  */  
  16. public class KafkaConsumer extends Thread  
  17. {  
  18.     private final ConsumerConnector consumer;  
  19.     private final String topic;  
  20.   
  21.     public KafkaConsumer(String topic)  
  22.     {  
  23.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(  
  24.                 createConsumerConfig());  
  25.         this.topic = topic;  
  26.     }  
  27.   
  28.     private static ConsumerConfig createConsumerConfig()  
  29.     {  
  30.         Properties props = new Properties();  
  31.         props.put("zookeeper.connect", KafkaProperties.zkConnect);  
  32.         props.put("group.id", KafkaProperties.groupId);  
  33.         props.put("zookeeper.session.timeout.ms""40000");  
  34.         props.put("zookeeper.sync.time.ms""200");  
  35.         props.put("auto.commit.interval.ms""1000");  
  36.         return new ConsumerConfig(props);  
  37.     }  
  38.   
  39.     @Override  
  40.     public void run() {  
  41.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  42.         topicCountMap.put(topic, new Integer(1));  
  43.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);  
  44.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);  
  45.         ConsumerIterator<byte[], byte[]> it = stream.iterator();  
  46.         while (it.hasNext()) {  
  47.             System.out.println("receive:" + new String(it.next().message()));  
  48.             try {  
  49.                 sleep(3000);  
  50.             } catch (InterruptedException e) {  
  51.                 e.printStackTrace();  
  52.             }  
  53.         }  
  54.     }  
  55. }  
简单的发送接收

运行下面这个程序,就可以进行简单的发送接收消息了:

[java] view
plain
copy

  1. package com.sohu.kafkademon;  
  2.   
  3. /** 
  4.  * @author leicui bourne_cui@163.com 
  5.  */  
  6. public class KafkaConsumerProducerDemo  
  7. {  
  8.     public static void main(String[] args)  
  9.     {  
  10.         KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);  
  11.         producerThread.start();  
  12.   
  13.         KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);  
  14.         consumerThread.start();  
  15.     }  
  16. }  
高级别的consumer

下面是比较负载的发送接收的程序:
[java] view
plain
copy

  1. package com.sohu.kafkademon;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.List;  
  5. import java.util.Map;  
  6. import java.util.Properties;  
  7.   
  8. import kafka.consumer.ConsumerConfig;  
  9. import kafka.consumer.ConsumerIterator;  
  10. import kafka.consumer.KafkaStream;  
  11. import kafka.javaapi.consumer.ConsumerConnector;  
  12.   
  13. /** 
  14.  * @author leicui bourne_cui@163.com 
  15.  */  
  16. public class KafkaConsumer extends Thread  
  17. {  
  18.     private final ConsumerConnector consumer;  
  19.     private final String topic;  
  20.   
  21.     public KafkaConsumer(String topic)  
  22.     {  
  23.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(  
  24.                 createConsumerConfig());  
  25.         this.topic = topic;  
  26.     }  
  27.   
  28.     private static ConsumerConfig createConsumerConfig()  
  29.     {  
  30.         Properties props = new Properties();  
  31.         props.put("zookeeper.connect", KafkaProperties.zkConnect);  
  32.         props.put("group.id", KafkaProperties.groupId);  
  33.         props.put("zookeeper.session.timeout.ms""40000");  
  34.         props.put("zookeeper.sync.time.ms""200");  
  35.         props.put("auto.commit.interval.ms""1000");  
  36.         return new ConsumerConfig(props);  
  37.     }  
  38.   
  39.     @Override  
  40.     public void run() {  
  41.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  42.         topicCountMap.put(topic, new Integer(1));  
  43.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);  
  44.         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);  
  45.         ConsumerIterator<byte[], byte[]> it = stream.iterator();  
  46.         while (it.hasNext()) {  
  47.             System.out.println("receive:" + new String(it.next().message()));  
  48.             try {  
  49.                 sleep(3000);  
  50.             } catch (InterruptedException e) {  
  51.                 e.printStackTrace();  
  52.             }  
  53.         }  
  54.     }  
  55. }  
【上篇】
【下篇】

抱歉!评论已关闭.