歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Kafka的Producer和Consumer的示例(使用Java語言)

Kafka的Producer和Consumer的示例(使用Java語言)

日期:2017/3/1 9:39:11   编辑:Linux編程

我使用的kafka版本是:0.7.2

jdk版本是:1.6.0_20

http://kafka.apache.org/07/quickstart.html官方給的示例並不是很完整,以下代碼是經過我補充的並且編譯後能運行的。

分布式發布訂閱消息系統 Kafka 架構設計 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代碼實例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程筆記 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka使用入門教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

Producer Code

import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;

public class ProducerSample {


public static void main(String[] args) {
ProducerSample ps = new ProducerSample();

Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message2");
producer.send(data);
producer.close();
}
}

Consumer Code

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {

public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

// create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("test-topic", 4);
Map<String, List<KafkaStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for (final KafkaStream<Message> stream : streams) {
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
<SPAN > </SPAN>byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}
});
}

}
}

分別啟動zookeeper,kafka server之後,依次運行Producer,Consumer的代碼

運行ProducerSample:

運行ConsumerSample:

更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-09/107385p2.htm

Copyright © Linux教程網 All Rights Reserved