Kafka+OpenCV 实现实时流视频处理

 1. 启动Kafka Server

bin/kafka-server-start.sh config/server.properties &

2. 创建一个新topic

bin/kafka-topics.sh --create --zookeeper xxxx --replication-factor 1 --partitions 1 --topic video

3. 安装相关依赖

sudo pip-3.6 install kafka-python opencv-contrib-python imutils

4. 创建一个 Kafka Producer,并发送到kafka消息队列
# kafkaProducer
def publish_video(server, topic):
   # start producer
  
producer = KafkaProducer(bootstrap_servers=server)

vs = VideoStream(src=0).start()
   time.sleep(2.0)

print("publishing video...")

while True:
      frame = vs.read()
      frame = imutils.resize(frame, width=400)
      frame = detection(frame, ‘pretrained.prototxt.txt‘, ‘pretrained.caffemodel‘)

# send to kafka topic
     
producer.send(topic, frame.tobytes())

vs.stop()

这里使用opencv 采集本地摄像头视频,读取每一帧数据,做图像识别处理,并转化为bytes数据发送到 kafka topic。

其中detection方法为以一个已经训练好的深度学习模型,用于做图像识别以及描绘边框。网上类似模型很多,这里不多做赘述。这里对每帧做了一个resize是由于原视频采集的每帧数据较大,超过了kafka里默认的一个item大小,所以需要裁剪每帧,以减少传输数据量。

5. 验证是否可以接收到流数据:

bin/kafka-console-consumer.sh --bootstrap-server xxxx:port --topic video

6. 创建一个Kafka Consumer,用于获取流数据

from imutils.video import VideoStream from imutils.video import FPS import imutils import numpy as np import time import cv2 from kafka import KafkaConsumer import sys

def showCam(server, topic):     consumer = KafkaConsumer(         topic,         bootstrap_servers=[server])

    fps = FPS().start()

    for msg in consumer:         decoded = np.frombuffer(msg.value, np.uint8)         decoded = decoded.reshape(225, 400, 3)

        cv2.imshow("Cam", decoded)

        key = cv2.waitKey(1) & 0xFF         if key == ord("q"):             break

        fps.update()

    fps.stop()     cv2.destroyAllWindows()

这里使用np.frombuffer() 方法将每一帧的bytes数据转为一维numpy数组,由于采集的帧数据为3维numpy数组,所以需要对此数组做reshape,以还原为原数据格式,最后显示在屏幕上。

7. 执行代码:

首先启动 Consumer:python3 kafkaCCam.py server:port topic

然后启动 Producer:python3 KafkaPCam.py server:port topic

即可在Consumer端获取到Producer送入到流里的实时视频图像:

原文地址:https://www.cnblogs.com/zackstang/p/10312176.html

时间: 2024-11-13 16:26:47

Kafka+OpenCV 实现实时流视频处理的相关文章

http实时流下载转换成视频

http实时流下载转换视频 请求样式 https://alhlsvodhls02.e.vhall.com/vhallyun/vhallhls/ls/s_/db/lss_db7eb88c/lss_db7eb88c/20191214194456183822/livestream000239.ts 所以我们就是把这些ts二进制下载下来转换成视频格式应该就可以播放了. 试了一下下载一个ts直接在迅雷里可以播放几秒视频的. 参考一下方法可行,但是我没找到批量下载. https://jingyan.baid

大数据实时流统计视频教程(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

如何优化传输机制来实现实时音视频的超低延迟?

1.前言 要在语音视频 SDK 中实现超低延迟,实时的语音视频传输机制是必不可少的,而 FEC 和 ARQ 的智能结合是实时语音视频传输机制的基石. 在语音社交.视频社交.游戏语音和互动直播等领域,关于在语音视频实时传输中实现低延迟这个议题,已经有不少的文章提出各种方案.绝大部分方案的思路都是"优化",比如说,优化编码.推流.传输和播放等各个环节. 愚以为,要在实时语音视频传输中获得超低延迟,是不能单靠挖空心思去"优化"的,而是要依靠实时的传输机制.就像高铁和火车有

技术调研参考——业界开源实时流处理系统小结

这里对目前业界开源的一些实时流处理系统做一次小结,作为日后进行技术调研的参考资料. S4 S4(Simple Scalable Streaming System)是Yahoo最新发布的一个开源流计算平台,它是一个通用的.分布式的.可扩展性良好.具有分区容错能力.支持插件的分布式流计算平台,在该平台上程序员可以很方便地开发面向无界不间断流数据处理的应用,开发语言为Java. 项目链接:http://incubator.apache.org/s4/(注:S4 0.5.0已支持TCP链接及状态恢复等特

[翻译和注解]Kafka Streams简介: 让流处理变得更简单

Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Kafka Streams还没有正式发布,所以具体的API和功能和0.10.0.0版(2016年6月发布)有所区别.但是Jay Krpes在这简文章里介绍了很多Kafka Streams在设计方面的考虑,还是很值得一看的. 以下的并不会完全按照原文翻译,因为那么搞太累了……这篇文件的确很长,而且Jay

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP

Windows下利用live555实现H264实时流RTSP发送

文如其名,最近在做的项目要求利用RTSP协议转发处理完的H264视频数据给上一层客户端,环境是Windows的VS2013,于是就各种百度谷歌找代码.结果在得到利用live555去做比较简单的结论的同时也悲情地发现,网上别人贴出来的代码基本都是Linux上面的.在修改了两份来适用于Windows无效后,又一次陷入了百度谷歌的无尽搜索中.Anyway,最后终于解决了,所以贴出代码跟大家分享下,希望能给和我需求相似的童鞋一点启发,也希望有高手指正其中的问题. 用live555进行RTSP的播放基本上

Android端实时音视频开发指南

简介 yun2win-sdk-Android提供Android端实时音视频完整解决方案,方便客户快速集成实时音视频功能. SDK 提供的能力如下: 发起 加入 AVClient Channel AVMember yun2win官网:www.yun2win.com SDK下载地址:http://www.yun2win.com/h-col-107.html 开发准备 注册并创建应用 到 github下载yun2winSDK及demo 下载源码详解 app为主体显示Module uikit为公共服务M

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理