Kafka 的範例,可參考這個 project:A kafka producer and consumer example in scala and java,通常是先執行 Consumer,再執行 Producer,才會在 Consumer console 中看到 Producer 的訊息。
Kafka Consumer Examples
Scala 版本
import java.util.Properties
import java.util.concurrent._
import scala.collection.JavaConversions._
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.utils._
import kafka.utils.Logging
import kafka.consumer.KafkaStream
/**
 * Multi-threaded consumer example built on Kafka's ZooKeeper-based consumer connector.
 *
 * Constructing an instance immediately creates the consumer connector; call
 * `run` to start the worker threads and `shutdown` to stop them.
 *
 * @param zookeeper ZooKeeper connection string, used as `zookeeper.connect`
 * @param groupId   consumer group id, used as `group.id`
 * @param topic     topic whose message streams are consumed
 * @param delay     value handed unchanged to every `ScalaConsumerTest` worker
 */
class ScalaConsumerExample(val zookeeper: String,
                           val groupId: String,
                           val topic: String,
                           val delay: Long) extends Logging {

  val config = createConsumerConfig(zookeeper, groupId)
  val consumer = Consumer.create(config)
  // Remains null until run() creates the worker pool.
  var executor: ExecutorService = null

  /** Shuts down the consumer connector and, when run() has created one, the worker pool. */
  def shutdown(): Unit = {
    Option(consumer).foreach(_.shutdown())
    Option(executor).foreach(_.shutdown())
  }

  /**
   * Builds the consumer configuration used by this example.
   *
   * @param zookeeper ZooKeeper connection string
   * @param groupId   consumer group id
   * @return a `ConsumerConfig` holding the two arguments plus the fixed example settings
   */
  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    Seq(
      "zookeeper.connect" -> zookeeper,
      "group.id" -> groupId,
      "auto.offset.reset" -> "largest",
      "zookeeper.session.timeout.ms" -> "400",
      "zookeeper.sync.time.ms" -> "200",
      "auto.commit.interval.ms" -> "1000"
    ).foreach { case (key, value) => props.put(key, value) }
    new ConsumerConfig(props)
  }

  /**
   * Requests `numThreads` message streams for the topic and submits one
   * `ScalaConsumerTest` per returned stream to a fixed-size thread pool.
   *
   * @param numThreads number of streams to request and size of the thread pool
   * @throws NoSuchElementException if the connector returned no entry for the topic
   */
  def run(numThreads: Int): Unit = {
    val topicCountMap = Map(topic -> numThreads)
    val consumerMap = consumer.createMessageStreams(topicCountMap)
    // Fail with a message that names the topic instead of the opaque "None.get".
    val streams = consumerMap.getOrElse(
      topic,
      throw new NoSuchElementException(s"No message streams were created for topic '$topic'"))
    executor = Executors.newFixedThreadPool(numThreads)
    // The index of each stream doubles as the worker's thread number.
    streams.zipWithIndex.foreach { case (stream, threadNumber) =>
      executor.submit(new ScalaConsumerTest(stream, threadNumber, delay))
    }
  }
}
/**
 * Command-line entry point for the Scala consumer example.
 *
 * Usage: `ScalaConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>`
 *
 * With no arguments the built-in defaults are used. With five or more
 * arguments the first five are used. Any other argument count prints the
 * usage line to stderr and exits with status 1 instead of failing with an
 * `ArrayIndexOutOfBoundsException`.
 */
object ScalaConsumerExample extends App {
  // Program entry point.
  args.toList match {
    case Nil =>
      start("192.168.1.7:2181", "group1", "test", 10, 0L)
    case server :: group :: topic :: numThreads :: delay :: _ =>
      start(server, group, topic, numThreads.toInt, delay.toLong)
    case _ =>
      Console.err.println(
        "Usage: ScalaConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>")
      sys.exit(1)
  }

  /** Creates the consumer for the given settings and starts its worker threads. */
  private def start(server: String, group: String, topic: String, numThreads: Int, delay: Long): Unit = {
    val example = new ScalaConsumerExample(server, group, topic, delay)
    example.run(numThreads)
  }
}
/**
 * Worker that prints every message received on one Kafka stream.
 *
 * @param stream       stream of raw key/value byte arrays to read from
 * @param threadNumber number included in every printed line to identify this worker
 * @param delay        stored as a field but not read anywhere in this class
 */
class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int, val delay: Long) extends Logging with Runnable {

  /**
   * Prints each message as `<currentTimeMillis>,Thread <threadNumber>: <text>`
   * until the stream's iterator reports no more elements, then prints a
   * shutdown line.
   */
  def run(): Unit = {
    val it = stream.iterator()
    while (it.hasNext()) {
      // Decode explicitly as UTF-8 rather than with the platform default charset,
      // so the text does not depend on the JVM's locale settings.
      val msg = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8)
      System.out.println(s"${System.currentTimeMillis()},Thread $threadNumber: $msg")
    }
    System.out.println("Shutting down Thread: " + threadNumber)
  }
}
Java 版本
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Multi-threaded consumer example built on Kafka's ZooKeeper-based consumer connector.
 *
 * Usage: {@code ConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>}
 */
public class ConsumerExample {
    private final ConsumerConnector consumer;
    private final String topic;
    // Stays null until run() creates the worker pool.
    private ExecutorService executor;
    private final long delay;

    /**
     * Creates the consumer connector for the given ZooKeeper ensemble and group.
     *
     * @param zookeeper ZooKeeper connection string, used as {@code zookeeper.connect}
     * @param groupId   consumer group id, used as {@code group.id}
     * @param topic     topic whose message streams are consumed
     * @param delay     value handed unchanged to every {@code ConsumerTest} worker
     */
    public ConsumerExample(String zookeeper, String groupId, String topic, long delay) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
        this.topic = topic;
        this.delay = delay;
    }

    /** Shuts down the consumer connector and, when run() has created one, the worker pool. */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Requests {@code numThreads} message streams for the topic and submits one
     * {@code ConsumerTest} per returned stream to a fixed-size thread pool.
     * NOTE(review): {@code ConsumerTest} is defined outside this listing.
     *
     * @param numThreads number of streams to request and size of the thread pool
     */
    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Autoboxing replaces the deprecated new Integer(...) constructor.
        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(numThreads);

        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(consumer, stream, threadNumber, delay));
            threadNumber++;
        }
    }

    /**
     * Builds the consumer configuration used by this example.
     *
     * @param zookeeper ZooKeeper connection string
     * @param groupId   consumer group id
     * @return a {@code ConsumerConfig} holding the two arguments plus the fixed example settings
     */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("auto.offset.reset", "largest");
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        //props.put("auto.commit.enable", "false");
        return new ConsumerConfig(props);
    }

    /**
     * Entry point. With no arguments the built-in defaults are used; with five
     * or more arguments the first five are used. Any other argument count
     * prints the usage line to stderr and exits with status 1 instead of
     * failing with an {@code ArrayIndexOutOfBoundsException}.
     *
     * @param args optional {@code <zookeeper> <groupId> <topic> <numThreads> <delay>}
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final String zooKeeper;
        final String groupId;
        final String topic;
        final int threads;
        final long delay;

        if (args.length == 0) {
            zooKeeper = "192.168.1.7:2181";
            groupId = "group1";
            topic = "test";
            threads = 10;
            delay = 0;
        } else if (args.length >= 5) {
            zooKeeper = args[0];
            groupId = args[1];
            topic = args[2];
            threads = Integer.parseInt(args[3]);
            delay = Long.parseLong(args[4]);
        } else {
            System.err.println("Usage: ConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>");
            System.exit(1);
            return; // unreachable; satisfies definite assignment of the final locals
        }

        ConsumerExample example = new ConsumerExample(zooKeeper, groupId, topic, delay);
        example.run(threads);

        // Keep consuming for 24 hours before shutting down.
        Thread.sleep(24 * 60 * 60 * 1000);
        example.shutdown();
    }
}
Kafka Producer Examples
Scala 版本
import kafka.producer.ProducerConfig
import java.util.Properties
import scala.util.Random
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date
/**
 * Command-line entry point that sends a batch of synthetic messages to a
 * Kafka topic and prints the achieved send rate.
 *
 * Usage: `ScalaProducerExample <events> <topic> <brokers>`
 *
 * With no arguments the built-in defaults are used. With three or more
 * arguments the first three are used. Any other argument count prints the
 * usage line to stderr and exits with status 1.
 */
object ScalaProducerExample extends App {
  // java -cp kafka_example-0.1.0-SNAPSHOT.jar com.colobu.kafka.ScalaProducerExample 10000 colobu localhost:9092
  val (events, topic, brokers) = args.toList match {
    case Nil =>
      (500, "test", "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094")
    case count :: topicName :: brokerList :: _ =>
      (count.toInt, topicName, brokerList)
    case _ =>
      Console.err.println("Usage: ScalaProducerExample <events> <topic> <brokers>")
      sys.exit(1)
  }

  val rnd = new Random()
  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner")
  props.put("producer.type", "async")
  //props.put("request.required.acks", "1")

  val config = new ProducerConfig(props)
  val producer = new Producer[String, String](config)
  val t = System.currentTimeMillis()

  try {
    for (nEvents <- 0 until events) {
      val runtime = new Date().getTime()
      // The random IP is both the message key and part of the payload.
      val ip = "192.168.2." + rnd.nextInt(255)
      val msg = s"$runtime,$nEvents,www.example.com,$ip"
      producer.send(new KeyedMessage[String, String](topic, ip, msg))
    }
    // Clamp to 1 ms so a sub-millisecond run cannot divide by zero, and
    // multiply as Long so large event counts do not overflow Int.
    val elapsedMs = math.max(1L, System.currentTimeMillis() - t)
    System.out.println("sent per second: " + events * 1000L / elapsedMs)
  } finally {
    // Close the producer even when sending fails.
    producer.close()
  }
}
Java 版本
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Sends a batch of synthetic messages to a Kafka topic and prints the
 * achieved send rate.
 *
 * Usage: {@code ProducerExample <events> <topic> <brokers>}
 */
public class ProducerExample {
    /**
     * Entry point. With no arguments the built-in defaults are used; with
     * three or more arguments the first three are used. Any other argument
     * count prints the usage line to stderr and exits with status 1 instead
     * of failing with an {@code ArrayIndexOutOfBoundsException}.
     *
     * @param args optional {@code <events> <topic> <brokers>}
     */
    public static void main(String[] args) {
        final long events;
        final String topic;
        final String brokers;

        if (args.length == 0) {
            events = 500;
            topic = "test";
            brokers = "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094";
        } else if (args.length >= 3) {
            events = Long.parseLong(args[0]);
            topic = args[1];
            brokers = args[2];
        } else {
            System.err.println("Usage: ProducerExample <events> <topic> <brokers>");
            System.exit(1);
            return; // unreachable; satisfies definite assignment of the final locals
        }

        Random rnd = new Random();
        Properties props = new Properties();
        props.put("metadata.broker.list", brokers);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner");
        props.put("producer.type", "async");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        long t = System.currentTimeMillis();

        try {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                // The random IP is both the message key and part of the payload.
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + "," + nEvents + ",www.example.com," + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, ip, msg);
                producer.send(data);
            }
            // Clamp to 1 ms so a sub-millisecond run cannot divide by zero.
            long elapsedMs = Math.max(1L, System.currentTimeMillis() - t);
            System.out.println("sent per second: " + events * 1000 / elapsedMs);
        } finally {
            // Close the producer even when sending fails.
            producer.close();
        }
    }
}
沒有留言:
張貼留言