php连接kafka

1、首先安装kafka扩展

#安装librdkafka:   版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2
$   git clone https://github.com/edenhill/librdkafka.git
$  ./configure
$  make
$  sudo make install

#安装 rdkafka.so  版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1
$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd php-rdkafka
$ phpize
$ ./configure
$ make all -j 5
$ sudo make install

2、生产者代码示例

$rcf = new RdKafka\Conf();
    $rcf->set(‘group.id‘, ‘test‘);  //topicname
    $cf = new RdKafka\TopicConf();
    $cf->set(‘offset.store.method‘, ‘broker‘);
    $cf->set(‘auto.offset.reset‘, ‘smallest‘);
    $rk = new RdKafka\Producer($rcf);
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("127.0.0.1"); //brokeraddr
    $topic = $rk->newTopic("test", $cf);  //topicname
    for($i = 0; $i < 10; $i++) {
       $topic->produce(0,0,‘test‘ . $i);
     }

3、消费者代码示例

$rcf = new RdKafka\Conf();
    $rcf->set(‘group.id‘, ‘test‘);
    $rcf->set(‘broker.version.fallback‘, ‘0.8.2‘);  //brokername,kafkaversion
    $cf = new RdKafka\TopicConf();
    $cf->set(‘auto.offset.reset‘, ‘smallest‘);
    $cf->set(‘auto.commit.enable‘, true);
    $rk = new RdKafka\Consumer($rcf);
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("127.0.0.1"); //brokeraddr
    $topic = $rk->newTopic("test", $cf);  //topicname,topicobject
    $topic->consumeStart(0,10);  //partition,offset
    $msg = $topic->consume(0, 1000);   //partition,timeout
    var_dump($msg);

时间: 2024-10-26 21:11:37

php连接kafka的相关文章

SpringBoot 连接kafka ssl 报 CertificateException: No subject alternative names present 异常解决

当使用较新版本SpringBoot时,对应的 kafka-client 版本也比较新,如果使用了 2.x 以上的 kafka-client ,并且配置了 kafka ssl 连接方式时,可能会报如下异常: javax.net.ssl.SSLException: Inbound closed before receiving peer's close_notify: possible truncation attack? ..... org.apache.kafka.common.errors.S

Flume连接Kafka的broker出错

在启动Flume的时候,出现下面的异常,但是程序照样能运行,Kafka也能够收到数据,只是偶尔会断点. 2016-08-25 15:32:54,561 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:2,host:10.208.129.5,port:9092 with

Java 连接Kafka报错java.nio.channels.ClosedChannelExcep

Java 客户端连接Kafka报如下错误 bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic [2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,po

Golang 连接Kafka

Kafka介绍 Kafka是Apache软件基金会开发的一个开源流处理平台,由Java和Scala编写:Kafka是一种高吞吐.分布式.基于订阅发布的消息系统. Kafka名称解释 Producer:生产者 Consumer:消费者 Topic:消息主题,每一类的消息称之为一个主题 Broker:Kafka以集群的方式运行,可以由一个或多个服务器组成,每个服务器叫做一个broker Partition:物理概念上的分区,为了提供系统吞吐量,在物理上每个Topic会分为一个或多个Partition

asp.netcore Log4Net连接kafka的方法

1.NuGet添加2个包: Microsoft.Extensions.Logging.Log4Net.AspNetCore log4net.Kafka.Core 2.Program里修改CreateWebHostBuilder: public class Program { public static void Main(string[] args) { System.Threading.ThreadPool.SetMinThreads(200, 200); // NLogBuilder.Con

本地连接虚拟机内的kafka遇到的问题

学习技术,提升自己 安装kafka 1.官网找到压缩包,下载并上传到虚拟机内(没钱买不起服务器) 2.执行解压缩命令 tar -zxvf kafka_2.11-2.1.0.tgz 3.进入到kafka_2.11-2.1.0/config目录里,执行图片中的命令,将zookeeper.properties中的信息筛选出来并将筛选出来的数据给一个新建的文件zk.properties cat zookeeper.properties | grep -v '#' >> zk.properties 4.

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag

Flume+Kafka整合

一.准备工作 准备5台内网服务器创建Zookeeper和Kafka集群 服务器地址: 192.168.2.240 192.168.2.241 192.168.2.242 192.168.2.243 192.168.2.244 服务器系统:Centos 6.5  64位 下载安装包 Zookeeper:http://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz Flume:http://apache.fayea.

java客户端作为kafka的consumer报错org.I0Itec.zkclient.exception.ZkTimeoutException

出错现象: java客户端编程作为kafka的消费端,连接kafka的broker报错 出错原因分析: 当服务器配置或者网络环境较差时,会出现连接zk超时的情况出现; 解决方法:将程序中的timeout数值调大 props.put("zookeeper.session.timeout.ms", "15000");