理解TensorFlow的Queue

https://www.jianshu.com/p/d063804fb272

这篇文章来说说TensorFlow里与Queue有关的概念和用法。

其实概念只有三个:

  • Queue是TF队列和缓存机制的实现
  • QueueRunner是TF中对操作Queue的线程的封装
  • Coordinator是TF中用来协调线程运行的工具

虽然它们经常同时出现,但这三样东西在TensorFlow里面是可以单独使用的,不妨先分开来看待。

1. Queue

根据实现的方式不同,分成具体的几种类型,例如:

  • tf.FIFOQueue 按入列顺序出列的队列
  • tf.RandomShuffleQueue 随机顺序出列的队列
  • tf.PaddingFIFOQueue 以固定长度批量出列的队列
  • tf.PriorityQueue 带优先级出列的队列
  • ... ...

这些类型的Queue除了自身的性质不太一样外,创建、使用的方法基本是相同的。

创建函数的参数:

tf.FIFOQueue(capacity, dtypes, shapes=None, names=None ...)

Queue主要包含入列(enqueue)出列(dequeue)两个操作。enqueue操作返回计算图中的一个Operation节点,dequeue操作返回一个Tensor值。Tensor在创建时同样只是一个定义(或称为“声明”),需要放在Session中运行才能获得真正的数值。下面是一个单独使用Queue的例子:

import tensorflow as tf
tf.InteractiveSession()

q = tf.FIFOQueue(2, "float")
init = q.enqueue_many(([0,0],))

x = q.dequeue()
y = x+1
q_inc = q.enqueue([y])

init.run()
q_inc.run()
q_inc.run()
q_inc.run()
x.eval()  # 返回1
x.eval()  # 返回2
x.eval()  # 卡住

注意,如果一次性入列超过Queue Size的数据,enqueue操作会卡住,直到有数据(被其他线程)从队列取出。对一个已经取空的队列使用dequeue操作也会卡住,直到有新的数据(从其他线程)写入。

2. QueueRunner

Tensorflow的计算主要在使用CPU/GPU和内存,而数据读取涉及磁盘操作,速度远低于前者操作。因此通常会使用多个线程读取数据,然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的。

QueueRunner需要与Queue一起使用(这名字已经注定了它和Queue脱不开干系),但并不一定必须使用Coordinator。看下面这个例子:

import tensorflow as tf
import sys
q = tf.FIFOQueue(10, "float")
counter = tf.Variable(0.0)  #计数器
# 给计数器加一
increment_op = tf.assign_add(counter, 1.0)
# 将计数器加入队列
enqueue_op = q.enqueue(counter)

# 创建QueueRunner
# 用多个线程向队列添加数据
# 这里实际创建了4个线程,两个增加计数,两个执行入队
qr = tf.train.QueueRunner(q, enqueue_ops=[increment_op, enqueue_op] * 2)

# 主线程
sess = tf.InteractiveSession()
tf.global_variables_initializer().run()
# 启动入队线程
qr.create_threads(sess, start=True)
for i in range(20):
    print (sess.run(q.dequeue()))

增加计数的进程会不停的后台运行,执行入队的进程会先执行10次(因为队列长度只有10),然后主线程开始消费数据,当一部分数据消费被后,入队的进程又会开始执行。最终主线程消费完20个数据后停止,但其他线程继续运行,程序不会结束。

3. Coordinator

Coordinator是个用来保存线程组运行状态的协调器对象,它和TensorFlow的Queue没有必然关系,是可以单独和Python线程使用的。例如:

import tensorflow as tf
import threading, time

# 子线程函数
def loop(coord, id):
    t = 0
    while not coord.should_stop():
        print(id)
        time.sleep(1)
        t += 1
        # 只有1号线程调用request_stop方法
        if (t >= 2 and id == 1):
            coord.request_stop()

# 主线程
coord = tf.train.Coordinator()
# 使用Python API创建10个线程
threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]

# 启动所有线程,并等待线程结束
for t in threads: t.start()
coord.join(threads)

将这个程序运行起来,会发现所有的子线程执行完两个周期后都会停止,主线程会等待所有子线程都停止后结束,从而使整个程序结束。由此可见,只要有任何一个线程调用了Coordinator的request_stop方法,所有的线程都可以通过should_stop方法感知并停止当前线程。

将QueueRunner和Coordinator一起使用,实际上就是封装了这个判断操作,从而使任何一个现成出现异常时,能够正常结束整个程序,同时主线程也可以直接调用request_stop方法来停止所有子线程的执行。

4. 在一起

在TensorFlow中用Queue的经典模式有两种,都是配合了QueueRunner和Coordinator一起使用的。

第一种,显式的创建QueueRunner,然后调用它的create_threads方法启动线程。例如下面这段代码:

import tensorflow as tf

# 1000个4维输入向量,每个数取值为1-10之间的随机数
data = 10 * np.random.randn(1000, 4) + 1
# 1000个随机的目标值,值为0或1
target = np.random.randint(0, 2, size=1000)

# 创建Queue,队列中每一项包含一个输入数据和相应的目标值
queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])

# 批量入列数据(这是一个Operation)
enqueue_op = queue.enqueue_many([data, target])
# 出列数据(这是一个Tensor定义)
data_sample, label_sample = queue.dequeue()

# 创建包含4个线程的QueueRunner
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

with tf.Session() as sess:
    # 创建Coordinator
    coord = tf.train.Coordinator()
    # 启动QueueRunner管理的线程
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    # 主线程,消费100个数据
    for step in range(100):
        if coord.should_stop():
            break
        data_batch, label_batch = sess.run([data_sample, label_sample])
    # 主线程计算完成,停止所有采集数据的进程
    coord.request_stop()
    coord.join(enqueue_threads)

第二种,使用全局的start_queue_runners方法启动线程。

import tensorflow as tf

# 同时打开多个文件,显示创建Queue,同时隐含了QueueRunner的创建
filename_queue = tf.train.string_input_producer(["data1.csv","data2.csv"])
reader = tf.TextLineReader(skip_header_lines=1)
# Tensorflow的Reader对象可以直接接受一个Queue作为输入
key, value = reader.read(filename_queue)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # 启动计算图中所有的队列线程
    threads = tf.train.start_queue_runners(coord=coord)
    # 主线程,消费100个数据
    for _ in range(100):
        features, labels = sess.run([data_batch, label_batch])
    # 主线程计算完成,停止所有采集数据的进程
    coord.request_stop()
    coord.join(threads)

在这个例子中,tf.train.string_input_produecer会将一个隐含的QueueRunner添加到全局图中(类似的操作还有tf.train.shuffle_batch等)。

由于没有显式地返回QueueRunner来用create_threads启动线程,这里使用了tf.train.start_queue_runners方法直接启动tf.GraphKeys.QUEUE_RUNNERS集合中的所有队列线程。

这两种方式在效果上是等效的。

参考文章

  1. tensorflow中关于队列使用的实验
  2. cs20si课件slides_09

作者:巾梵
链接:https://www.jianshu.com/p/d063804fb272
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

原文地址:https://www.cnblogs.com/DjangoBlog/p/9467867.html

时间: 2024-10-23 19:48:29

理解TensorFlow的Queue的相关文章

深入理解nvme hardware queue pair

hardware queue pair是什么 hardware queue pair是我们理解nvme/spdk的牛鼻子,只有深入理解才可能把nvme用好. 从nvme控制器寄存器的角度看 顾名思义,就是一些硬件寄存器组成的队列. 空队列 满队列 问题:能否并发入队.出队?不行 submission hardware queue entry 每个entry如下表所示: 入队: host software 提交命令到tail entry, 通过操作submission hardware queue

[tensorflow] 通过Class的形式实现网络的创建、加深理解graph和session

一.理解graph和session 对tensorflow中的graph和session进行更深入的了解,能够帮助理解tensorflow的运行机制,使得可以在一个运行程序中调用不同的神经网络.总而言之,是深入学习必须掌握的东西. 在程序一开始,tensorflow会自动生成一个默认的graph.如果不显示的指定graph,所有的操作(添加张量.节点)都会自动加入默认图. 可以通过g=tf.Graph()显示获得一个图的变量.然后通过 with g.as_default():上下文管理器,在后文

Tensorflow编程基础之Mnist手写识别实验+关于cross_entropy的理解

好久没有静下心来写点东西了,最近好像又回到了高中时候的状态,休息不好,无法全心学习,恶性循环,现在终于调整的好一点了,听着纯音乐突然非常伤感,那些曾经快乐的大学时光啊,突然又慢慢的一下子出现在了眼前,不知道我大学的那些小伙伴们现在都怎么样了,考研的刚刚希望他考上,实习的菜头希望他早日脱离苦海,小瑞哥希望他早日出成果,范爷熊健研究生一定要过的开心啊!天哥也哥早日结婚领证!那些回不去的曾经的快乐的时光,你们都还好吗! 最近开始接触Tensorflow,可能是论文里用的是这个框架吧,其实我还是觉得py

TensorFlow中的通信机制——Rendezvous(一)本地传输

背景 [作者:DeepLearningStack,阿里巴巴算法工程师,开源TensorFlow Contributor] 在TensorFlow源码中我们经常能看到一个奇怪的词--Rendezvous.如果从仔细统计该单词出现的频率和模块,你会发现无论在单机还是分布式,无论在core目录还是contrib目录都存在它的身影,所涉及的模块非常多.Rendezvous是一个法语单词,发音也比较特殊,一般直译为"约会.相会.会和",而在TensorFlow中,Rendezvous是用来完成消

TensorFlow中的通信机制——Rendezvous(二)gRPC传输

背景 [作者:DeepLearningStack,阿里巴巴算法工程师,开源TensorFlow Contributor] 本篇是TensorFlow通信机制系列的第二篇文章,主要梳理使用gRPC网络传输部分模块的结构和源码.如果读者对TensorFlow中Rendezvous部分的基本结构和原理还不是非常了解,那么建议先从这篇文章开始阅读.TensorFlow在最初被开源时还只是个单机的异构训练框架,在迭代到0.8版本开始正式支持多机分布式训练.与其他分布式训练框架不同,Google选用了开源项

queue(),dequeue()

这两个方法,一个是往里面添加队列,一个是执行队列 也是分静态方法和实例方法, 同样,实例方法最后调用静态方法 源码主要分析一下延迟delay方法,如何起作用的,写的有点仓促,先记录一下 在这里参照了网络上的文章,给推荐一下,介绍的很详细了,作者比较用心: http://www.html-js.com/card/1083,他是基于1.7的,我这边运行的是1.9的,但是核心思想是一样的 jQuery.extend({ queue: function( elem, type, data ) { var

TensorFlow 初探

最近致力于深度学习,希望在移动领域能够找出更多的应用点.其中TensorFlow作为目前的一个热点值得我们重点关注. 机器学习 机器学习是人工智能的一个分支,也是用来实现人工只能的一种方法.简单来说,机器学习就是通过算法,使得机器能从大量历史数据中学习规律,从而对新的样本做智能识别或对未来做预测,与传统的使用特定指令集手写软件不同,我们使用大量数据和算法来"训练"机器,由此带来机器学习如何完成任务.从1980年代末期以来,机器学习的发展大致经历了两次浪潮:浅层学习(Shallow Le

【转】真正从零开始,TensorFlow详细安装入门图文教程!(帮你完成那个最难的从0到1)

AI这个概念好像突然就火起来了,年初大比分战胜李世石的AlphaGo成功的吸引了大量的关注,但其实看看你的手机上的语音助手,相机上的人脸识别,今日头条上帮你自动筛选出来的新闻,还有各大音乐软件的歌曲"每日推荐"--形形色色的AI早已进入我们生活的方方面面.深刻的影响了着我们,可以说,这是一个AI的时代. 其实早在去年年底,谷歌就开源了其用来制作AlphaGo的深度学习系统Tensorflow,相信有不少同学曾经对着这款强大的机器学习系统蠢蠢欲动,但虽然有关Tensorflow的教程其实

Tensorflow显示图片

Tensorflow在处理数据时,经常加载图像数据,有的时候是直接读取文件,有的则是读取二进制文件,为了更好的理解Tensorflow数据处理模式,先简单讲解显示图片机制,就能更好掌握是否读取正确了. 一.结合opencv读取显示图片 1.变量 使用tf.Variable初始化为tensor,加载到tensorflow对图片进行显示 def showimage_variable_opencv(filename): image = cv2.imread(filename) # Create a T