利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会有4个业务处理逻辑(下文以P代表生产者,C代表消费者):

1. FIFO(先进先出)

P产生1,2,3,4,5,6,3,2

C处理顺序应为1,2,3,4,5,6,3,2

2.LIFO(后进先出)

P产生1,2,3,4,5,6,3,2

C处理顺序应为2,3,6,5,4,3,2,1

3.Dynamic FIFO(我定义为:去掉相同数据的FIFO, 如果产生的数据队列里已经有相同数据,后进的数据优先级高)

P产生1,2,3,4,5,6,3,2

C处理顺序为1,4,5,6,3,2

4.Dynamic LIFO(我定义为:去掉相同数据的LIFO, 如果产生的数据栈里已经有相同数据,后进的数据优先级高)

P产生1,2,3,4,5,6,3,2

C处理顺序为2,3,6,5,4,1

1,2情况为基本处理逻辑,3,4可能和我们实际场景有关系(包括:判断相同的逻辑可能不同、已存在和后续数据哪个优先级高)

C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。

定义上述4种场景的通用接口以及其遍历类

public
interface
IScheduler
:
IEnumerable<Task
>

{

void
Add
(Task
t);

void
Remove
(Task
t);

int
Count
{
get; }

Task
this
[int
index] {
get;
set
; }

}

public
class
SchedulerEnumerator
:
IEnumerator<
Task>

{

private
IScheduler
_collection;

private
int
_currentIndex;

private
Task
_currentTask;

public
SchedulerEnumerator
(IScheduler
collection)

{

_collection
=
collection
;

_currentIndex
= -1;

_currentTask
=
default
(Task);

}

public
bool
MoveNext()

{

//Avoids going beyond the end of the collection.

if
(++_currentIndex
>=
_collection.
Count)

{

return
false
;

}

else

{

// Set current box to next item in collection.

_currentTask
=
_collection
[_currentIndex];

}

return
true
;

}

public
void
Reset() {
_currentIndex
= -1; }

void
IDisposable
.Dispose()
{ }

public
Task
Current

{

get
{
return
_currentTask; }

}

object
IEnumerator
.Current

{

get
{
return
Current; }

}

}

实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型

public
class
TaskSchedulerBase
<T> :
TaskScheduler

where
T :
IScheduler
,
new
()

{

private
Thread
_processThread;

private
readonly
object
_lock =
new
object
();

public
TaskSchedulerBase()

{

_processThread =
new
Thread
(this.Process);

}

private
void
Process()

{

lock
(_lock)

{

var
tasks = GetScheduledTasks();

if
(null
!= tasks)

{

foreach
(var
t
in
tasks)

{

TryExecuteTask(t);

TryDequeue(t);

}

}

}

}

protected
override
void
QueueTask(
Task
task)

{

lock
(_lock)

{

Scheduler.Add(task);

if
(_processThread.ThreadState.Equals(ThreadState
.Stopped))

{

_processThread =
new
Thread
(Process);

}

if
(!_processThread.IsAlive

&& !_processThread.ThreadState.Equals(
ThreadState.Running))

{

try

{

_processThread.Start();

}

catch
(System.Exception
)

{

if
(!_processThread.ThreadState.Equals(ThreadState
.Running))

{

_processThread =
new
Thread
(Process);

_processThread.Start();

}

}

}

}

}

protected
override
bool
TryDequeue(
Task
task)

{

Scheduler.Remove(task);

return
true
;

}

protected
override
IEnumerable<
Task> GetScheduledTasks()

{

return
Scheduler.ToArray();

}

protected
override
bool
TryExecuteTaskInline(
Task
task,
bool
taskWasPreviouslyQueued)

{

if
(taskWasPreviouslyQueued)

{

if
(TryDequeue(task))

{

return
base
.TryExecuteTask(task);

}

else

{

return
false
;

}

}

else

{

return
base
.TryExecuteTask(task);

}

}

private
readonly
T _scheduler =
new
T();

public
T Scheduler

{

get

{

return
_scheduler;

}

}

}

实现4种队列

1.FIFO

public
class
QueueScheduler
:
IScheduler

{

protected
Queue
<Task>
_queue;

public
QueueScheduler
()

{

_queue
=
new
Queue<
Task>();

}

public
void
Add(
Task
t
)

{

if
(!Contains
(t))

{

_queue.Enqueue
(t);

}

}

public
void
Remove(
Task
t
)

{

_queue.Dequeue
();

}

public
bool
Contains(
Task
t
)

{

bool
found
=
false;

foreach
(var
task
in
_queue
)

{

if
(t
.AsyncState
!=
null
&&
t
.AsyncState.
Equals(task
.AsyncState))

{

found
=
true
;

break;

}

}

return
found
;

}

public
bool
Contains(
Task
t
,
EqualityComparer<
Task>
comp
)

{

throw
new
NotImplementedException();

}

public
IEnumerator
<Task>
GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

IEnumerator
IEnumerable
.GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

public
int
Count

{

get
{
return
_queue.
Count; }

}

public
Task
this[
int
index]

{

get
{
return
(Task)
_queue.ToArray
()[index]; }

set
{
_queue
.ToArray()[index]
= value; }

}

}


2.LIFO

public
class
StackScheduler
:
IScheduler

{

protected
Stack
<Task>
_stack;

public
StackScheduler
()

{

_stack
=
new
Stack<
Task>();

}

public
void
Add(
Task
t
)

{

if
(!Contains
(t))

{

_stack.Push
(t);

}

}

public
void
Remove(
Task
t
)

{

_stack.Pop
();

}

public
bool
Contains(
Task
t
)

{

bool
found
=
false;

foreach
(var
task
in
_stack
)

{

if
(t
.AsyncState
!=
null
&&
t
.AsyncState.
Equals(task
.AsyncState))

{

found
=
true
;

break;

}

}

return
found
;

}

public
bool
Contains(
Task
t
,
EqualityComparer<
Task>
comp
)

{

throw
new
NotImplementedException();

}

public
IEnumerator
<Task>
GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

IEnumerator
IEnumerable
.GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

public
int
Count

{

get
{
return
_stack.
Count; }

}

public
Task
this[
int
index]

{

get
{
return
(Task)
_stack.ToArray
()[index]; }

set
{
_stack
.ToArray()[index]
= value; }

}

}


3.Dynamic FIFO

public
class
DynamicQueueScheduler
:
IScheduler

{

protected
List
<Task>
_queue;

public
DynamicQueueScheduler
()

{

_queue
=
new
List<
Task>();

}

public
virtual
void
Add(Task
t)

{

Task
oldTask
=
null;

if
(Contains
(t,
out
oldTask
))

{

_queue.Remove
(oldTask);

}

_queue.Add
(t);

}

public
virtual
void
Remove(Task
t)

{

_queue.Remove
(t);

}

public
virtual
bool
Contains(Task
t)

{

Task
oldTask
=
null;

bool
found
=
Contains(
t,
out
oldTask);

return
found
;

}

public
virtual
bool
Contains(Task
t,
out
Task
oldTask)

{

bool
found
=
false;

oldTask
=
null
;

foreach
(var
task
in
_queue
)

{

if
(t
.AsyncState
!=
null
&&
t
.AsyncState.
Equals(task
.AsyncState))

{

oldTask
=
task
;

found
=
true
;

break;

}

}

return
found
;

}

public
virtual
bool
Contains(Task
t,
EqualityComparer<Task
>
comp)

{

throw
new
NotImplementedException();

}

public
IEnumerator
<Task>
GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

IEnumerator
IEnumerable
.GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

public
int
Count

{

get
{
return
_queue.
Count; }

}

public
Task
this[
int
index]

{

get
{
return
(Task)
_queue[index]; }

set
{
_queue
[index] =
value; }

}

}


4.Dynamic LIFO

public
class
DynamicStackScheduler
:
IScheduler

{

protected
List
<Task>
_queue;

public
DynamicStackScheduler
()

{

_queue
=
new
List<
Task>();

}

public
void
Add(
Task
t
)

{

Task
oldTask
=
null;

if
(Contains
(t,
out
oldTask
))

{

_queue.Remove
(oldTask);

}

_queue.Insert
(0,t);

}

public
void
Remove(
Task
t
)

{

_queue.Remove
(t);

}

public
bool
Contains(
Task
t
)

{

Task
oldTask
=
null;

bool
found
=
Contains(
t,
out
oldTask);

return
found
;

}

public
bool
Contains(
Task
t
,
out
Task
oldTask
)

{

bool
found
=
false;

oldTask
=
null
;

foreach
(var
task
in
_queue
)

{

if
(t
.AsyncState
!=
null
&&
t
.AsyncState.
Equals(task
.AsyncState))

{

oldTask
=
task
;

found
=
true
;

break;

}

}

return
found
;

}

public
bool
Contains(
Task
t
,
EqualityComparer<
Task>
comp
)

{

throw
new
NotImplementedException();

}

public
IEnumerator
<Task>
GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

IEnumerator
IEnumerable
.GetEnumerator()

{

return
new
SchedulerEnumerator(
this);

}

public
int
Count

{

get
{
return
_queue.
Count; }

}

public
Task
this[
int
index]

{

get
{
return
(Task)
_queue[index]; }

set
{
_queue
[index] =
value; }

}

}

测试代码

class
Program

{

static
Queue
<int>
_queue
=
new
Queue<
int>();

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());

static
TaskFactory
_factory
=
new
TaskFactory
(new
TaskSchedulerBase<DynamicQueueScheduler
>());

static
void
Main(
string[]
args
)

{

var
thread1
=
new
Thread(Producer
);

var
thread2
=
new
Thread(Consumer
);

thread1.Start
();

thread2.Start
();

Console.ReadKey
();

}

static
void
Producer()

{

for
(int
i
= 0;
i
< 7;
i
++)

{

_queue.Enqueue
(i);

}

_queue.Enqueue
(3);

_queue.Enqueue
(2);

}

static
void
Consumer()

{

while
(true
)

{

if
(_queue
.Count
> 0)

{

foreach
(var
i
in
_queue
)

{

_factory.StartNew
((s) =>

{

Console.Write
("{0}
on thread
{1}
{2}\n",
s, Thread.CurrentThread
.ManagedThreadId,

DateTime.Now
.ToLongTimeString());

},
i);

}

_queue.Clear
();

}

else

{

Thread.Sleep
(1);

}

}

}

}

利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

时间: 2024-10-03 22:07:03

利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)的相关文章

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

#queue队列 #生产者消费者模型

1 #queue队列 #生产者消费者模型 2 3 #queue队列 #有顺序的容器 4 #程序解耦 5 #提高运行效率 6 7 #class queue.Queue(maxsize=0) #先入先出 8 #class queue.LifoQueue(maxsize=0)最后在第一 9 #class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列#VIP客户 10 11 #Queue.qsize() 12 #Queue.empty() #return

python2.0_s12_day9之day8遗留知识(queue队列&amp;生产者消费者模型)

4.线程 1.语法 2.join 3.线程锁之Lock\Rlock\信号量 4.将线程变为守护进程 5.Event事件 * 6.queue队列 * 7.生产者消费者模型 4.6 queue队列 queue非常有用,当信息必须安全的在多个线程之间进行数据交换的时候就应该想到queue 所以,queue它能保证数据被安全的在多个线程之间进行交换,那他就是天生的线程安全. queue有那么几种: class queue.Queue(maxsize=0) # 先入先出 class queue.LifoQ

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

【[email&#160;protected]】queue模块-生产者消费者问题

python通过queue模块来提供线程间的通信机制,从而可以让线程分项数据. 个人感觉queue就是管程的概念 一个生产者消费者问题 1 from random import randint 2 from threading import Thread 3 from queue import Queue 4 from time import sleep 5 6 7 def writeq(queue): 8 print('starting put queue...') 9 queue.put('

Python学习笔记——进阶篇【第九周】———线程、进程、协程篇(队列Queue和生产者消费者模型)

Python之路,进程.线程.协程篇 本节内容 进程.与线程区别 cpu运行原理 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 参考链接http://www.cnblogs.com/alex3714/articles/5230609.html

python多线程编程-queue模块和生产者-消费者问题

摘录python核心编程 本例中演示生产者-消费者模型:商品或服务的生产者生产商品,然后将其放到类似队列的数据结构中.生产商品中的时间是不确定的,同样消费者消费商品的时间也是不确定的. 使用queue模块(python2.x版本中,叫Queue)来提供线程间通信的机制,从而让线程之间可以分享数据.具体而言,就是创建一个队列,让生产者(线程)在其中放入新的商品,而消费者(线程)消费这些商品. 下表是queue模块的部分属性: 属性 描述 queue模块的类 Queue(maxsize=0) 创建一

stl容器学习——queue,stack,list与string

目录 string stack queue list 点击上面的内容可以实现跳转哦 简介:本文记录了对string list queue stack四个容器的学习总结,包含有四种容器常用的函数介绍和一些使用过程中碰到的细节总结,在list容器中介绍了迭代器的使用. 头文件 想要用哪一种容器,就要加上对应的头文件 比如想要使用queue , 就要加上 #include<queue> 以此类推,想要使用STL库中的容器,只要加上它们的头文件就好. string 目录部分 1.string的定义及初

STL中deque,queue,stack,list的学习

(一):要点 1:容器deque的使用方法 2:容器queue,stack的使用方法 3:容器list的使用方法 (二)deque 1:deque简介 deque是"double-ended queue"的缩写,deque是双端的,vector是单端的. deque在接口上和vector相似,在许多操作的地方可以直接替换 deque可以随机存取元素,支持索引值直接存取,使用[]或者是at()方法 deque的头部和尾部添加和移除元素都非常快速,但是在中部插入元素或移除元素 比较费时.