[elk]kafka集群

kafka高可用

并发写

每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

并发读

数据组织模式

- 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1
- 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

1.创建了3个p的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3

2.Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。
n1: p0
test-topic3-0

n2: p1
test-topic3-1

n3: p2
test-topic3-2
[[email protected] logstash]# ls -ld /data/kafka-logs/test-topic3-0

[[email protected] logstash]# ls -ld /data/kafka-logs/test-topic3-1

[[email protected] logstash]# ls -ld /data/kafka-logs/test-topic3-2

读写

消费者可以从
1.zk拉数据

## src_base_zk
- 查看基于zk的消费组
bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3

- 查看group详情(判断cusumer是否正常)
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe

2,kafka拉数据

## src_base_kafka
- 查看zk
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

- 查看group详情(判断cusumer是否正常)
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe

- 查看实时消费日志
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3

探究kafka p和消费者个数之间的关系

实验环境

- 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic1

- 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

监控消费日志
[[email protected] kafka]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic
hi
mao

logstash消费

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => "test-topic"
    group_id => "logstash-group"
    codec => "json"
    consumer_threads => 1
    decorate_events => true
  }
}

output {
    stdout { codec => rubydebug }
}

/usr/local/logstash/bin/logstash -f logstash.yaml --config.reload.automatic

一个group,3个p

先创建3个p的test-topic3
1.当有3个p, 1个消费者时

2.当有3个p,2个消费者时

3.当有3个p,3个消费者时

都是动态调配的(新增一个消费者, p的分配会自动变)

4.当有1个p,2个消费者

小结: 同一个消费组, 消费者个数<=p个数

2个group,1个p

kafka cli最佳实战

    - 创建topic
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic2
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-topic3
    - 查看topic list
        bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
    - 查看topic 详细
        bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test-topic3 --describe
    - 生产者
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic3
    基于zk的消费者
        - 查看基于zk的消费组
            bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3
        - 开启一个消费者(随机生成group)
            bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test-topic3
        - 查看group详情(判断cusumer是否正常)
            bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group logstash-group --describe
    基于kafka的消费者
        - 查看基于kafka的消费组
            bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
        - 查看group详情(判断cusumer是否正常)
            bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group logstash-group --describe
        - 开启一个消费者(随机生成group)
            bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group
        - 开启一个消费者(指定group,可能偷走已有的消费者的数据)
            bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic3 --group logstash-group

参考:
http://lxw1234.com/archives/2015/10/538.htm
https://www.cnblogs.com/AcAc-t/p/kafka_topic_consumer_group_command.html
https://www.cnblogs.com/happyday56/p/4208663.html

原文地址:https://www.cnblogs.com/iiiiher/p/9270116.html

时间: 2024-10-10 01:51:25

[elk]kafka集群的相关文章

ELK5.3+Kafka集群配置

[一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10.101.2.24 10.101.2.25 # 2台4C*8G, 安装Logstash--Indexer(input: Kafaka; output: Elasticsearch) 10.101.2.26 10.101.2.27 # 3台8C*16G, 安装Elasticsearch 10.101.

如何为Kafka集群选择合适的Partitions数量

转载http://blog.csdn.net/odailidong/article/details/52571901 这是许多kafka使用者经常会问到的一个问题.本文的目的是介绍与本问题相关的一些重要决策因素,并提供一些简单的计算公式. 文章目录 1 越多的分区可以提供更高的吞吐量 2 越多的分区需要打开更多地文件句柄 3 更多地分区会导致更高的不可用性 4 越多的分区可能增加端对端的延迟 5 越多的partition意味着需要客户端需要更多的内存 6 总结 越多的分区可以提供更高的吞吐量 首

ELK stat集群部署+Grafana及可视化图形

1.ELK stat集群部署+Grafana及可视化图形 2.后续会更新.................

kafka集群维护

kafka集群基本信息实时查看和修改: 集群信息实时查看(topic工具) kafka-topics,sh --list --zookeeper x.x.x.x kafka-topics.sh --describe --zookeeper x.x.x.x --topic topic_name 集群信息实时修改(topic工具) 提高topic并发,通过增加topic的partition数来解决. 现在我们搭建了一个集群,怎么知道每个节点的信息呢?运行""describe topics&

KAFKA集群搭建

一.简介 Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark都支持与Kafka集成.   Kafka适合做什么? 官方文档介绍,它通常被使用在两大类应用中: 搭建实时数据流管道,在系统或应用之间可靠的获取数据 搭建对数据流进行转换或相应的实时流应用程序.   为了了解Kafka具体如何实现这些功能, 首先理解几个概

Kafka集群环境搭建

Kafka介绍 在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算. KAFKA + STORM +REDIS 1.Apache Kafka是一个开源消息系统,用Scala写成. 2.Kafka是一个分布式消息队列:生产者.消费者的功能.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现. 3.Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接收者成为Consumer,此外Kafka集群由多个Ka

centos7搭建ELK Cluster集群日志分析平台(三)

续  centos7搭建ELK Cluster集群日志分析平台(一) 续  centos7搭建ELK Cluster集群日志分析平台(二) 已经安装好elasticsearch 5.4集群和logstash 5.4 安装kibana步骤 1.下载安装Kibana  ~]#wget https://artifacts.elastic.co/downloads/kibana/kibana-5.4.0-x86_64.rpm 如果链接失效,官网下载:https://www.elastic.co/down

kafka集群安装

集群安装1.解压2.修改server.propertiesbroker.id=1zookeeper.connect=work01:2181,work02:2181,work03:2181 3.将zookeeper集群启动 4.在每一台节点上启动brokerbin/kafka-server-start.sh config/server.properties 5.在kafka集群中创建一个topicbin/kafka-topics.sh --create --zookeeper work01:218

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf