分布式消息系统jafka快速起步(转)

Jafka 是一个开源的/性能良好的分布式消息系统。在上一篇文章中有所简单介绍。
下面是一篇简单的入门文档。更多详细的文档参考wiki

Step 1: 下载最新的安装包

完整的安装指南在这里。
最新的发行版地址在:https://github.com/adyliu/jafka/downloads

$wget https://github.com/downloads/adyliu/jafka/jafka-1.0.tgz
$tar xzf jafka-1.0.tgz
$cd jafka-1.0 

可选配置,设置一个环境变量。 $export $JAFKA_HOME=/opt/apps/jafka-1.0 以下假设所有操作目录都在$JAFKA_HOME下。

Step 2: 启动服务端

这里启动一个单进程的服务端,使用默认的配置启动即可。由于一些路径使用了相对路径,因此需要在jafka的主目录下运行。

$bash bin/server-single.sh config/server-single.properties 

默认情况下,无需任何配置即可运行服务端。这时服务端会将9092端口绑定到所有网卡上。

Step 3: 发送消息

使用自带的小命令行就可以发送简单的文本消息。

$bin/producer-console.sh --broker-list 0:localhost:9092 --topic demo
> Welcome to jafka
> 中文中国 

producer-console.sh有一些参数,这可以通过执行下面的命令得到。 $bin/producer-console.sh

发送消息只需要在提示符号‘>‘输入文本即可,没有出错意味着发送成功,直接回车或者输入CTRL+C退出程序。

Step 4: 启动消费者

现在是时候消费刚才发送的消息。

同样Jafka自带一个小程序能够消费简单的文本消息。

$bin/simple-consumer-console.sh --topic demo --server jafka://localhost:9092
[1] 26: Welcome to jafka
[2] 48: 中文中国 

连接上服务端后,立即就看到有消息消费了。默认情况下simple-consumer-console.sh输出消息的序号(实际上不存在)以及消息的下一个偏移量(offset)。

解压缩后只需要执行上面三条命令就可以完成简单的消息发送和接受演示。这就是一个简单的消息系统。

Step 5: 手动编码

我们希望利用提供的API手动编码能够发送和接受一些消息。

消息发送者

首先写一个简单的消息发送者。

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("broker.list", "0:127.0.0.1:9092");
    props.put("serializer.class", StringEncoder.class.getName());
    //
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    //
    StringProducerData data = new StringProducerData("demo");
    for(int i=0;i<1000;i++) {
        data.add("Hello world #"+i);
    }
    //
    try {
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100; i++) {
            producer.send(data);
        }
        long cost = System.currentTimeMillis() - start;
        System.out.println("send 100000 message cost: "+cost+" ms");
    } finally {
        producer.close();
    }
}

看起来有点复杂,我们简单分解下。

配置参数

首先需要配置服务端的地址。一个jfaka服务端地址格式如下:

brokerId:host:port 
  • brokerId 用于标识服务进程,这在一个集群里面是全局唯一的
  • host/port 用户描述服务监听的ip地址和端口,默认情况下会在所有网卡的9092端口监听数据。

配置完服务端信息后,我们需要提供一个消息编码。

消息编码用于将任意消息类型编码成字节数组,这些字节数组就是我们的消息体。 

默认情况下Jafka解析字节数组编码,也就是原封不动的发送出去。这里简单替换下,使用字符串UTF-8编码。

构造消息客户端

使用上面简单的参数就可以构造出来一个简单的消息发送客户端。

消息发送客户端(Producer)用于管理与服务端之间的连接,并将消息按照指定的编码方式发送给服务端。 

构造消息

用于使用字符串编码,因此这里只能发送字符串的数据。每一个消息数据包都可以带有多条消息,只需要满足一个消息数据包的大小不超过默认的1M即可。比如下面就构造发往主题为demo的100条消息的数据包:

StringProducerData data = new StringProducerData("demo");
for(int i=0;i<1000;i++) {
data.add("Hello world #"+i);
} 

发送消息

最后发送消息只需要调用producer.send()即可。上述例子中循环发送100次。

下面是某次发送的结果:

$bin/run-console.sh demo.client.StaticBrokerSender
send 100000 message cost: 685 ms 

消息接受者

接受消息的逻辑非常简单,只需要配置服务端的地址,然后从偏移量0开始顺序消费消息即可。

下面的逻辑是简单的将接受的消息以UTF-8的字符串展示。

SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092);
//
long offset = 0;
while (true) {
    FetchRequest request = new FetchRequest("test", 0, offset);
    for (MessageAndOffset msg : consumer.fetch(request)) {
        System.out.println(Utils.toString(msg.message.payload(), "UTF-8"));
        offset = msg.offset;
    }
}


整合ZooKeeper

Jafka 使用zookeeper进行自动broker寻址以及消费者负载均衡。

(1)启动zookeeper服务

测试时可以使用一个单进程的zookeeper用于替换zookeeper集群。

$bin/zookeeper-server.sh config/zookeeper.properties 

(2)启动Jafka服务端

$bin/server-single.sh config/server.properties 
[2012-04-24 12:29:56,526] INFO Starting Jafka server (com.sohu.jafka.server.Server.java:68)
[2012-04-24 12:29:56,532] INFO starting log cleaner every 60000 ms (com.sohu.jafka.log.LogManager.java:155)
[2012-04-24 12:29:56,552] INFO connecting to zookeeper: 127.0.0.1:2181 (com.sohu.jafka.server.Zookeeper.java:80)
[2012-04-24 12:29:56,568] INFO Starting ZkClient event thread. (com.github.zkclient.ZkEventThread.java:64)

服务端启动后自动向zookeeper注册服务端的信息,例如ip地址、端口、已存在的消息等。

(3)启动消息发送者

$bin/producer-console.sh --zookeeper localhost:2181 --topic demo
Enter you message and exit with empty string.
> Jafka second day
> Jafka use zookeeper to search brokers and consumers                                       
>

和上面启动的消息发送者类似,只不过这里使用zookeeper配置自动寻找服务端,而不是指定服务端地址。

(4)启动消息接受者

$bin/consumer-console.sh --zookeeper localhost:2181 --topic demo --from-beginning
Jafka second day
Jafka use zookeeper to search brokers and consumers

这时候很快就看到刚才发送的消息了。

由于使用zookeeper作为配置中心,因此可以启动更多的服务端、消息发送者、消息接受者。只需要保证都连接zookeeper,并且所有的服务端都有唯一的brokerId(位于server.properties中).

(5)API使用

上面是使用自带的程序发送简单的文本消息。这里利用API来进行开发。

发送消息

public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", StringEncoder.class.getName());
        //
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        //
        StringProducerData data = new StringProducerData("demo");
        for(int i=0;i<100;i++) {
            data.add("Hello world #"+i);
        }
        //
        try {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                producer.send(data);
            }
            long cost = System.currentTimeMillis() - start;
            System.out.println("send 10000 message cost: "+cost+" ms");
        } finally {
            producer.close();
        }
    }

和不使用zookeeper的消息发送者对比,只需要将服务端配置信息替换成zookeeper连接地址即可。其它完全一致。

接收消息

接受消息看起来稍微有点复杂,简单来说是如下几步:

  • 配置zookeeper以及客户端groupid
  • 与服务端的连接
  • 创建消息流
  • 启动线程池消费消息

public static void main(String[] args) throws Exception {

Properties props = new Properties();
    props.put("zk.connect", "localhost:2181");
    props.put("groupid", "test_group");
    //
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector connector = Consumer.create(consumerConfig);
    //
    Map<String, List<MessageStream<String>>> topicMessageStreams = connector.createMessageStreams(ImmutableMap.of("demo", 2), new StringDecoder());
    List<MessageStream<String>> streams = topicMessageStreams.get("demo");
    //
    ExecutorService executor = Executors.newFixedThreadPool(2);
    final AtomicInteger count = new AtomicInteger();
    for (final MessageStream<String> stream : streams) {
        executor.submit(new Runnable() {

public void run() {
                for (String message : stream) {
                    System.out.println(count.incrementAndGet() + " => " + message);
                }
            }
        });
    }
    //
    executor.awaitTermination(1, TimeUnit.HOURS);
}

所有消息的消费方式几乎都相同,只是消费的topic名称不同而已。



是不是很简单,动手试试吧

https://github.com/adyliu/jafka/wiki/quickstart.zh_CN

http://www.blogjava.net/xylz/archive/2012/05/11/377938.html

时间: 2024-10-13 10:57:24

分布式消息系统jafka快速起步(转)的相关文章

分布式消息系统Jafka入门指南之二

分布式消息系统Jafka入门指南之二 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 三.Jafka的文件夹结构 1.安装tree命令 $ sudo yum install tree 2.查看文件夹 $ tree -L 1 . ?..? ? bin ? ..?? conf ?..?? data ? ..?? lib ? ..?? LICENSE ?..? ? logs ?..?? VERSION 说明:bin文件夹:命令行脚本conf文件夹:存放配置

分布式消息系统Jafka入门指南

分布式消息系统Jafka入门指南 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 一.JafkaMQ简介 JafkaMQ是一个分布式的发布/订阅消息系统,它是Apache Kafka的Java移植版. 2013年11月28日,JafkaMQ发布了1.2.3版. JafkaMQ的特征如下: 1)消息持久化到磁盘的算法时间复杂度为O(1),即使是TB级的消息存储,也能保证常量时间的执行性能.2)高吞吐量:即使是低配制的硬件条件,单个Broker也能支持每

[kfaka] Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kaf

转 Apache Kafka:下一代分布式消息系统

简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交日志服务. Apache Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序. 本文我将重点介绍Apache Kaf

KAFKA分布式消息系统

KAFKA分布式消息系统 2011-08-28 18:32:46 分类: LINUX Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录.浏览.点击.分享.喜欢)以及系统运行日志(CPU.内存.磁盘.网络.系统及进程状态). 当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线).高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,

kafka--高性能的分布式消息系统

kafka是一个分布式的,高吞吐量的.信息分片存储,消息同步复制的开源消息服务,它提供了消息系统的功能,但是采用了独特的设计. kafka最初由LinkedIn设计开发,使用Scala语言编写,用作LinkedIn网站的活动流数据和运营数据处理工具,这其中活动流数据是指页面访问量.被查看内容方面的信息以及搜索情况等内容,运营数据是指服务器的性能数据(CPU.IO使用率.请求时间.服务日志等数据). 现在kafka已被多家不同类型的公司采用,作为其内部各种数据的处理工具或消息队列服务.如今kafk

KAFKA分布式消息系统[转]

KAFKA分布式消息系统  转自:http://blog.chinaunix.net/uid-20196318-id-2420884.html Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录.浏览.点击.分享.喜欢)以及系统运行日志(CPU.内存.磁盘.网络.系统及进程状态). 当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线).高可靠交付对linkedin的日志不是必须的

Kafka——分布式消息系统

Kafka——分布式消息系统 架构 Apache Kafka是2010年12月份开源的项目,采用scala语言编写,使用了多种效率优化机制,整体架构比较新颖(push/pull),更适合异构集群. 设计目标: (1) 数据在磁盘上的存取代价为O(1)(2) 高吞吐率,在普通的服务器上每秒也能处理几十万条消息(3) 分布式架构,能够对消息分区(4) 支持将数据并行的加载到hadoop Kafka实际上是一个消息发布订阅系统.producer向某个topic发布消息,而consumer订阅某个top

分布式消息系统 Kafka 简介

Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转.传统的企业消息系统并不是非常适合大规模的数据处理.为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka就出现了.Kafka可以起到两个作用: 降低系统组网复杂度. 降