转:Task任务调度实现生产者消费者模式

我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会有4个业务处理逻辑(下文以P代表生产者,C代表消费者):

1. FIFO(先进先出)

     P产生1,2,3,4,5,6,3,2

     C处理顺序应为1,2,3,4,5,6,3,2

2.LIFO(后进先出)

     P产生1,2,3,4,5,6,3,2

     C处理顺序应为2,3,6,5,4,3,2,1

3.Dynamic FIFO(我定义为:去掉相同数据的FIFO, 如果产生的数据队列里已经有相同数据,后进的数据优先级高)

     P产生1,2,3,4,5,6,3,2

     C处理顺序为1,4,5,6,3,2

4.Dynamic LIFO(我定义为:去掉相同数据的LIFO, 如果产生的数据栈里已经有相同数据,后进的数据优先级高)

     P产生1,2,3,4,5,6,3,2

     C处理顺序为2,3,6,5,4,1

1,2情况为基本处理逻辑,3,4可能和我们实际场景有关系(包括:判断相同的逻辑可能不同、已存在和后续数据哪个优先级高)

C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。

定义上述4种场景的通用接口以及其遍历类

public interface IScheduler : IEnumerable<Task >

    {

void Add (Task t);

void Remove (Task t);

int Count { get; }

Task this [int index] { get; set ; }

    }

public class SchedulerEnumerator : IEnumerator< Task>

    {

private IScheduler _collection;

private int _currentIndex;

private Task _currentTask;

public SchedulerEnumerator (IScheduler collection)

        {

_collection = collection ;

_currentIndex = -1;

_currentTask = default (Task);

        }

public bool MoveNext()

        {

//Avoids going beyond the end of the collection.

if (++_currentIndex >= _collection. Count)

            {

return false ;

            }

else

            {

// Set current box to next item in collection.

_currentTask = _collection [_currentIndex];

            }

return true ;

        }

public void Reset() { _currentIndex = -1; }

void IDisposable .Dispose() { }

public Task Current

        {

get { return _currentTask; }

        }

object IEnumerator .Current

        {

get { return Current; }

        }

    }

实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型

public class TaskSchedulerBase <T> : TaskScheduler

where T : IScheduler , new ()

    {

private Thread _processThread;

private readonly object _lock = new object ();

public TaskSchedulerBase()

        {

            _processThread = new Thread (this.Process);

        }

private void Process()

        {

lock (_lock)

            {

var tasks = GetScheduledTasks();

if (null != tasks)

                {

foreach (var t in tasks)

                    {

                        TryExecuteTask(t);

                        TryDequeue(t);

                    }

                }

            }

        }

protected override void QueueTask( Task task)

        {

lock (_lock)

            {

                Scheduler.Add(task);

if (_processThread.ThreadState.Equals(ThreadState .Stopped))

                {

                    _processThread = new Thread (Process);

                }

if (!_processThread.IsAlive

                    && !_processThread.ThreadState.Equals( ThreadState.Running))

                {

try

                    {

                        _processThread.Start();

                    }

catch (System.Exception )

                    {

if (!_processThread.ThreadState.Equals(ThreadState .Running))

                        {

                            _processThread = new Thread (Process);

                            _processThread.Start();

                        }

                    }

                }

            }

        }

protected override bool TryDequeue( Task task)

        {

            Scheduler.Remove(task);

return true ;

        }

protected override IEnumerable< Task> GetScheduledTasks()

        {

return Scheduler.ToArray();

        }

protected override bool TryExecuteTaskInline( Task task, bool taskWasPreviouslyQueued)

        {

if (taskWasPreviouslyQueued)

            {

if (TryDequeue(task))

                {

return base .TryExecuteTask(task);

                }

else

                {

return false ;

                }

            }

else

            {

return base .TryExecuteTask(task);

            }

        }

private readonly T _scheduler = new T();

public T Scheduler

        {

get

            {

return _scheduler;

            }

        }

    }


实现4种队列

     1.FIFO

public class QueueScheduler : IScheduler

    {

protected Queue <Task> _queue;

public QueueScheduler ()

        {

_queue = new Queue< Task>();

        }

public void Add( Task t )

        {

if (!Contains (t))

            {

_queue.Enqueue (t);

            }

        }

public void Remove( Task t )

        {

_queue.Dequeue ();

        }

public bool Contains( Task t )

        {

bool found = false;

foreach (var task in _queue )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

found = true ;

break;

                }

            }

return found ;

        }

public bool Contains( Task t , EqualityComparer< Task> comp )

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _queue. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _queue.ToArray ()[index]; }

set { _queue .ToArray()[index] = value; }

        }

    }



     2.LIFO

public class StackScheduler : IScheduler

    {

protected Stack <Task> _stack;

public StackScheduler ()

        {

_stack = new Stack< Task>();

        }

public void Add( Task t )

        {

if (!Contains (t))

            {

_stack.Push (t);

            }

        }

public void Remove( Task t )

        {

_stack.Pop ();

        }

public bool Contains( Task t )

        {

bool found = false;

foreach (var task in _stack )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

found = true ;

break;

                }

            }

return found ;

        }

public bool Contains( Task t , EqualityComparer< Task> comp )

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _stack. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _stack.ToArray ()[index]; }

set { _stack .ToArray()[index] = value; }

        }

    }



     3.Dynamic FIFO

public class DynamicQueueScheduler : IScheduler

    {

protected List <Task> _queue;

public DynamicQueueScheduler ()

        {

_queue = new List< Task>();

        }

public virtual void Add(Task t)

        {

Task oldTask = null;

if (Contains (t, out oldTask ))

            {

_queue.Remove (oldTask);

            }

_queue.Add (t);

        }

public virtual void Remove(Task t)

        {

_queue.Remove (t);

        }

public virtual bool Contains(Task t)

        {

Task oldTask = null;

bool found = Contains( t, out oldTask);

return found ;

        }

public virtual bool Contains(Task t, out Task oldTask)

        {

bool found = false;

oldTask = null ;

foreach (var task in _queue )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

oldTask = task ;

found = true ;

break;

                }

            }

return found ;

        }

public virtual bool Contains(Task t, EqualityComparer<Task > comp)

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _queue. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _queue[index]; }

set { _queue [index] = value; }

        }

    }



     4.Dynamic LIFO

public class DynamicStackScheduler : IScheduler

    {

protected List <Task> _queue;

public DynamicStackScheduler ()

        {

_queue = new List< Task>();

        }

public void Add( Task t )

        {

Task oldTask = null;

if (Contains (t, out oldTask ))

            {

_queue.Remove (oldTask);

            }

_queue.Insert (0,t);

        }

public void Remove( Task t )

        {

_queue.Remove (t);

        }

public bool Contains( Task t )

        {

Task oldTask = null;

bool found = Contains( t, out oldTask);

return found ;

        }

public bool Contains( Task t , out Task oldTask )

        {

bool found = false;

oldTask = null ;

foreach (var task in _queue )

            {

if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))

                {

oldTask = task ;

found = true ;

break;

                }

            }

return found ;

        }

public bool Contains( Task t , EqualityComparer< Task> comp )

        {

throw new NotImplementedException();

        }

public IEnumerator <Task> GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

IEnumerator IEnumerable .GetEnumerator()

        {

return new SchedulerEnumerator( this);

        }

public int Count

        {

get { return _queue. Count; }

        }

public Task this[ int index]

        {

get { return (Task) _queue[index]; }

set { _queue [index] = value; }

        }

    }


测试代码

class Program

    {

static Queue <int> _queue = new Queue< int>();

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());

//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());

static TaskFactory _factory = new TaskFactory (new TaskSchedulerBase<DynamicQueueScheduler >());

static void Main( string[] args )

        {

var thread1 = new Thread(Producer );

var thread2 = new Thread(Consumer );

thread1.Start ();

thread2.Start ();

Console.ReadKey ();

        }

static void Producer()

        {

for (int i = 0; i < 7; i ++)

            {

_queue.Enqueue (i);

            }

_queue.Enqueue (3);

_queue.Enqueue (2);

        }

static void Consumer()

        {

while (true )

            {

if (_queue .Count > 0)

                {

foreach (var i in _queue )

                    {

_factory.StartNew ((s) =>

                        {

Console.Write ("{0} on thread {1} {2}\n", s,Thread.CurrentThread .ManagedThreadId,

DateTime.Now.ToLongTimeString());

                        }, i);

                    }

_queue.Clear ();

                }

else

                {

Thread.Sleep (1);

                }

            }

        }

    }

时间: 2024-10-12 16:09:09

转:Task任务调度实现生产者消费者模式的相关文章

Java线程同步与死锁、生产者消费者模式以及任务调度等

一.Thread类基本信息方法 package Threadinfo; public class MyThread implements Runnable{ private boolean flag = true; private int num = 0; @Override public void run() { while(flag) { System.out.println(Thread.currentThread().getName()+"-->"+num++); } }

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

生产者消费者模式

什么是生产者消费者模式   在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式的优点 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,那

使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.使用场景. 首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出:在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享.强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程. BlockingQueue的

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

生产者消费者模式(吃包子例子)

生产者-消费者问题是一个经典的进程同步问 题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个进程地址空间内执行的两个线程生产者线程生产物品,然后将物品放置在一个空 缓冲区中供消费者线程消费.消费者线程从缓冲区中获得物品,然后释放缓冲区.当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费 者线程释放出一个空缓冲区.当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来. 生产者消费者模式是并发.多线程编程中经典的设计

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

关于java中生产者消费者模式的理解

在说生产者消费者模式之前,我觉得有必要理解一下 Obj.wait(),与Obj.notify()方法.wait()方法是指在持有对象锁的线程调用此方法时,会释放对象锁,同时休眠本线程.notify()方法是持有相同的对象锁来唤醒休眠的线程,使其具有抢占cpu的资格.可以理解同步方法,同步方法的对象锁就是谁调用这个方法,这个对象就是对象锁. 根据李兴华老师的视频讲解,建立一个生产者类,一个消费者类,还有一个Info类,贴上代码: 1.生产者类 package com.company; /** *