Linux下librdkafka客户端的编译运行

Linux下librdkafka客户端的编译运行

  librdkafka是一个开源的Kafka客户端C/C++实现,提供了Kafka生产者、消费者接口。

  由于项目需要,我要将Kafka生产者接口封装起来给别人调用,所以先安装了librdkakfa,然后在demo上进行修改封装一个生产者接口。

[一] 安装librdkafka

   首先在github上下载librdkafka源码,解压后进行编译;

   cd librdkafka-master

   chmod 777 configure lds-gen.py

   ./configure

   make

   make install

   在make的时候,如果是64位Linux会报下面这个异常

   /bin/ld:librdkafka.lds:1: syntax error in VERSION script

   只要Makefile.config里面的WITH_LDS=y这一行注释掉就不会报错了。

[二] 封装librdkafka的生产者接口

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>

#include "librdkafka/rdkafka.h"  /* for Kafka driver */

static int run = 1;
static rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
int partition = RD_KAFKA_PARTITION_UA;
rd_kafka_topic_conf_t *topic_conf;

static void stop (int sig) {
    run = 0;
    fclose(stdin); /* abort fgets() */
}

static void sig_usr1 (int sig) {
    rd_kafka_dump(stdout, rk);
}

int initProducer (char *parameters) {
    int argc = 1;
    char **argv;
    char *para;
    char *delim = " ";
    char *brokers = "localhost:9092";
    char *topic = NULL;
    int opt;
    rd_kafka_conf_t *conf;
    char errstr[512];
    char tmp[16];

    char copyParameters[1024];
    strcpy(copyParameters, parameters);
    para = strtok(parameters, delim);
    argc++;
    while((para = strtok(NULL, delim)) != NULL){
        argc++;
    }
    argv = (char**)malloc(argc*sizeof(char*));
    argc = 0;
    argv[argc] = "initProducer";
    para = strtok(copyParameters, delim);
    argc++;
    argv[argc] = para;
    while((para = strtok(NULL, delim)) != NULL){
        argc++;
        argv[argc] = para;
    }
    argc++;
    /* Kafka configuration */
    conf = rd_kafka_conf_new();
    /* Quick termination */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
    /* Topic configuration */
    topic_conf = rd_kafka_topic_conf_new();
    while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:")) != -1) {
        switch (opt) {
        case ‘t‘:
            topic = optarg;
            break;
        case ‘p‘:
            partition = atoi(optarg);
            break;
        case ‘b‘:
            brokers = optarg;
            break;
        default:
            fprintf(stderr, "%% Failed to init producer with error parameters\n");
        }
    }
    if (optind != argc || !topic) {
        exit(1);
    }
    signal(SIGINT, stop);
    signal(SIGUSR1, sig_usr1);
    /* Create Kafka handle */
    if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) {
        fprintf(stderr, "%% Failed to create new producer: %s\n", errstr);
        exit(1);
    }
    rd_kafka_set_log_level(rk, LOG_DEBUG);
    /* Add brokers */
    if (rd_kafka_brokers_add(rk, brokers) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        exit(1);
    }
    /* Create topic */
    rkt = rd_kafka_topic_new(rk, topic, topic_conf);
    topic_conf = NULL; /* Now owned by topic */
    return 1;
}

int freeProducer()
{
    /* Destroy topic */
    rd_kafka_topic_destroy(rkt);
    /* Destroy the handle */
    rd_kafka_destroy(rk);
    if (topic_conf)
        rd_kafka_topic_conf_destroy(topic_conf);
    /* Let background threads clean up and terminate cleanly. */
    run = 5;
    while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
        printf("Waiting for librdkafka to decommission\n");
    if (run <= 0)
        rd_kafka_dump(stdout, rk);
    return 1;
}

int main (int argc, char **argv)
{
    char parameter[] = "-t XX-HTTP-KEYWORD-LOG -b 10.10.6.101:9092,10.10.6.102:9092,10.10.6.104:9092";
    char buf[1024];
    initProducer(parameter);
    while (run && fgets(buf, sizeof(buf), stdin)) {
        if(rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buf, strlen(buf), NULL, 0, NULL) == -1){
            fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error()));
        }else{
            fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n", strlen(buf), rd_kafka_topic_name(rkt), partition);
        }
    }
    freeProducer();
    return 0;
}

[三] 编译运行

   编译的时候要加上-lrdkafka -lz -lpthread -lrt这些选项:gcc myProducer.c -o myProducer -lrdkafka -lz -lpthread -lrt

   在编译的时候会报error while loading share library librdkafak.so.1,这是因为make的时候将librdkafak.so.1放在了/usr/local/lib下,在Linux的默认共享库路径/lib和/usr/lib下找不到,只要执行下面两句就可以了:

   echo "/usr/local/lib" >> /etc/ld.so.conf
   ldconfig

   运行./myProducer,会不断的从终端读取键入的字符串,然后发送到Kafka,通过Kafka自带的console consumer能够消费查看数据。

时间: 2024-12-28 22:30:03

Linux下librdkafka客户端的编译运行的相关文章

linux下的APK反编译软件及过程介绍

需要工具: 1.apktool apk打包工具 下载地址:http://android-apktool.googlecode.com/files/apktool1.5.2.tar.bz2 安装:直接解压即可,是一个apktool.jar文件,通过 $java -jar apktool.jar 来运行,依赖于java运行环境 2.dex2jar dex转化jar工具 下载地址:http://dex2jar.googlecode.com/files/dex2jar-0.0.9.15.zip 安装:直

[转]Caffe在Linux下的安装,编译,实验

Caffe在Linux下的安装,编译,实验 原文地址:http://www.cnblogs.com/evansyang/p/6150118.html 第一部分:Caffe 简介 caffe是有伯克利视觉和学习中心(BVLC)开发.作者是伯克利博士贾杨清.caffe是一个深度学习(deep learning)框架.其具有易读.快速和模块化思想. 第二部分:Caffe安装与配置 2.1 配置环境:ubuntu 14.04LTS, 使用Homebrew进行安装.暂不使用GPU,所以使用CPU-ONLY

Linux下ORACLE客户端安装详解

1.首先去oracle官网下载以下安装包(http://www.oracle.com/technetwork/topics/linuxsoft-082809.html) instantclient-basic-linux.x64-11.2.0.3.0.zip instantclient-odbc-linux-11.2.0.3.0.zip instantclient-sdk-linux.x64-11.2.0.3.0.zip instantclient-sqlplus-linux.x64-11.2.

Linux下通过源码编译安装程序

ASK: Linux下通过源码编译安装程序(configure/make/make install的作用) configure Linux 平台有各种不同的配置,安装时需要通过 configure 来确定,如:编译器用的是 cc 还是 gcc.不同库文件所在目录等.执行 configure 后会生成 Makefile,Makefile 规定了用什么编译器.编译参数等信息. make 根据 Makefile 中规定的内容进行编译,生成的可执行文件放在当前目录或某个子目录. make install

Linux下用Intel编译器编译安装NetCDF-Fortan库(4.2版本后)

本来这个问题真的没必要写的,可是真的困扰我太久%>_<%,决定还是记录一下. 首先,最权威清晰的安装文档还是官方的: Building the NetCDF-4.2 and later Fortran libraries (写此文时,最近版为4.2) 那这个文档最开始就告诉我们,自NetCDF库4.2版本以后,Fortran的库和C的库就要分开build啦!而且要装Fortran的库必须先装好C的库. 所以先装C的库咯:仍然官方文档: Getting and Building NetCDF-C

Linux 下 简单客户端服务器通讯模型(TCP)

原文:Linux 下 简单客户端服务器通讯模型(TCP) 服务器端:server.c #include<stdio.h> #include<stdlib.h> #include<errno.h> #include<string.h> #include<sys/socket.h> #include<sys/types.h> #include <stdio.h> #include <unistd.h> #inclu

Linux下C语言的编译全过程解读

Linux下C语言的编译全过程解读 我们总是在Linux下输入 gcc -o app main.c 即可编译好程序,对于具体的详细过程及流程,相信亲们就不太了解啦!下面给大家展示一下C编译器的解释全过程. Linux下编辑调试工具: gcc gdb  ,把高级语言编译成二进制可执行代码的工具. 把高级语言编译成二进制可执行文件的过程如下: 需要经历四个步骤: (1)预处理:去掉注释,进行宏替换(#define相关),头文件(#include)包含等工作. gcc–E test.c –o test

linux下安装htk工具箱并运行demo[centos6.5]

一直没有静下心来好好研究研究htk.最近听了豆豆童鞋的建议,决定从头看一遍htk.就从安装开始吧.顺便做个整理. 我的环境是centos 6.5 32bit 在vmware虚拟机中运行. 一.首先要安装依赖.yum的用法就不做记录了,需要安装的是gcc和libx11,命令分别是: sudo yum install gcc* sudo yum install libx11*[因为我没有使用root账号,所以要使用sudo,如果你的账号不是sudo用户,可以添加到sudoers文件] 执行vi /e

linux下arm平台Qt编译环境搭建与解析

一.概述: ???? 我们知道QTcreator.这仅仅是个IDE,他包含了一个编译器--qmake.这两者的关系与codeblocks和g++的关系一样,首先要明确这些. ???? 而我们在linux下搭建arm平台的QT编译环境,基本的是要使用适合arm的qmake.正如我们编译在arm上使用c++程序时要用arm-none-linux-gnueabi-g++一样.而qmake仅仅是我们使用的工具,自然不须要也不能换.所以我们仅仅要使用arm-none-linux-gnueabi-g++编译