我们经常会遇到生产者/消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,常见的业务处理逻辑有以下4种(下文以P代表生产者,C代表消费者):
1. FIFO(先进先出)
P产生1,2,3,4,5,6,3,2
C处理顺序应为1,2,3,4,5,6,3,2
2.LIFO(后进先出)
P产生1,2,3,4,5,6,3,2
C处理顺序应为2,3,6,5,4,3,2,1
3.Dynamic FIFO(我定义为:去掉相同数据的FIFO。如果队列里已经有相同的数据,则后进的数据优先级高:移除队列中已有的旧数据,并把新数据排到队尾)
P产生1,2,3,4,5,6,3,2
C处理顺序为1,4,5,6,3,2
4.Dynamic LIFO(我定义为:去掉相同数据的LIFO。如果栈里已经有相同的数据,则后进的数据优先级高:移除栈中已有的旧数据,并把新数据压到栈顶)
P产生1,2,3,4,5,6,3,2
C处理顺序为2,3,6,5,4,1
情况1、2为基本处理逻辑;情况3、4则与实际业务场景有关(例如:判断“相同”的逻辑可能不同,已存在的数据和后进的数据哪个优先级更高也可能不同)。
C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。
定义上述4种场景的通用接口以及其遍历类
/// <summary>
/// Contract for the pending-task containers that are plugged into a task
/// scheduler. Each implementation decides in which order the queued tasks
/// are exposed through the indexer and the enumerator.
/// </summary>
public interface IScheduler : IEnumerable<Task>
{
    /// <summary>Gets the number of tasks currently held by the container.</summary>
    int Count { get; }

    /// <summary>Gets or sets the task at the given zero-based position.</summary>
    /// <param name="index">Zero-based position of the task.</param>
    Task this[int index] { get; set; }

    /// <summary>Adds a task to the container.</summary>
    /// <param name="t">The task to add.</param>
    void Add(Task t);

    /// <summary>Removes a task from the container.</summary>
    /// <param name="t">The task to remove.</param>
    void Remove(Task t);
}
/// <summary>
/// Index-based enumerator over an <see cref="IScheduler"/>. It reads the
/// collection through its <c>Count</c> property and indexer on every step
/// instead of taking a snapshot up front.
/// </summary>
public class SchedulerEnumerator : IEnumerator<Task>
{
    private IScheduler _source;
    private int _position;
    private Task _current;

    /// <summary>Creates an enumerator positioned before the first task.</summary>
    /// <param name="collection">The scheduler container to walk.</param>
    public SchedulerEnumerator(IScheduler collection)
    {
        _source = collection;
        _position = -1;
        _current = default(Task);
    }

    /// <summary>Gets the task fetched by the last successful <see cref="MoveNext"/>.</summary>
    public Task Current
    {
        get { return _current; }
    }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>Advances to the next task.</summary>
    /// <returns>
    /// <c>true</c> when a task was fetched; <c>false</c> once the position has
    /// moved past the end of the collection.
    /// </returns>
    public bool MoveNext()
    {
        _position++;
        if (_position < _source.Count)
        {
            _current = _source[_position];
            return true;
        }

        // Past the end: the previously fetched task is left untouched.
        return false;
    }

    /// <summary>Moves the position back to before the first task.</summary>
    public void Reset()
    {
        _position = -1;
    }

    void IDisposable.Dispose()
    {
    }
}
实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型
/// <summary>
/// Task scheduler that runs queued tasks one at a time on a dedicated worker
/// thread, in the order dictated by the <typeparamref name="T"/> container.
/// </summary>
/// <remarks>
/// All access to the container happens under a private lock. The lock is NOT
/// held while a task body runs, so producers can keep queueing (and tasks can
/// queue further tasks) while the worker is busy. Whether a queued task is
/// actually kept is decided by the container's <c>Add</c> implementation.
/// </remarks>
/// <typeparam name="T">Container type that stores and orders pending tasks.</typeparam>
public class TaskSchedulerBase<T> : TaskScheduler
    where T : IScheduler, new()
{
    private readonly object _lock = new object();
    private readonly T _scheduler = new T();

    // Most recently started worker thread (null until the first task is queued).
    private Thread _processThread;

    // True while a worker thread owns the drain loop. Guarded by _lock.
    // Replaces inspection of Thread.IsAlive/ThreadState, which is racy: a
    // worker that has left its loop but has not terminated yet still looks
    // alive, and a task queued in that window would never be picked up.
    private bool _isProcessing;

    /// <summary>Creates an idle scheduler; the worker thread starts on demand.</summary>
    public TaskSchedulerBase()
    {
    }

    /// <summary>Gets the container that holds the pending tasks.</summary>
    public T Scheduler
    {
        get { return _scheduler; }
    }

    /// <summary>
    /// Worker loop: repeatedly takes the task at position 0 of the container
    /// and executes it, until the container is empty.
    /// </summary>
    private void Process()
    {
        while (true)
        {
            Task next;
            lock (_lock)
            {
                if (Scheduler.Count == 0)
                {
                    // Clearing the flag under the same lock that QueueTask
                    // takes guarantees that a concurrent producer either sees
                    // the flag still set (and this loop sees its task) or
                    // starts a fresh worker.
                    _isProcessing = false;
                    return;
                }

                next = Scheduler[0];
                Scheduler.Remove(next);
            }

            // Executed outside the lock so producers are not blocked.
            TryExecuteTask(next);
        }
    }

    /// <summary>Adds the task to the container and makes sure a worker is running.</summary>
    /// <param name="task">The task to queue.</param>
    protected override void QueueTask(Task task)
    {
        lock (_lock)
        {
            Scheduler.Add(task);
            if (_isProcessing)
            {
                return;
            }

            _isProcessing = true;
            try
            {
                _processThread = new Thread(Process);
                _processThread.Start();
            }
            catch
            {
                // No worker was started; let the next QueueTask try again
                // and surface the failure to the caller.
                _isProcessing = false;
                throw;
            }
        }
    }

    /// <summary>Removes a still-pending task from the container.</summary>
    /// <param name="task">The task to remove.</param>
    /// <returns>
    /// <c>true</c> if the task was found in the container and removal was
    /// requested; <c>false</c> if it was not pending (for example because the
    /// worker has already taken it).
    /// </returns>
    /// <remarks>
    /// Relies on the container's <c>Remove</c> removing exactly the task it is given.
    /// </remarks>
    protected override bool TryDequeue(Task task)
    {
        lock (_lock)
        {
            int pending = Scheduler.Count;
            for (int i = 0; i < pending; i++)
            {
                if (ReferenceEquals(Scheduler[i], task))
                {
                    Scheduler.Remove(task);
                    return true;
                }
            }

            return false;
        }
    }

    /// <summary>Returns a snapshot of the pending tasks (debugger support).</summary>
    /// <exception cref="NotSupportedException">
    /// The container is currently locked by another thread.
    /// </exception>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            // Never block here: the debugger may call this while other
            // threads are frozen.
            Monitor.TryEnter(_lock, ref lockTaken);
            if (!lockTaken)
            {
                throw new NotSupportedException();
            }

            return Scheduler.ToArray();
        }
        finally
        {
            if (lockTaken)
            {
                Monitor.Exit(_lock);
            }
        }
    }

    /// <summary>Attempts to run the task synchronously on the calling thread.</summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was queued to this scheduler before.</param>
    /// <returns><c>true</c> if the task was executed inline; otherwise <c>false</c>.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // A previously queued task may only run inline if it could be taken
        // back out of the container; otherwise the worker owns it.
        if (taskWasPreviouslyQueued && !TryDequeue(task))
        {
            return false;
        }

        return base.TryExecuteTask(task);
    }
}
实现4种队列
1.FIFO
/// <summary>
/// FIFO container of pending tasks backed by a <see cref="Queue{T}"/>.
/// </summary>
/// <remarks>
/// <see cref="Add"/> skips a task when a queued task already carries an equal
/// <c>AsyncState</c>; the already-queued task wins.
/// </remarks>
public class QueueScheduler : IScheduler
{
    protected Queue<Task> _queue;

    public QueueScheduler()
    {
        _queue = new Queue<Task>();
    }

    /// <summary>Gets the number of queued tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>Gets or sets the task at the given position (0 = oldest).</summary>
    /// <param name="index">Zero-based position counted from the head of the queue.</param>
    public Task this[int index]
    {
        get { return _queue.ToArray()[index]; }
        set
        {
            // Queue<T> has no positional write, so rebuild it around the new
            // element. Writing into the ToArray() copy alone would be lost.
            Task[] items = _queue.ToArray();
            items[index] = value;
            _queue.Clear();
            foreach (Task item in items)
            {
                _queue.Enqueue(item);
            }
        }
    }

    /// <summary>
    /// Enqueues the task unless a queued task with an equal <c>AsyncState</c> exists.
    /// </summary>
    /// <param name="t">The task to enqueue.</param>
    public void Add(Task t)
    {
        if (!Contains(t))
        {
            _queue.Enqueue(t);
        }
    }

    /// <summary>
    /// Removes the given task instance from the queue; does nothing when the
    /// task is not queued. The order of the remaining tasks is preserved.
    /// </summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        if (_queue.Count == 0)
        {
            return;
        }

        // Fast path: the scheduler normally removes the head.
        if (ReferenceEquals(_queue.Peek(), t))
        {
            _queue.Dequeue();
            return;
        }

        Task[] items = _queue.ToArray();
        _queue.Clear();
        foreach (Task item in items)
        {
            if (!ReferenceEquals(item, t))
            {
                _queue.Enqueue(item);
            }
        }
    }

    /// <summary>
    /// Tells whether a queued task carries an <c>AsyncState</c> equal to that
    /// of <paramref name="t"/>. A task whose <c>AsyncState</c> is null never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    public bool Contains(Task t)
    {
        foreach (Task queued in _queue)
        {
            if (t.AsyncState != null && t.AsyncState.Equals(queued.AsyncState))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Tells whether a queued task equals <paramref name="t"/> according to
    /// the supplied comparer.
    /// </summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that defines when two tasks count as the same.</param>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task queued in _queue)
        {
            if (comp.Equals(t, queued))
            {
                return true;
            }
        }

        return false;
    }

    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }
}
2.LIFO
/// <summary>
/// LIFO container of pending tasks backed by a <see cref="Stack{T}"/>.
/// </summary>
/// <remarks>
/// <see cref="Add"/> skips a task when a stacked task already carries an equal
/// <c>AsyncState</c>; the already-stacked task wins.
/// </remarks>
public class StackScheduler : IScheduler
{
    protected Stack<Task> _stack;

    public StackScheduler()
    {
        _stack = new Stack<Task>();
    }

    /// <summary>Gets the number of stacked tasks.</summary>
    public int Count
    {
        get { return _stack.Count; }
    }

    /// <summary>Gets or sets the task at the given position (0 = top of the stack).</summary>
    /// <param name="index">Zero-based position counted from the top of the stack.</param>
    public Task this[int index]
    {
        get { return _stack.ToArray()[index]; }
        set
        {
            // Stack<T> has no positional write, so rebuild it around the new
            // element. Writing into the ToArray() copy alone would be lost.
            Task[] topFirst = _stack.ToArray();
            topFirst[index] = value;
            Rebuild(topFirst, null);
        }
    }

    /// <summary>
    /// Pushes the task unless a stacked task with an equal <c>AsyncState</c> exists.
    /// </summary>
    /// <param name="t">The task to push.</param>
    public void Add(Task t)
    {
        if (!Contains(t))
        {
            _stack.Push(t);
        }
    }

    /// <summary>
    /// Removes the given task instance from the stack; does nothing when the
    /// task is not stacked. The order of the remaining tasks is preserved.
    /// </summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        if (_stack.Count == 0)
        {
            return;
        }

        // Fast path: the scheduler normally removes the top.
        if (ReferenceEquals(_stack.Peek(), t))
        {
            _stack.Pop();
            return;
        }

        Rebuild(_stack.ToArray(), t);
    }

    /// <summary>
    /// Tells whether a stacked task carries an <c>AsyncState</c> equal to that
    /// of <paramref name="t"/>. A task whose <c>AsyncState</c> is null never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    public bool Contains(Task t)
    {
        foreach (Task stacked in _stack)
        {
            if (t.AsyncState != null && t.AsyncState.Equals(stacked.AsyncState))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Tells whether a stacked task equals <paramref name="t"/> according to
    /// the supplied comparer.
    /// </summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that defines when two tasks count as the same.</param>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task stacked in _stack)
        {
            if (comp.Equals(t, stacked))
            {
                return true;
            }
        }

        return false;
    }

    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    // Refills the stack from a top-first array, leaving out the 'skip'
    // instance (pass null to keep everything).
    private void Rebuild(Task[] topFirst, Task skip)
    {
        _stack.Clear();

        // Stack<T>.ToArray() lists the top element first, so push from the
        // bottom up to restore the same order.
        for (int i = topFirst.Length - 1; i >= 0; i--)
        {
            if (skip == null || !ReferenceEquals(topFirst[i], skip))
            {
                _stack.Push(topFirst[i]);
            }
        }
    }
}
3.Dynamic FIFO
/// <summary>
/// FIFO container without duplicates: when a task with an equal
/// <c>AsyncState</c> is already queued, the queued one is dropped and the
/// new task is appended at the tail.
/// </summary>
public class DynamicQueueScheduler : IScheduler
{
    protected List<Task> _queue;

    public DynamicQueueScheduler()
    {
        _queue = new List<Task>();
    }

    /// <summary>Gets the number of queued tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>Gets or sets the task at the given position (0 = next to run).</summary>
    /// <param name="index">Zero-based position in the list.</param>
    public Task this[int index]
    {
        get { return _queue[index]; }
        set { _queue[index] = value; }
    }

    /// <summary>
    /// Appends the task at the tail, first removing a queued task that has an
    /// equal <c>AsyncState</c>, if any.
    /// </summary>
    /// <param name="t">The task to add.</param>
    public virtual void Add(Task t)
    {
        Task oldTask;
        if (Contains(t, out oldTask))
        {
            _queue.Remove(oldTask);
        }

        _queue.Add(t);
    }

    /// <summary>Removes the given task from the list, if present.</summary>
    /// <param name="t">The task to remove.</param>
    public virtual void Remove(Task t)
    {
        _queue.Remove(t);
    }

    /// <summary>
    /// Tells whether a queued task carries an <c>AsyncState</c> equal to that
    /// of <paramref name="t"/>.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    public virtual bool Contains(Task t)
    {
        Task ignored;
        return Contains(t, out ignored);
    }

    /// <summary>
    /// Looks for a queued task whose <c>AsyncState</c> equals that of
    /// <paramref name="t"/>. A task whose <c>AsyncState</c> is null never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <param name="oldTask">The first matching queued task, or null when none matches.</param>
    /// <returns><c>true</c> when a match was found.</returns>
    public virtual bool Contains(Task t, out Task oldTask)
    {
        oldTask = null;
        foreach (Task queued in _queue)
        {
            if (t.AsyncState != null && t.AsyncState.Equals(queued.AsyncState))
            {
                oldTask = queued;
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Tells whether a queued task equals <paramref name="t"/> according to
    /// the supplied comparer.
    /// </summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that defines when two tasks count as the same.</param>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public virtual bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task queued in _queue)
        {
            if (comp.Equals(t, queued))
            {
                return true;
            }
        }

        return false;
    }

    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }
}
4.Dynamic LIFO
/// <summary>
/// LIFO container without duplicates: when a task with an equal
/// <c>AsyncState</c> is already stored, the stored one is dropped and the
/// new task is inserted at position 0 (the top).
/// </summary>
public class DynamicStackScheduler : IScheduler
{
    // Holds the stack top-first; index 0 is the most recently added task.
    protected List<Task> _queue;

    public DynamicStackScheduler()
    {
        _queue = new List<Task>();
    }

    /// <summary>Gets the number of stored tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>Gets or sets the task at the given position (0 = top).</summary>
    /// <param name="index">Zero-based position in the list.</param>
    public Task this[int index]
    {
        get { return _queue[index]; }
        set { _queue[index] = value; }
    }

    /// <summary>
    /// Inserts the task at position 0, first removing a stored task that has
    /// an equal <c>AsyncState</c>, if any.
    /// </summary>
    /// <param name="t">The task to add.</param>
    public void Add(Task t)
    {
        Task oldTask;
        if (Contains(t, out oldTask))
        {
            _queue.Remove(oldTask);
        }

        _queue.Insert(0, t);
    }

    /// <summary>Removes the given task from the list, if present.</summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        _queue.Remove(t);
    }

    /// <summary>
    /// Tells whether a stored task carries an <c>AsyncState</c> equal to that
    /// of <paramref name="t"/>.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    public bool Contains(Task t)
    {
        Task ignored;
        return Contains(t, out ignored);
    }

    /// <summary>
    /// Looks for a stored task whose <c>AsyncState</c> equals that of
    /// <paramref name="t"/>. A task whose <c>AsyncState</c> is null never matches.
    /// </summary>
    /// <param name="t">The task whose state is looked up.</param>
    /// <param name="oldTask">The first matching stored task, or null when none matches.</param>
    /// <returns><c>true</c> when a match was found.</returns>
    public bool Contains(Task t, out Task oldTask)
    {
        oldTask = null;
        foreach (Task stored in _queue)
        {
            if (t.AsyncState != null && t.AsyncState.Equals(stored.AsyncState))
            {
                oldTask = stored;
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Tells whether a stored task equals <paramref name="t"/> according to
    /// the supplied comparer.
    /// </summary>
    /// <param name="t">The task to look for.</param>
    /// <param name="comp">Comparer that defines when two tasks count as the same.</param>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task stored in _queue)
        {
            if (comp.Equals(t, stored))
            {
                return true;
            }
        }

        return false;
    }

    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }
}
测试代码
/// <summary>
/// Demo: a producer thread fills a shared queue with numbers and a consumer
/// thread turns each number into a task on the custom scheduler, which prints
/// the number together with the executing thread id and the current time.
/// </summary>
class Program
{
    static Queue<int> _queue = new Queue<int>();

    // Queue<int> is not thread-safe; every access from the producer and the
    // consumer goes through this lock.
    static readonly object _queueLock = new object();

    // Swap the active line to try the other orderings:
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());
    //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());
    static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());

    static void Main(string[] args)
    {
        var thread1 = new Thread(Producer);
        var thread2 = new Thread(Consumer);

        // The consumer loops forever; as a background thread it no longer
        // keeps the process alive after the key press.
        thread2.IsBackground = true;

        thread1.Start();
        thread2.Start();
        Console.ReadKey();
    }

    /// <summary>Enqueues 0..6 followed by the duplicates 3 and 2.</summary>
    static void Producer()
    {
        lock (_queueLock)
        {
            for (int i = 0; i < 7; i++)
            {
                _queue.Enqueue(i);
            }

            _queue.Enqueue(3);
            _queue.Enqueue(2);
        }
    }

    /// <summary>
    /// Polls the shared queue forever; every number found is handed to the
    /// scheduler as the state object of a new task.
    /// </summary>
    static void Consumer()
    {
        while (true)
        {
            int[] batch;
            lock (_queueLock)
            {
                // Take everything and empty the queue in one step, so no item
                // enqueued between enumeration and Clear() can be lost.
                batch = _queue.ToArray();
                _queue.Clear();
            }

            if (batch.Length == 0)
            {
                Thread.Sleep(1);
                continue;
            }

            foreach (var i in batch)
            {
                _factory.StartNew((s) =>
                {
                    Console.Write("{0} on thread {1} {2}\n", s, Thread.CurrentThread.ManagedThreadId,
                        DateTime.Now.ToLongTimeString());
                }, i);
            }
        }
    }
}