我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会遇到4种业务处理逻辑(下文以P代表生产者,C代表消费者):
1. FIFO(先进先出)
P产生1,2,3,4,5,6,3,2
C处理顺序应为1,2,3,4,5,6,3,2
2.LIFO(后进先出)
P产生1,2,3,4,5,6,3,2
C处理顺序应为2,3,6,5,4,3,2,1
3. Dynamic FIFO(我定义为:去掉重复数据的FIFO。如果新产生的数据在队列里已经存在相同数据,则移除队列中较早的那一份,只保留后进的数据)
P产生1,2,3,4,5,6,3,2
C处理顺序为1,4,5,6,3,2
4. Dynamic LIFO(我定义为:去掉重复数据的LIFO。如果新产生的数据在栈里已经存在相同数据,则移除栈中较早的那一份,只保留后进的数据)
P产生1,2,3,4,5,6,3,2
C处理顺序为2,3,6,5,4,1
第1、2种情况为基本处理逻辑;第3、4种情况则取决于实际业务场景(例如:判断“数据相同”的规则可能不同,已存在的数据和后进的数据哪个优先级更高也可能不同)。
C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。
定义上述4种场景的通用接口以及其遍历类
/// <summary>
/// Contract shared by the task containers used in this file: an enumerable,
/// index-addressable collection of <see cref="Task"/> objects that tasks can be
/// added to and removed from.
/// </summary>
public interface IScheduler : IEnumerable<Task>
{
    /// <summary>Gets the number of tasks currently held by the collection.</summary>
    int Count { get; }

    /// <summary>Gets or sets the task stored at the given zero-based position.</summary>
    /// <param name="index">Zero-based position of the task.</param>
    Task this[int index] { get; set; }

    /// <summary>Adds a task to the collection.</summary>
    /// <param name="t">The task to add.</param>
    void Add(Task t);

    /// <summary>Removes a task from the collection.</summary>
    /// <param name="t">The task to remove.</param>
    void Remove(Task t);
}
/// <summary>
/// Index-based enumerator over an <see cref="IScheduler"/>: it walks positions
/// 0 .. Count-1, reading each element through the collection's indexer.
/// </summary>
public class SchedulerEnumerator : IEnumerator<Task>
{
    // Collection being walked.
    private IScheduler _source;

    // Position of the current element; -1 means "before the first element".
    private int _position;

    // Element read by the last successful MoveNext call.
    private Task _current;

    /// <summary>Creates an enumerator positioned before the first element.</summary>
    /// <param name="collection">The collection to enumerate.</param>
    public SchedulerEnumerator(IScheduler collection)
    {
        _current = default(Task);
        _position = -1;
        _source = collection;
    }

    /// <summary>Gets the element read by the last successful <see cref="MoveNext"/>.</summary>
    public Task Current
    {
        get { return _current; }
    }

    /// <summary>Non-generic view of <see cref="Current"/>.</summary>
    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>Advances to the next position.</summary>
    /// <returns>
    /// <c>true</c> when the new position is inside the collection and its element
    /// has been loaded; <c>false</c> when the position is at or past Count.
    /// </returns>
    public bool MoveNext()
    {
        _position++;

        // The count is re-read on every call, so the bound follows the live collection.
        bool insideCollection = _position < _source.Count;
        if (insideCollection)
        {
            _current = _source[_position];
        }

        return insideCollection;
    }

    /// <summary>Moves the position back to before the first element.</summary>
    public void Reset()
    {
        _position = -1;
    }

    /// <summary>Nothing to release.</summary>
    void IDisposable.Dispose()
    {
    }
}
实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型
/// <summary>
/// A <see cref="TaskScheduler"/> that stores queued tasks in a <typeparamref name="T"/>
/// collection and runs them one at a time on a worker thread. The next task to run is
/// always the element at index 0 of the collection, so the collection type decides the
/// processing order.
/// </summary>
/// <typeparam name="T">Backing collection; must have a public parameterless constructor.</typeparam>
public class TaskSchedulerBase<T> : TaskScheduler
    where T : IScheduler, new()
{
    // Guards every access this class makes to _scheduler and _workerActive.
    private readonly object _lock = new object();

    private readonly T _scheduler = new T();

    // True while a worker thread is draining the collection. Set and cleared only
    // under _lock, so "queue a task" and "worker decides to exit" cannot interleave
    // in a way that leaves a task behind without a worker.
    private bool _workerActive;

    /// <summary>Creates a scheduler with an empty backing collection.</summary>
    public TaskSchedulerBase()
    {
    }

    /// <summary>
    /// Gets the backing collection. This class only synchronises its own accesses;
    /// callers that touch the collection directly bypass that lock.
    /// </summary>
    public T Scheduler
    {
        get { return _scheduler; }
    }

    /// <summary>
    /// Worker loop: repeatedly takes the element at index 0 out of the collection and
    /// executes it, until the collection is empty.
    /// </summary>
    private void Process()
    {
        while (true)
        {
            Task next;
            lock (_lock)
            {
                if (_scheduler.Count == 0)
                {
                    // Cleared under the same lock QueueTask uses, so a task added
                    // after this point always starts a fresh worker.
                    _workerActive = false;
                    return;
                }

                next = _scheduler[0];
                _scheduler.Remove(next);
            }

            // Run outside the lock so producers are never blocked behind a running task.
            TryExecuteTask(next);
        }
    }

    /// <summary>Adds the task to the collection and makes sure a worker is running.</summary>
    /// <param name="task">The task being queued.</param>
    protected override void QueueTask(Task task)
    {
        lock (_lock)
        {
            _scheduler.Add(task);
            if (_workerActive)
            {
                return;
            }

            _workerActive = true;
        }

        try
        {
            new Thread(Process).Start();
        }
        catch
        {
            // The worker never started; allow the next QueueTask call to try again.
            lock (_lock)
            {
                _workerActive = false;
            }

            throw;
        }
    }

    /// <summary>Removes the task from the collection if it is still waiting there.</summary>
    /// <param name="task">The task to remove.</param>
    /// <returns><c>true</c> if the task was found and removed; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(Task task)
    {
        lock (_lock)
        {
            bool stillQueued = false;
            foreach (Task pending in _scheduler)
            {
                if (ReferenceEquals(pending, task))
                {
                    stillQueued = true;
                    break;
                }
            }

            if (!stillQueued)
            {
                // Already taken by the worker (or never stored): nothing to remove.
                return false;
            }

            // Relies on the collection's Remove taking out exactly this task.
            _scheduler.Remove(task);
            return true;
        }
    }

    /// <summary>Returns a snapshot of the waiting tasks.</summary>
    /// <returns>An array copy of the collection's current contents.</returns>
    /// <exception cref="NotSupportedException">The lock could not be taken without blocking.</exception>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            // Non-blocking attempt: this method must not wait on the scheduler's lock.
            Monitor.TryEnter(_lock, ref lockTaken);
            if (!lockTaken)
            {
                throw new NotSupportedException();
            }

            return _scheduler.ToArray();
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(_lock);
            }
        }
    }

    /// <summary>Runs the task on the calling thread when that is still possible.</summary>
    /// <param name="task">The task to run.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was already handed to <see cref="QueueTask"/>.</param>
    /// <returns><c>true</c> if the task was executed inline; otherwise <c>false</c>.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // A queued task may only be inlined if it can still be pulled out of the
        // collection; otherwise the worker already owns it.
        if (taskWasPreviouslyQueued && !TryDequeue(task))
        {
            return false;
        }

        return TryExecuteTask(task);
    }
}
实现4种队列
1.FIFO
/// <summary>
/// First-in-first-out task collection backed by a <see cref="Queue{T}"/>.
/// Index 0 is the head of the queue (the task that was added earliest).
/// </summary>
public class QueueScheduler : IScheduler
{
    protected Queue<Task> _queue;

    /// <summary>Creates an empty queue.</summary>
    public QueueScheduler()
    {
        _queue = new Queue<Task>();
    }

    /// <summary>
    /// Appends the task to the tail of the queue. Every task is kept, including ones
    /// whose state equals that of a task already waiting: plain FIFO does not drop
    /// duplicates, and a dropped task would never run.
    /// </summary>
    /// <param name="t">The task to enqueue.</param>
    public void Add(Task t)
    {
        _queue.Enqueue(t);
    }

    /// <summary>
    /// Removes the given task (matched by reference) from the queue, keeping the
    /// order of the remaining tasks. Does nothing if the task is not in the queue.
    /// </summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        if (_queue.Count == 0)
        {
            return;
        }

        // Common case: the task being removed is the head.
        if (ReferenceEquals(_queue.Peek(), t))
        {
            _queue.Dequeue();
            return;
        }

        // Otherwise rotate the queue once, skipping the first matching entry.
        bool removed = false;
        for (int remaining = _queue.Count; remaining > 0; remaining--)
        {
            Task item = _queue.Dequeue();
            if (!removed && ReferenceEquals(item, t))
            {
                removed = true;
                continue;
            }

            _queue.Enqueue(item);
        }
    }

    /// <summary>
    /// Tells whether a queued task has an <see cref="Task.AsyncState"/> equal to that of
    /// <paramref name="t"/>. A task without state never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <returns><c>true</c> if a task with an equal state is queued; otherwise <c>false</c>.</returns>
    public bool Contains(Task t)
    {
        if (t == null || t.AsyncState == null)
        {
            return false;
        }

        object state = t.AsyncState;
        foreach (Task task in _queue)
        {
            if (state.Equals(task.AsyncState))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Tells whether a queued task equals <paramref name="t"/> according to <paramref name="comp"/>.</summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that decides whether two tasks are the same.</param>
    /// <returns><c>true</c> if the comparer matches any queued task; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task task in _queue)
        {
            if (comp.Equals(t, task))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Returns an index-based enumerator that starts at the head of the queue.</summary>
    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    /// <summary>Gets the number of queued tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>Gets or sets the task at the given position, counted from the head.</summary>
    /// <param name="index">Zero-based position from the head of the queue.</param>
    public Task this[int index]
    {
        get
        {
            return _queue.ToArray()[index];
        }
        set
        {
            // Queue<T> has no positional write, so rebuild it from a modified copy.
            // Assigning into the array returned by ToArray() alone would be lost.
            Task[] items = _queue.ToArray();
            items[index] = value;
            _queue = new Queue<Task>(items);
        }
    }
}
2.LIFO
/// <summary>
/// Last-in-first-out task collection backed by a <see cref="Stack{T}"/>.
/// Index 0 is the top of the stack (the task that was added most recently).
/// </summary>
public class StackScheduler : IScheduler
{
    protected Stack<Task> _stack;

    /// <summary>Creates an empty stack.</summary>
    public StackScheduler()
    {
        _stack = new Stack<Task>();
    }

    /// <summary>
    /// Pushes the task onto the stack. Every task is kept, including ones whose state
    /// equals that of a task already waiting: plain LIFO does not drop duplicates,
    /// and a dropped task would never run.
    /// </summary>
    /// <param name="t">The task to push.</param>
    public void Add(Task t)
    {
        _stack.Push(t);
    }

    /// <summary>
    /// Removes the given task (matched by reference) from the stack, keeping the
    /// order of the remaining tasks. Does nothing if the task is not on the stack.
    /// </summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        if (_stack.Count == 0)
        {
            return;
        }

        // Common case: the task being removed is on top.
        if (ReferenceEquals(_stack.Peek(), t))
        {
            _stack.Pop();
            return;
        }

        // ToArray() lists the top first; pushing from the last element back to the
        // first rebuilds the stack in its original order.
        Task[] topFirst = _stack.ToArray();
        Stack<Task> rebuilt = new Stack<Task>(topFirst.Length);
        bool removed = false;
        for (int i = topFirst.Length - 1; i >= 0; i--)
        {
            rebuilt.Push(topFirst[i]);
        }

        // Second pass over the same snapshot is only needed when the task is present.
        foreach (Task item in topFirst)
        {
            if (ReferenceEquals(item, t))
            {
                removed = true;
                break;
            }
        }

        if (!removed)
        {
            return;
        }

        rebuilt.Clear();
        bool skipped = false;
        for (int i = topFirst.Length - 1; i >= 0; i--)
        {
            // Skip exactly one entry: the matching one closest to the bottom is as
            // good as any, since matches are the same object reference.
            if (!skipped && ReferenceEquals(topFirst[i], t))
            {
                skipped = true;
                continue;
            }

            rebuilt.Push(topFirst[i]);
        }

        _stack = rebuilt;
    }

    /// <summary>
    /// Tells whether a stacked task has an <see cref="Task.AsyncState"/> equal to that of
    /// <paramref name="t"/>. A task without state never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <returns><c>true</c> if a task with an equal state is on the stack; otherwise <c>false</c>.</returns>
    public bool Contains(Task t)
    {
        if (t == null || t.AsyncState == null)
        {
            return false;
        }

        object state = t.AsyncState;
        foreach (Task task in _stack)
        {
            if (state.Equals(task.AsyncState))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Tells whether a stacked task equals <paramref name="t"/> according to <paramref name="comp"/>.</summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that decides whether two tasks are the same.</param>
    /// <returns><c>true</c> if the comparer matches any stacked task; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task task in _stack)
        {
            if (comp.Equals(t, task))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Returns an index-based enumerator that starts at the top of the stack.</summary>
    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    /// <summary>Gets the number of stacked tasks.</summary>
    public int Count
    {
        get { return _stack.Count; }
    }

    /// <summary>Gets or sets the task at the given position, counted from the top.</summary>
    /// <param name="index">Zero-based position from the top of the stack.</param>
    public Task this[int index]
    {
        get
        {
            return _stack.ToArray()[index];
        }
        set
        {
            // Stack<T> has no positional write, so rebuild it from a modified copy.
            // Assigning into the array returned by ToArray() alone would be lost.
            Task[] items = _stack.ToArray();
            items[index] = value;

            // The copy is top-first; the constructor pushes in sequence order, so
            // reverse it to keep the same element on top.
            Array.Reverse(items);
            _stack = new Stack<Task>(items);
        }
    }
}
3.Dynamic FIFO
/// <summary>
/// FIFO task collection without duplicates: adding a task whose
/// <see cref="Task.AsyncState"/> equals that of a waiting task removes the waiting
/// one and appends the new one at the tail. Index 0 is the next task in line.
/// </summary>
public class DynamicQueueScheduler : IScheduler
{
    protected List<Task> _queue;

    /// <summary>Creates an empty collection.</summary>
    public DynamicQueueScheduler()
    {
        _queue = new List<Task>();
    }

    /// <summary>
    /// Appends the task at the tail, first removing a waiting task with an equal state
    /// if there is one. The removed task is discarded, not executed.
    /// </summary>
    /// <param name="t">The task to add.</param>
    public virtual void Add(Task t)
    {
        Task oldTask = null;
        if (Contains(t, out oldTask))
        {
            _queue.Remove(oldTask);
        }

        _queue.Add(t);
    }

    /// <summary>Removes the given task from the collection, if present.</summary>
    /// <param name="t">The task to remove.</param>
    public virtual void Remove(Task t)
    {
        _queue.Remove(t);
    }

    /// <summary>Tells whether a waiting task has a state equal to that of <paramref name="t"/>.</summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <returns><c>true</c> if such a task is waiting; otherwise <c>false</c>.</returns>
    public virtual bool Contains(Task t)
    {
        Task oldTask = null;
        return Contains(t, out oldTask);
    }

    /// <summary>
    /// Looks for the first waiting task whose <see cref="Task.AsyncState"/> equals that of
    /// <paramref name="t"/>. A task without state never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <param name="oldTask">The matching waiting task, or null when there is none.</param>
    /// <returns><c>true</c> if a match was found; otherwise <c>false</c>.</returns>
    public virtual bool Contains(Task t, out Task oldTask)
    {
        oldTask = null;
        if (t == null || t.AsyncState == null)
        {
            return false;
        }

        object state = t.AsyncState;
        foreach (Task task in _queue)
        {
            if (state.Equals(task.AsyncState))
            {
                oldTask = task;
                return true;
            }
        }

        return false;
    }

    /// <summary>Tells whether a waiting task equals <paramref name="t"/> according to <paramref name="comp"/>.</summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that decides whether two tasks are the same.</param>
    /// <returns><c>true</c> if the comparer matches any waiting task; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public virtual bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task task in _queue)
        {
            if (comp.Equals(t, task))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Returns an index-based enumerator that starts at index 0.</summary>
    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    /// <summary>Gets the number of waiting tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>Gets or sets the task at the given zero-based position.</summary>
    /// <param name="index">Zero-based position in the list.</param>
    public Task this[int index]
    {
        get { return _queue[index]; }
        set { _queue[index] = value; }
    }
}
4.Dynamic LIFO
/// <summary>
/// LIFO task collection without duplicates: adding a task whose
/// <see cref="Task.AsyncState"/> equals that of a waiting task removes the waiting
/// one and inserts the new one at index 0. Index 0 is the most recently added task.
/// </summary>
public class DynamicStackScheduler : IScheduler
{
    protected List<Task> _queue;

    /// <summary>Creates an empty collection.</summary>
    public DynamicStackScheduler()
    {
        _queue = new List<Task>();
    }

    /// <summary>
    /// Inserts the task at the front, first removing a waiting task with an equal state
    /// if there is one. The removed task is discarded, not executed.
    /// </summary>
    /// <param name="t">The task to add.</param>
    public void Add(Task t)
    {
        Task oldTask = null;
        if (Contains(t, out oldTask))
        {
            _queue.Remove(oldTask);
        }

        _queue.Insert(0, t);
    }

    /// <summary>Removes the given task from the collection, if present.</summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        _queue.Remove(t);
    }

    /// <summary>Tells whether a waiting task has a state equal to that of <paramref name="t"/>.</summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <returns><c>true</c> if such a task is waiting; otherwise <c>false</c>.</returns>
    public bool Contains(Task t)
    {
        Task oldTask = null;
        return Contains(t, out oldTask);
    }

    /// <summary>
    /// Looks for the first waiting task whose <see cref="Task.AsyncState"/> equals that of
    /// <paramref name="t"/>. A task without state never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <param name="oldTask">The matching waiting task, or null when there is none.</param>
    /// <returns><c>true</c> if a match was found; otherwise <c>false</c>.</returns>
    public bool Contains(Task t, out Task oldTask)
    {
        oldTask = null;
        if (t == null || t.AsyncState == null)
        {
            return false;
        }

        object state = t.AsyncState;
        foreach (Task task in _queue)
        {
            if (state.Equals(task.AsyncState))
            {
                oldTask = task;
                return true;
            }
        }

        return false;
    }

    /// <summary>Tells whether a waiting task equals <paramref name="t"/> according to <paramref name="comp"/>.</summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that decides whether two tasks are the same.</param>
    /// <returns><c>true</c> if the comparer matches any waiting task; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task task in _queue)
        {
            if (comp.Equals(t, task))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Returns an index-based enumerator that starts at index 0.</summary>
    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    /// <summary>Gets the number of waiting tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>Gets or sets the task at the given zero-based position.</summary>
    /// <param name="index">Zero-based position in the list.</param>
    public Task this[int index]
    {
        get { return _queue[index]; }
        set { _queue[index] = value; }
    }
}
测试代码
/// <summary>
/// Demo: a producer thread fills a shared queue of integers and a consumer thread
/// turns every integer into a task on the custom scheduler, which prints it.
/// </summary>
class Program
{
    // Guards _queue: Queue<int> is not safe for one thread enqueuing while another
    // enumerates or clears it.
    static readonly object _queueLock = new object();

    static Queue<int> _queue = new Queue<int>();

    // Swap the type argument to try the other collections:
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());
    static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());

    /// <summary>Starts the producer and consumer threads and waits for a key press.</summary>
    static void Main(string[] args)
    {
        var thread1 = new Thread(Producer);
        var thread2 = new Thread(Consumer);

        // The consumer loops forever; as a background thread it no longer keeps
        // the process alive after the key press.
        thread2.IsBackground = true;

        thread1.Start();
        thread2.Start();
        Console.ReadKey();
    }

    /// <summary>Enqueues 0 through 6, followed by 3 and 2.</summary>
    static void Producer()
    {
        for (int i = 0; i < 7; i++)
        {
            Publish(i);
        }

        Publish(3);
        Publish(2);
    }

    /// <summary>Adds one value to the shared queue under the lock.</summary>
    static void Publish(int value)
    {
        lock (_queueLock)
        {
            _queue.Enqueue(value);
        }
    }

    /// <summary>
    /// Polls the shared queue; whenever it holds values, takes all of them and starts
    /// one task per value that prints the value, the executing thread id and the time.
    /// </summary>
    static void Consumer()
    {
        while (true)
        {
            int[] batch;
            lock (_queueLock)
            {
                // Copy and clear in one step so no value produced in between is lost.
                batch = _queue.ToArray();
                _queue.Clear();
            }

            if (batch.Length == 0)
            {
                Thread.Sleep(1);
                continue;
            }

            foreach (var i in batch)
            {
                _factory.StartNew((s) =>
                {
                    Console.Write("{0} on thread {1} {2}\n",
                        s, Thread.CurrentThread.ManagedThreadId,
                        DateTime.Now.ToLongTimeString());
                },
                i);
            }
        }
    }
}
利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)