kafka基本命令和实践

Kafka基本命令

#启动server
./bin/kafka-server-start.sh config/server.properties

#创建topic(主题)test
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic test

#删除主题
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
#– 注意:如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此 时的删除并不是真正的删除,而是把topic标记为:marked for deletion
#– 此时你若想真正删除它,可以登录zookeeper客户端,进入终端后,删除相应节点

#查看主题
./bin/kafka-topics.sh --list --zookeeper localhost:2181

#查看主题test的详情
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

#Consumer读消息
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic badou --from-beginning

#Producer发消息
./bin/kafka-console-producer.sh --broker-list master:9092 --topic badou

用Kafka和Flume搭建日志系统

1.master节点和slave节点启动zookeeper

./bin/zkServer.sh start

2.启动kafka

#启动server
./bin/kafka-server-start.sh config/server.properties

#创建topic badou
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic badou

#Consumer读消息
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic badou --from-beginning

3.启动Flume

./bin/flume-ng agent -c conf -f conf/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console

Flume配置文件flume_kafka.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/badou/flume_test/flume_exec_test.txt

#a1.sinks.k1.type = logger
# 设置kafka接收器
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 设置kafka的broker地址和端口号
a1.sinks.k1.brokerList=master:9092
# 设置Kafka的topic
a1.sinks.k1.topic=badou
# 设置序列化的方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

4.执行python脚本

模拟将后端日志写入日志文件中

python flume_data_write.py 

python代码:

# -*- coding: utf-8 -*-
import random
import time
import pandas as pd
import json

writeFileName="./flume_exec_test.txt"
cols = ["order_id","user_id","eval_set","order_number","order_dow","hour","day"]
df1 = pd.read_csv(‘/mnt/hgfs/share_folder/00-data/orders.csv‘)
df1.columns = cols
df = df1.fillna(0)
with open(writeFileName,‘a+‘)as wf:
    for idx,row in df.iterrows():
        d = {}
        for col in cols:
            d[col]=row[col]
        js = json.dumps(d)
        wf.write(js+‘\n‘)

原文地址:https://www.cnblogs.com/xumaomao/p/12708357.html

时间: 2024-10-11 16:25:59

kafka基本命令和实践的相关文章

[bigdata] kafka基本命令 -- 迁移topic partition到指定的broker

版本 0.9.2 创建topic bin/kafka-topics.sh --create --topic topic_name --partition 6 --replication-factor 1 -zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181 开启console consumer查看消息 bin/kafka-console-consumer.sh --topic rt_live_pcweb -zook

kafka基本命令

1.启动nohup kafka-server-start /usr/local/etc/kafka/server-1.properties & 2.查看消费者kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning 3.描述kafka-topics --describe --bootstrap-server localhost:9092 --topic my-replicated-

腾讯资深架构师给你讲解 kafka的基本原理,带你实战实践

前言 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统.低延迟的实时系统.storm/Spark流式处理引擎,web/nginx日志.访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. kafka入门与实践 第一章 kaf

Kafka安装

1.Kafka基本术语 Producer :消息生产者,就是向kafka broker发消息的客户端 Consumer :消息消费者,向kafka broker取消息的客户端 Topic :话题,可以理解为一个队列 Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段.一个topic可以有多个CG.topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该C

Kafka集群安装与扩容

介绍略 集群安装: 一.准备工作: 1.版本介绍: 目前我们使用版本为kafka_2.9.2-0.8.1(scala-2.9.2为kafka官方推荐版本,此外还有2.8.2和2.10.2可以选择) 2.环境准备: 安装JDK6,目前使用版本为1.6,并配置JAVA_HOME 3.配置修改: 1)拷贝线上配置到本地kafka目录. 2)需要注意的是server.properties里broker和ip的指定,必须要唯一. 3)server.properties中log.dirs必须要手动指定.此配

饶军:Apache Kafka的过去,现在,和未来

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文首发在云+社区,未经许可,不得转载. 大家好,我大概简单的介绍一下,我叫饶军,我是硅谷的初创公司Con?uent的联合创始人之一,我们公司的三个创始人都是在最开始在领这个公司做kafka开发出身的.我们公司是2014年成立的,成立的宗旨想把公司做成一个帮助各种各样企业做基于kafka之上的数据流的事情. 在开始之前,我想大概做一个简单的调查,在座的有谁用过Kafka.大概有80%的人都用了.好,谢谢.今天跟大家分享,想分享一下我们的

Kafka项目实践

用户日志上报实时统计之编码实践 1.概述 本课程的视频教程地址:<Kafka实战项目之编码实践>  该课程我以用户实时上报日志案例为基础,带着大家去完成各个KPI的编码工作,实现生产模块.消费模块,数据持久化,以及应用调度等工作, 通过对这一系列流程的演示,让大家能够去掌握Kafka项目的相关编码以及调度流程.下面,我们首先来预览本课程所包含的课时,他们分别有: 接下来,我们开始第一课时的学习:<数据生产实现> 2.内容 2.1 数据生产实现 本课时主要给大家演示Kafka数据生产

实践部署与使用apache kafka框架技术博文资料汇总

前一篇Kafka框架设计来自英文原文(Kafka Architecture Design)的翻译及整理文章,很有借鉴性,本文是从一个企业使用Kafka框架的角度来记录及整理的Kafka框架的技术资料,也很有借鉴价值,为了便于阅读与分享,我将其整理一篇Blog.本文内容目录摘要如下: 1)apache kafka消息服务 2)kafka在zookeeper中存储结构 3)kafka log4j配置 4)kafka replication设计机制 5)apache kafka监控系列-监控指标 6)

window下Kafka最佳实践

Kafka的介绍和入门请看这里kafka入门:简介.使用场景.设计原理.主要配置及集群搭建(转) 当前文章从实践的角度为大家规避window下使用的坑. 1.要求: java 8 2.下载kafka[注意,只用下载kafka] http://mirrors.cnnic.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz 3.解压,通过cmd进入kafka_2.11-0.9.0.1\bin\windows目录 4.启动zk zookeeper-server