Kafka
Download
http://kafka.apache.org/downloads.html
Extract
tar -zxvf kafka_2.10-0.8.1.1.tgz
Start the services
Start the ZooKeeper service first
bin/zookeeper-server-start.sh config/zookeeper.properties
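config/zookeeper.properties ships with a minimal single-node setup; as distributed with 0.8.x it amounts to (paths may differ on your machine):
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0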
Start Kafka
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
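The broker takes its settings from config/server.properties; the 0.8.x defaults relevant to this walkthrough are:
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181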
Create a topic
Create a topic named "test" with one partition and one replica
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Delete a topic (note: deletion support in 0.8.1.x is incomplete; the topic may only be marked for deletion)
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Start a console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Start a console consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
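The same produce/consume flow from Java, using the old Scala client API bundled with Kafka 0.8. The address 192.168.1.111 and the topic testkafka below are the original examples' placeholders; adjust them to your setup.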
Java producer:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        Properties originalProps = new Properties();
        // Broker list (replace with your broker's address)
        originalProps.put("metadata.broker.list", "192.168.1.111:9092");
        // Serializer used to encode message data sent to the broker
        originalProps.put("serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the leader to acknowledge each write
        originalProps.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(originalProps));
        for (int j = 0; j < 100; j++) {
            producer.send(new KeyedMessage<String, String>("testkafka", null, j + "kafka"));
        }
        producer.close();
    }
}

Java consumer:
package testkafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class TestConsumer {
    public static void main(String[] args) {
        Properties originalProps = new Properties();
        // ZooKeeper ensemble the consumer registers with
        originalProps.put("zookeeper.connect", "192.168.1.111:2181");
        // Consumer group id; offsets are tracked per group
        originalProps.put("group.id", "234");
        originalProps.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConnector consumer = Consumer
                .createJavaConsumerConnector(new ConsumerConfig(originalProps));

        // Request one stream for the "testkafka" topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("testkafka", 1);
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> topicMessageStreams =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> kafkaStream =
                topicMessageStreams.get("testkafka").get(0);

        // Block and print each message as it arrives
        ConsumerIterator<String, String> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            MessageAndMetadata<String, String> next = iterator.next();
            System.out.println(next.message());
        }
    }
}
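Both classes compile against the kafka_2.10 client artifact matching the build downloaded above; a sketch of the Maven dependency, assuming version 0.8.1.1:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
</dependency>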
Flume
Download, upload to the server, and extract
$ cp conf/flume-conf.properties.template conf/flume.conf
$ cp conf/flume-env.sh.template conf/flume-env.sh
Configure
Set JAVA_HOME in conf/flume-env.sh
Start the agent
bin/flume-ng agent --conf ./conf -f ./conf/agent1.conf -n agent1 -Dflume.root.logger=DEBUG,console
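The start command reads ./conf/agent1.conf (the copy step above created flume.conf; these notes start the agent with a separate agent1.conf, which they never show). A minimal sketch of such a file, assuming an Avro source on port 41414 to match the avro-client call below, a memory channel, and a logger sink:
agent1.sources = avroSrc
agent1.channels = memCh
agent1.sinks = logSink

agent1.sources.avroSrc.type = avro
agent1.sources.avroSrc.bind = 0.0.0.0
agent1.sources.avroSrc.port = 41414
agent1.sources.avroSrc.channels = memCh

agent1.channels.memCh.type = memory
agent1.channels.memCh.capacity = 1000

agent1.sinks.logSink.type = logger
agent1.sinks.logSink.channel = memCh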
Start the Avro client (sends the contents of ~/.bashrc to the agent as events; the logger sink prints them)
$ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F ~/.bashrc
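Instead of the avro-client tool, an application can send events to the same agent through Flume's client SDK (flume-ng-sdk). A sketch, assuming the agent above is listening on localhost:41414:
import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class FlumeRpcExample {
    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the Avro source configured in agent1.conf
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41414);
        try {
            // Each append becomes one event at the source
            Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
            client.append(event);
        } finally {
            client.close();
        }
    }
}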