读取zookeeper保存的topic元数据

读取zookeeper保存的topic元数据

Table of Contents

  • 1. 有以下问题
  • 2. 解决方法
  • 3. 代码
    • 3.1. KafkaHelper类
    • 3.2. main.cc完整代码

1 有以下问题

  • 需要使用producer才能获得元数据
  • 当producer和consumer共用一些对象时会出现无法读取数据的问题

2 解决方法

用独立的类封装获取元数据的代码,避免共用变量

3 代码

3.1 KafkaHelper类

#ifndef KAFKA_HELPER_H_
#define KAFKA_HELPER_H_

#include <string>
using std::string;

#include "librdkafka/rdkafkacpp.h"
#include "librdkafka/rdkafka.h"
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper.jute.h>
#include <jansson.h>

#define BROKER_PATH "/brokers/ids"

static rd_kafka_t *rk;

class KafkaHelper {
 public:
  static string Brokers(string const& zookeeper) {
    zhandle_t * zh = initialize_zookeeper(zookeeper);
    char brokers[1024];
    set_brokerlist_from_zookeeper(zh, brokers);
    return brokers;
  }

  static void PrintTopicMeta(string const& topic_name) {
    /*
     * Create producer using accumulated global configuration.
     */
    RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    string zookeeper("localhost:2181");
    string brokers = KafkaHelper::Brokers(zookeeper);
    string errstr;
    global_conf->set("metadata.broker.list", brokers, errstr);
    RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    RdKafka::Producer *producer = RdKafka::Producer::create(global_conf, errstr);
    if (!producer) {
      std::cerr << "Failed to create producer: " << errstr << std::endl;
      exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    /*
     * Create topic handle.
     */
    RdKafka::Topic * topic = RdKafka::Topic::create(producer, topic_name, topic_conf, errstr);

    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    bool run = true;
    while (run) {
      class RdKafka::Metadata *metadata;

      // Fetch metadata
      RdKafka::ErrorCode err = producer->metadata(topic!=NULL, topic, &metadata, 5000);
      if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "%% Failed to acquire metadata: "
                  << RdKafka::err2str(err) << std::endl;
        run = 0;
        break;
      }

      KafkaHelper::PrintMeta(topic_name, metadata);

      delete metadata;
      run = 0;
    }
  }

  static void PrintMeta(string const & topic, const RdKafka::Metadata *metadata) {
    std::cout << "Metadata for " << (topic.empty() ? "" : "all topics")
              << "(orig broker id from broker "  << metadata->orig_broker_id()
              << ":" << metadata->orig_broker_name() << std::endl;

    /* Iterate brokers */
    std::cout << " " << metadata->brokers()->size() << " brokers:" << std::endl;
    RdKafka::Metadata::BrokerMetadataIterator ib;
    for (ib = metadata->brokers()->begin(); ib != metadata->brokers()->end(); ++ib) {
      std::cout << "  broker " << (*ib)->id() << " at "
                << *(*ib)->host() << ":" << (*ib)->port() << std::endl;
    }
    /* Iterate topics */
    std::cout << metadata->topics()->size() << " topics:" << std::endl;
    RdKafka::Metadata::TopicMetadataIterator it;
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) {
      std::cout << "  topic "<< *(*it)->topic() << " with "
                << (*it)->partitions()->size() << " partitions" << std::endl;

      if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
        std::cout << " " << err2str((*it)->err());
        if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
          std::cout << " (try again)";
        }
      }
      std::cout << std::endl;

      /* Iterate topic‘s partitions */
      RdKafka::TopicMetadata::PartitionMetadataIterator ip;
      for (ip = (*it)->partitions()->begin(); ip != (*it)->partitions()->end(); ++ip) {
        std::cout << "    partition " << (*ip)->id()
                  << " leader " << (*ip)->leader()
                  << ", replicas: ";

        /* Iterate partition‘s replicas */
        RdKafka::PartitionMetadata::ReplicasIterator ir;
        for (ir = (*ip)->replicas()->begin();
             ir != (*ip)->replicas()->end() ;
             ++ir) {
          std::cout << (ir == (*ip)->replicas()->begin() ? ",":"") << *ir;
        }

        /* Iterate partition‘s ISRs */
        std::cout << ", isrs: ";
        RdKafka::PartitionMetadata::ISRSIterator iis;
        for (iis = (*ip)->isrs()->begin(); iis != (*ip)->isrs()->end() ; ++iis)
          std::cout << (iis == (*ip)->isrs()->begin() ? ",":"") << *iis;

        if ((*ip)->err() != RdKafka::ERR_NO_ERROR)
          std::cout << ", " << RdKafka::err2str((*ip)->err()) << std::endl;
        else
          std::cout << std::endl;
      }
    }
  }

 private:
  static void watcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
    char brokers[1024];
    if (type == ZOO_CHILD_EVENT && strncmp(path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
      {
        brokers[0] = ‘\0‘;
        set_brokerlist_from_zookeeper(zh, brokers);
        if (brokers[0] != ‘\0‘ && rk != NULL)
          {
            rd_kafka_brokers_add(rk, brokers);
            rd_kafka_poll(rk, 10);
          }
      }
  }

  static zhandle_t* initialize_zookeeper(string const& zookeeper) {
    zhandle_t * zh = zookeeper_init(zookeeper.c_str(), watcher, 10000, 0, 0, 0);
    if (zh == NULL) {
      fprintf(stderr, "Zookeeper connection not established.");
      exit(1);
    }
    return zh;
  }

  static void set_brokerlist_from_zookeeper(zhandle_t *zzh, char *brokers) {
    if (zzh) {
      struct String_vector brokerlist;
      if (zoo_get_children(zzh, BROKER_PATH, 1, &brokerlist) != ZOK) {
        fprintf(stderr, "No brokers found on path %s\n", BROKER_PATH);
        return;
      }

      int i;
      char *brokerptr = brokers;
      for (i = 0; i < brokerlist.count; i++) {
        char path[255], cfg[1024];
        sprintf(path, "/brokers/ids/%s", brokerlist.data[i]);
        int len = sizeof(cfg);
        zoo_get(zzh, path, 0, cfg, &len, NULL);

        if (len > 0) {
          cfg[len] = ‘\0‘;
          json_error_t jerror;
          json_t *jobj = json_loads(cfg, 0, &jerror);
          if (jobj) {
            json_t *jhost = json_object_get(jobj, "host");
            json_t *jport = json_object_get(jobj, "port");

            if (jhost && jport) {
              const char *host = json_string_value(jhost);
              const int   port = json_integer_value(jport);
              sprintf(brokerptr, "%s:%d", host, port);

              brokerptr += strlen(brokerptr);
              if (i < brokerlist.count - 1) {
                *brokerptr++ = ‘,‘;
              }
            }
            json_decref(jobj);
          }
        }
      }
      deallocate_String_vector(&brokerlist);
      printf("Found brokers %s\n", brokers);
    }
  }
};

#endif

3.2 main.cc完整代码

这里包含了读取数据的代码

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <getopt.h>
#include <list>
#include "helper/kafka_helper.h"

using std::string;
using std::list;
using std::cout;
using std::endl;

static bool run = true;
static bool exit_eof = true;

// Event callback: logs librdkafka events and stops the consume loop
// when every broker has become unreachable.
class MyEventCb : public RdKafka::EventCb {
public:
  void event_cb (RdKafka::Event &event) {
    const RdKafka::Event::Type kind = event.type();

    if (kind == RdKafka::Event::EVENT_ERROR) {
      std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
        event.str() << std::endl;
      // No broker left to talk to -- tell main() to shut down.
      if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
        run = false;
    } else if (kind == RdKafka::Event::EVENT_STATS) {
      std::cerr << "\"STATS\": " << event.str() << std::endl;
    } else if (kind == RdKafka::Event::EVENT_LOG) {
      fprintf(stderr, "LOG-%i-%s: %s\n",
              event.severity(), event.fac().c_str(), event.str().c_str());
    } else {
      std::cerr << "EVENT " << kind <<
        " (" << RdKafka::err2str(event.err()) << "): " <<
        event.str() << std::endl;
    }
  }
};

// Handle one consume result: print real messages, note partition EOF,
// and stop the loop on fatal errors.  `opaque` is unused.
void msg_consume(RdKafka::Message* message, void* opaque) {
  switch (message->err()) {
  case RdKafka::ERR__TIMED_OUT:
    // No message within the timeout -- not an error.
    break;

  case RdKafka::ERR_NO_ERROR:
    /* Real message */
    std::cout << "Read msg at offset " << message->offset() << std::endl;
    if (message->key()) {
      std::cout << "Key: " << *message->key() << std::endl;
    }
    // Fix: a Kafka payload is NOT NUL-terminated, so streaming it as a
    // const char* could read past the buffer.  Write exactly len() bytes.
    cout.write(static_cast<const char *>(message->payload()),
               static_cast<std::streamsize>(message->len()));
    cout << endl;
    break;

  case RdKafka::ERR__PARTITION_EOF:
    cout << "reach last message" << endl;
    /* Last message */
    if (exit_eof) {
      run = false;
    }
    break;

  case RdKafka::ERR__UNKNOWN_TOPIC:
  case RdKafka::ERR__UNKNOWN_PARTITION:
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
    break;

  default:
    /* Errors */
    std::cerr << "Consume failed: " << message->errstr() << std::endl;
    run = false;
  }
}

// Consume callback used by consume_callback(): forwards every message
// to the shared msg_consume() handler.
class MyConsumeCb : public RdKafka::ConsumeCb {
public:
  void consume_cb (RdKafka::Message &message, void *ctx) {
    msg_consume(&message, ctx);
  }
};

// SIGINT/SIGTERM handler: ask the consume loop in main() to finish.
static void sigterm (int sig) {
  (void)sig;  // same handler for both signals; the number is irrelevant
  run = false;
}

int main (int argc, char **argv) {
  /*
   * Process kill signal, quit from the loop
   */
  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);

  /*
   * Get broker list from zookeeper
   */
  string zookeeper("localhost:2181");
  string brokers = KafkaHelper::Brokers(zookeeper);
  cout << "brokers from zookeeper is: " << brokers << endl;

  string topic_name = "test2";
  /*
   * Print topic meta
   */
  KafkaHelper::PrintTopicMeta(topic_name);

  /*
   * Global conf objects
   */
  RdKafka::Conf * global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  string errstr;
  global_conf->set("metadata.broker.list", brokers, errstr);
  MyEventCb ex_event_cb;
  global_conf->set("event_cb", &ex_event_cb, errstr);

  /*
   * Topic conf objects
   */
  RdKafka::Conf * topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

  /*
   * Create consumer using accumulated global configuration.
   */
  RdKafka::Consumer *consumer = RdKafka::Consumer::create(global_conf, errstr);
  if (!consumer) {
    std::cerr << "Failed to create consumer: " << errstr << std::endl;
    exit(1);
  }

  std::cout << "% Created consumer " << consumer->name() << std::endl;

  /*
   * Start consumer for topic+partition at start offset
   */
  int32_t partition = 0;
  int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;

  RdKafka::Topic *topic2 = RdKafka::Topic::create(consumer, topic_name, topic_conf, errstr);
  RdKafka::ErrorCode resp = consumer->start(topic2, 0, start_offset);
  if (resp != RdKafka::ERR_NO_ERROR) {
    std::cerr << "Failed to start consumer: " <<
      RdKafka::err2str(resp) << std::endl;
    exit(1);
  }

  /*
   * Consume messages
   */
  MyConsumeCb ex_consume_cb;
  int use_ccb = 0;
  while (run) {
    if (use_ccb) {
      consumer->consume_callback(topic2, partition, 1000, &ex_consume_cb, &use_ccb);
    } else {
      RdKafka::Message *msg = consumer->consume(topic2, partition, 1000);
      msg_consume(msg, NULL);
      delete msg;
    }
    consumer->poll(0);
  }  

  /*
   * Stop consumer
   */
  consumer->stop(topic2, partition);
  consumer->poll(1000);

  delete topic2;
  delete consumer;

  /*
   * Wait for RdKafka to decommission.
   * This is not strictly needed (when check outq_len() above), but
   * allows RdKafka to clean up all its resources before the application
   * exits so that memory profilers such as valgrind wont complain about
   * memory leaks.
   */
  RdKafka::wait_destroyed(5000);

  return 0;
}

Created: 2016-05-02 Mon 13:07

Validate

再分享一下我老师大神的人工智能教程吧。零基础!通俗易懂!风趣幽默!还带黄段子!希望你也加入到我们人工智能的队伍中来!https://blog.csdn.net/jiangjunshow

原文地址:https://www.cnblogs.com/skiwnywh/p/10322586.html

时间: 2024-09-20 22:45:37

读取zookeeper保存的topic元数据的相关文章

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

node-red File读取好保存

File节点是操作文件的节点 file文件的保存 拖拽 注入节点inject  file节点(writes msg.payload to a file)和 debug节点到工作区,并连线 设置file节点的文件路径 windows如果不设置路径,会保存在C:\Users\Administrator docker容器保存在根目录下 此处文件名我选择 放到data目录下,因为docker容器启动的时候我把/data目录挂载到宿主机上了,方便查看文件 行为有三种,追加至文件  复写文件  删除文件 勾

Python,ElementTree模块处理XML时注释无法读取和保存的问题

from xml.etree import ElementTree class CommentedTreeBuilder ( ElementTree.XMLTreeBuilder ): def __init__ ( self, html = 0, target = None ): ElementTree.XMLTreeBuilder.__init__( self, html, target ) self._parser.CommentHandler = self.handle_comment d

二十、Android -- SDcard文件读取和保存

背景                                                                                            一些东西可以存在自己定义的文件里面,这个文件可以在手机中,可以在SD卡中,在这里就主要介绍一下在SD卡中的存储和读取吧~ 代码                                                                                            

Android–SDcard文件读取和保存

背景                                                                                            一些东西可以存在自己定义的文件里面,这个文件可以在手机中,可以在SD卡中,在这里就主要介绍一下在SD卡中的存储和读取吧~ 代码                                                                                            

Spark学习笔记4:数据读取与保存

Spark对很多种文件格式的读取和保存方式都很简单.Spark会根据文件扩展名选择对应的处理方式. Spark支持的一些常见文件格式如下: 1.文本文件 使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件.也可以指定minPartitions控制分区数.传递目录作为参数,会把目录中的各部分都读取到RDD中.例如: val input = sc.textFile("E:\\share\\new\\chapter5") input.fore

Opencv 图像读取与保存问题

本系列文章由 @yhl_leo 出品,转载请注明出处. 文章链接: http://blog.csdn.net/yhl_leo/article/details/49737357 本文仅对 Opencv图像读取与保存进行阐述,重在探讨图像读取与保存过程中应注意的细节问题. 1 图像读取 首先看一下,imread函数的声明: // C++: Mat based Mat imread(const string& filename, int flags=1 ); // C: IplImage based

利用Python PIL、cPickle读取和保存图像数据库

利用Python PIL.cPickle读取和保存图像数据库  @author:wepon @blog:http://blog.csdn.net/u012162613/article/details/43226127 计算机视觉.机器学习任务中,经常跟图像打交道,在C++上有成熟的OpenCV可以使用,在Python中也有一个图像处理库PIL(Python Image Library),当然PIL没有OpenCV那么多功能(比如一些人脸检测的算法),不过在Python上,我们用PIL进行一些基本

IOS webview中cookie的读取与保存-b

Cookie 的读取 将它放在 webViewDidFinishLoad 开始后执行 NSArray *nCookies = [[NSHTTPCookieStorage sharedHTTPCookieStorage] cookies];NSHTTPCookie *cookie; for (id c in nCookies) { if ([c isKindOfClass:[NSHTTPCookie class]]){ cookie=(NSHTTPCookie *)c; NSLog(@"%@: %