消息队列

1、为什么需要消息队列?
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。

举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力。 
再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送。 
再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开。1:00到4:00和ERP联通,和电商系统断开。 
再举个例子,服务员点菜快,厨师做菜慢。 
再举个例子,到银行办事的人多,提供服务的窗口少。 
乖乖排队吧。

2、使用消息队列有什么好处?
2.1、提高系统响应速度
使用了消息队列,生产者一方,把消息往队列里一扔,就可以立马返回,响应用户了。无需等待处理结果。

处理结果可以让用户稍后自己来取,如医院取化验单。也可以让生产者订阅(如:留下手机号码或让生产者实现listener接口、加入监听队列),有结果了通知。获得约定将结果放在某处,无需通知。

2.2、提高系统稳定性
考虑电商系统下订单,发送数据给生产系统的情况。 
电商系统和生产系统之间的网络有可能掉线,生产系统可能会因维护等原因暂停服务。

如果不使用消息队列,电商系统数据发布出去,顾客无法下单,影响业务开展。 
两个系统间不应该如此紧密耦合。应该通过消息队列解耦。同时让系统更健壮、稳定。

3、为什么需要分布式?
3.1、多系统协作需要分布式
消息队列中的数据需要在多个系统间共享数据才能发挥价值。 
所以必须提供分布式通信机制、协同机制。

3.2、单系统内部署环境需要分布式
单系统内部,为了更好的性能、为了避免单点故障,多为集群环境。 
集群环境中,应用运行在多台服务器的多个JVM中;数据也保存在各种类型的数据库或非数据库的多个节点上。 
为了满足多节点协作需要,需要提供分布式的解决方案。

4、分布式环境下需要解决哪些问题
4.1、并发问题
需进行良好的并发控制。确保“线程安全“。

不要出现一个订单被出货两次。不要出现顾客A下的单,发货发给了顾客B等情况。

4.2、简单的、统一的操作机制
需定义简单的,语义明确的,业务无关的,恰当稳妥的统一的访问方式。

4.3、容错
控制好单点故障,确保数据安全。

4.4、可横向扩展
可便捷扩容。

5、如何实现?
成熟的消息队列中间件产品太多了,族繁不及备载。

成熟产品经过验证,接口规范,可扩展性强。

结合事业环境因素、组织过程遗产、实施运维考虑、技术路线考虑、开发人员情况等原因综合考虑,基于Redis自己做一个是最可行的选择。

1、消息队列需提供哪些功能?
在功能设计上,我崇尚奥卡姆剃刀法则。 
对于消息队列,只需要两个方法: 生产 和 消费。 
具体的业务场景是任务队列,代码设计如下:

public abstract class TaskQueue{
    private final String name ;
    public String getName(){return this.name;}

public abstract void addTask(Serializable taskId);
    public abstract Serializable popTask();
}

同时支持多个队列,每个队列都应该有个名字。final确保TaskQueue是线程安全的。TaskQueue的实现类也应该确保线程安全。

addTask向队列中添加一个任务。队列中仅保存任务的id,不存储任务的业务数据。

popTask从队列中取出一个任务来执行。 
这种设计不是特别友好,因为她需要调用者自行保证任务执行成功,如果执行失败,自行确保重新把任务放回队列。 无论如何,这种机制是可以工作的。想想奥卡姆剃刀法则,我们先按照这个设计实现出来看看。 
如果调用者把业务数据存在数据库中,业务数据中包含“状态“列,标识任务是否被执行,调用者需要自行管理这个状态,并控制事务。

popTask采用阻塞方式,还是非阻塞方式呢? 
如果采用阻塞方式,队列中没任务的时候,客户端不会断开连接,只是等。 
一般情况下,客户端会有多个worker抢着干活儿,几条狼一起等一个肉包子,画面太美。连接是重要资源,如果一直没活儿干,先放回池里,也不错。 
先采用非阻塞的方式吧,如果队列是空的,popTask返回null,立即返回。

2、后续可能提供的功能
2.1、引入Task生命周期概念
应用场景不同,需求也不同。 
在严格的应用场景中,需要确保每个Task执行“成功“了。 
对于上面提到的popTask后不管的“模式“,这是另外一种“运行模式“,两种模式可以并行存在。

在这种新模式下,Task状态有3种:新创建(new,刚调用addTask加到队列中)、正在执行(in-process,调用popTask后,调用finish前)、完成(done,执行OK了,调用finishTask后)。 
调整后的代码如下:

public abstract class TaskQueue{

private final String name ;
    public String getName(){return this.name;}

public abstract int getMode();

public abstract void addTask(Serializable taskId);
    public abstract Serializable popTask();
    public abstract void finishTask(Serializable taskId);
}

2.2、增加批量取出任务的功能
popTask()一次取出一个任务,太磨叽了。 
好比我们要买5瓶水,开车去超市买,每去一次买1瓶,有点儿啥。 
我们需要一个一次取多个任务的方法。

public abstract class TaskQueue{
    ... ...
    public abstract Serializable[] popTasks(long cnt);
}1
2.3、增加阻塞等待机制
想象一种场景: 
小明同学,取出一个任务,发现干不了,放回队列,再去取,取出来发现还是干不了,又放回去。反反复复。 
小明童鞋肿么了?可能是他干活需要网络,网络断了。可能是他做任务需要写磁盘,磁盘满了。

如果小明像邻居家的孩子一样优秀,当他发现哪里不对的时候,他应该冷静下来,歇会儿。

但他万一不是呢?只有我们能帮他了。

假如队列中有10000个待办任务。 
这时候小明来了。他失败100次后,我们应该拦他吗?不应该,除非他主动要求(在系统参数中配置)。5000次后呢?也不应该,除非他主动要求。我们的原则是:我们做的所有事情,对于调用者,都是可以预期的。

我们可以在系统参数中要求调用者设置一个阀值N,如果不设置,默认为100。连续失败N次后,让调用者睡一会儿,睡多长时间,让调用者配置。

假如我们的底层实现中包含待办子队列、重做子队列和完成子队列(这种设计好复杂!pop的时候先pop重做,还是先pop待办,复杂死了!但愿不需要这样)。 
待办子队列中有10000个任务。

在小明失败10000次后,所有的任务都在重做子队列了。这时候我们应该拦他吗? 
重做子队列要不要设置大小,超过之后,让下一个访问者等。 
等的话就会涉及超时,超时后,任务也不能丢弃。 
太复杂 了!设置一个连续失败次数的限制就够了!

2.4、考虑增加Task类
不保存任务的相关数据是基本原则,绝对不动摇。 
增加Task类可以管理下生命周期,更有用的是,可以把Task本身设计成Listener,代码大概时这样的:

public abstract class Task{

public Serializable getId();
    public int getState();

pubic void doTask();

public void whenAdded(final TaskQueue tq);
    public void whenPoped(final TaskQueue tq);
    // public void whenFaild(final TaskQueue tq);
    public void whenFinished(final TaskQueue tq);
}

通过Task接口,我们可以对调用过程进行更强势的管理(如进行事务控制),对调用者施加更强的控制,用户也可以获得更多的交互机会,同TaskQueue有更好的交互(如在whenFinished中做持久化工作)。

但这些真的有必要吗?是不是太侵入了?注解的方式会好些吗? 
再考虑吧。

2.5、增加系统参数
貌似需要个Config类了,不爽! 
本来想做一个很小很精致的小东西的,如果必须再加吧。 
如果做的话,需要支持properties、注解设置、api方式设置、Spring注入式设置,烦。

次回预告:Redis本身机制和TaskQueue的契合。

1、Redis是什么鬼?
Redis是一个简单的,高效的,分布式的,基于内存的缓存工具。 
假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务。

简单,是Redis突出的特色。 
简单可以保证核心功能的稳定和优异。

2、性能
性能方面:Redis是足够高效的。 
和Memecached对比,在数据量较小大情况下,Redis性能更优秀。 
数据量大到一定程度的时候,Memecached性能稍好。

简单结论:但总体上讲Redis性能已经足够好。

// Ref: Redis性能测试 http://www.cnblogs.com/lulu/archive/2013/06/10/3130878.html 
原则:Value大小不要超过1390Byte。

经实验得知: 
List操作和字符串操作性能相当,略差,几乎可以忽略。 
使用Jedis自带pool,“每次从pool中取用完放回“ 和 “重用单个连接“ 相比,平均用时是3倍。这部分需要继续研究底层机制,采用更合理的实验方法进一步获得数据。 
使用Jedis自带pool,性能上是满足当前访问量需要的,等有时间了再进一步深入。

3、数据类型
Redis支持5种数据类型:字符串、Map、List、Set、Sorted Set。 
List特别适合用于实现队列。提供的操作包括: 
从左侧(或右侧)放入一个元素,从右侧(或左侧)取出一个元素,读取某个范围的元素,删除某个范围的元素。

Sorted Set中元素是唯一的,可以通过名字找。 
Map可以高效地通过key找。 
假如我们需要实现finishTash(taskId),需要通过名字在队列中找元素,上面两个可能会用到。

4、原子操作
实现分布式队列首要问题是:不能出现并发问题。

Redis是底层是单线程的,命令执行是原子操作,支持事务,契合了我们的需求。

Redis直接提供的命令都是原子操作,包括lpush、rpop、blpush、brpop等。

Redis支持事务。通过类似 begin…[cancel]…commit的语法,提供begin…commit之间的命令为原子操作的功能,之间命令对数据的改变对其他操作是不可见的。类似关系型数据库中的存储过程,同时提供了最高级别的事务隔离级别。

Redis支持脚本,每个脚本的执行是原子性的。

做了一下并发测试: 
写了个小程序,随机对List做push或pop操作,push的比pop的稍多。 
记录每次处理的详细信息到数据库。 
最后把List中数据都pop出来,详细记录每次pop详细信息。 
统计push和pop是否相等,统计针对每条数据是否都有push和pop。 
500并发,没有出现并发问题。

5、集群
实现分布式队列另一个重要问题是:不能出现单点故障。

Redis支持Master-Slave数据复制,从服务器设置 slave-of master-ip:port 即可。 
集群功能可以由客户端提供。 
客户端使用哨兵,可自动切换主服务器。

由于队列操作都是写操作,从服务器主要目的是备份数据,保证数据安全。

如果想基于 sharding 做多master集群,可以结合 zookeeper 自己做。

Redis 3.0支持集群了,还没细看,应该是个好消息,等大家都用起来,没什么问题的话,可以考虑试试看。

如果 master 宕掉,怎么办? 
“哨兵”会选出一个新的master来。产生过程中,消息队列暂停服务。 
最极端的情况,所有Redis都停了,当消息队列发现Redis停止响应时,对业务系统的请求应抛出异常,停止队列服务。 
这样会影响业务,业务系统下订单、审批等操作会失败。如果可以接受,这是一种方案。 
Redis整个集群宕掉,这种情况很少发生,如果真发生了,业务系统停止服务也是可以理解的。

如果想要在Redis整个集群宕掉的情况下,消息队列仍继续提供服务。 
方法是这样的: 
启用备用存储机制,可以是zookeeper、可以是关系型数据库、可以是另外可用的Memecached等。 
本地内存存储是不可取的,首先,同步多个客户端虚拟机内存数据太复杂,相当于自己实现了一个Redis,其次,保证内存数据存储安全太复杂。 
备用存储机制相当于实现了另外一个版本的消息队列,逻辑一致,底层存储不同。这个实现可以性能低一些,保证最基本的原则即可。 
想要保证不出现并发问题,由于消息队列程序同时运行在多个虚拟机中,对象锁、方法锁无效。需要有一个独立于虚拟机的锁机制,zookeeper是个好选择。 
将关系型数据库设置为最高级别的事务隔离级别,太傻了。除了zk有其他好办法吗?

Redis集群整个宕掉的同时Zookeeper也全军覆没怎么办? 
这个问题是没有尽头的,提供了第二备用存储、第三备用存储、第四备用存储、…,理论上也会同时宕掉,那时候怎么办? 
有钱任性的土豪可以继续,预算有限的情况,能做到哪步就做到哪步。

6、持久化
分布式队列的应用场景和缓存的应用场景是不一样的。

如果有没来得及持久化的数据怎么办? 
从业务系统的角度,已经成功发送给消息队列了。 
消息队列也以为Redis妥妥地收好了。 
可Redis还没写到日记里,更没有及时通知小伙伴,挂了。可能是断电了,可能是进程被kill了。

后果会怎样? 
已经执行过的任务会再次执行一遍。 
已经放到队列中的任务,消失了。 
标记为已经完成的任务,状态变为“进行中”了,然后又被执行了一遍。 
后果不可接受。

分布式队列不允许丢数据。 
从业务角度,哪怕丢1条数据也是无法接受的。 
从运维角度,Redis丢数据后,如果可以及时发现并补救,也是可以接受的。

从架构角度,队列保存在Redis中,业务数据(包括任务状态)保存在关系型数据库中。 
任务状态是从业务角度确定的,消息队列不应该干涉。如果业务状态没有统一的规范和定义,从业务数据比对任务队列是否全面正确,就只能交给业务开发方来做。 
从分工上来看,任务队列的目的是管理任务执行的状态,业务系统把这个职责交给了任务队列,业务系统自身的任务状态维护未必准确。 
结论:任务队列不能推卸责任,不能丢数据是核心功能,不能打折扣。

采用 Master-Slave 数据复制模式,配置bgsave,追加存储到aof。

在从服务器上配置bgsave,不影响master性能。

队列操作都是写操作,master任务繁重,能让slave分担的持久化工作,就不要master做。

rdb和aof两种方法都用上,多重保险。 
appendfsync设为always。// 单节点测性能,连续100000次算平均时间,和per second比对,性能损失不大。 
性能会有些许损失,但任务执行为异步操作,无需用户同步等待,为了保证数据安全,这样是值得的。

当运维需要重启Master服务器的时候,采取这样的顺序: 
1. 通过 cli shutdown 停止master服务器, master交代完后事后,关掉自己。这时候“哨兵”会找一个新的master出来。 
万万不可以直接kill或者直接打开防火墙中断master和slave之间的连接。 
master 对外防火墙,停止对外服务,Master 自动切换到其他服务器上, 原 Master 继续持久化 aof,发送到原来各从服务器。 
2. 在原 master 上进行运维操作。 
3. 启动原 master,这时候它已经是从服务器了。耐心等待它从新 master 获取最新数据。观察 redis 日志输出,确认数据安全。 
4. 对新的 master 重复1-3的操作。 
5. 将以上操作写成脚本,自动化执行,避免人为错误。

1、访问Redis的工具类
public class RedisManager {

private static Pool<Jedis> pool;

protected final static Logger logger = Logger.getLogger(RedisManager.class);

static{
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

public static void init() throws Exception {

Properties props = ConfigManager.getProperties("redis");
        logger.debug("初始化Redis连接池。");
        if(props==null){
            throw new RuntimeException("没有找到redis配置文件");
        }
        // 创建jedis池配置实例
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 设置池配置项值
        int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim());
        jedisPoolConfig.setMaxTotal(poolMaxTotal);

int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim());
        jedisPoolConfig.setMaxIdle(poolMaxIdle);

long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim());
        jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);

logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ",
                poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));

// 根据配置实例化jedis池
        String connectMode = props.getProperty("redis.connectMode");
        String hostPortStr = props.getProperty("redis.hostPort");

logger.debug(String.format("host : %s ",hostPortStr));
        logger.debug(String.format("mode : %s ",connectMode));

if(StringUtils.isEmpty(hostPortStr)){
            throw new OptimusException("redis配置文件未配置主机-端口集");
        }
        String[] hostPortSet = hostPortStr.split(","); 
        if("single".equals(connectMode)){
            String[] hostPort = hostPortSet[0].split(":");
            pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
        }else if("sentinel".equals(connectMode)){
            Set<String> sentinels = new HashSet<String>();    
            for(String hostPort : hostPortSet){
                sentinels.add(hostPort);
            }
            pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);
        }
    }

/**
    * 使用完成后,必须调用 returnResource 还回。
    * @return 获取Jedis对象
    */
    public static Jedis getResource(){
        Jedis jedis = pool.getResource();
        if(logger.isDebugEnabled()){
            logger.debug("获得链接:" + jedis);
        }
        return jedis;
    }

/**
    * 获取Jedis对象。
    * 
    * 用完后,需要调用returnResource放回连接池。
    * 
    * @param db 数据库序号
    * @return
    */
    public static Jedis getResource(int db){
        Jedis jedis = pool.getResource();
        jedis.select(db);
        if(logger.isDebugEnabled()){
            logger.debug("获得链接:" + jedis);
        }
        return jedis;
    }

/**
    * @param jedis
    */
    public static void returnResource(Jedis jedis){
        if(jedis!=null){
            pool.returnResource(jedis);
            if(logger.isDebugEnabled()){
                logger.debug("放回链接:" + jedis);
            }
        }
    }

/**
    * 需要通过Spring确认这个方法被调用。
    * @throws Exception
    */
    public static void destroy() throws Exception {
        pool.destroy();
    }
}

这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。

2、队列接口
public interface TaskQueue {

/**
    * 获取队列名
    * @return
    */
    String getName();

/**
    * 往队列中添加任务
    * @param task
    */
    void pushTask(String task);

/**
    * 从队列中取出一个任务
    * @return
    */
    String popTask();

}

用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。

3、队列的Redis实现类
/**
* 任务队列Redis实现。

* 采用每次获取Jedis并放回pool的方式。
* 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。
* 暂时先忍受这种低性能,不明确Jedis是否线程安全。
*
*/
public class TaskQueueRedisImpl implements TaskQueue {

private final static int REDIS_DB_IDX = 9;

private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

private final String name;

/**
    * 构造函数。
    * 
    * @param name
    */
    public TaskQueueRedisImpl(String name) {
        this.name = name;
    }

/* (non-Javadoc)
    * @see com.gwssi.common.mq.TaskQueue#getName()
    */
    public String getName() {
        return this.name;
    }
    /* (non-Javadoc)
    * @see com.gwssi.common.mq.TaskQueue#pushTask(String)
    */
    public void pushTask(String task) {
        Jedis jedis = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            jedis.lpush(this.name, task);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
    }

/* (non-Javadoc)
    * @see com.gwssi.common.mq.TaskQueue#popTask()
    */
    public String popTask() {
        Jedis jedis = null;
        String task = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            task = jedis.rpop(this.name);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
        return task;
    }

}

4、获取队列实例的工具类
/**
* <pre>
*  // 获得队列
*  TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
*  
*  // 添加任务到队列
*  String task = "task id";
*  tq.pushTask(task);

*  // 从队列中取出任务执行
*  String taskToDo = tq.popTask();
* </pre>
* @author liuhailong
*/
public class TaskQueueManager {

protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);

private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();

/**
    * 短信队列名。
    */
    public static final String SMS_QUEUE = "SMS_QUEUE";

/**
    * 规则队列名。
    */
    public static final String RULE_QUEUE = "RULE_QUEUE";

private static void initQueneMap() {
        logger.debug("初始化任务队列...");
        queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
        logger.debug("建立队列:"+RULE_QUEUE);
        queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
        logger.debug("建立队列:"+SMS_QUEUE);
    }

static {
        initQueneMap();
    }

public static TaskQueue get(String name){
        return getRedisTaskQueue(name);
    }

public static TaskQueue getRedisTaskQueue(String name){
        return queneMap.get(name);
    }

}

和具体的队列过于紧耦合,但简单好用。 
先跑起来再说。

5、向队列中添加任务的代码
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);1
6、从队列中取出任务执行的代码
public class SmsSendTask{

protected final static Logger logger = Logger.getLogger(SmsSendTask.class);

protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
    /**
    * 入口方法。
    */
    public void execute()  {
        TaskQueue taskQueue = null;
        String task = null;
        try {
            taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);

// 非线程安全
            Set<Serializable> executedTaskSet = new HashSet<Serializable>();

task = taskQueue.popTask();
            while(task!=null){
                // 判断是否把所有任务都执行一遍了,避免死循环
                if(executedTaskSet.contains(task)){
                    taskQueue.pushTask(task);
                    break;
                }

executeSingleTask(taskQueue,task);

task = taskQueue.popTask();
            }
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
            e.printStackTrace();
        }
    }

/**
    * 发送单条短信。
    * 
    * 取出任务并执行,如果失败,放回任务列表。
    * 
    * @param taskQueue
    * @param task
    */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void executeSingleTask(TaskQueue taskQueue, String task) {
        try {
            // do the job
            String smsId = task;
            Map<String,String> sms = smsSendService.getSmsList(smsId);

smsSendService.send(sms);

smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);

String opType = "2";
            TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
        } catch (Throwable e) {
            if(task!=null){
                taskQueue.pushTask(task);
                smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
                if(logger.isDebugEnabled()){
                    logger.error(String.format("任务%s执行失败:%s,重新放回队列", task, e.getMessage()));
                }
            }else {
                e.printStackTrace();
            }
        }
    }

}

这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。 
有空再改。

时间: 2024-10-27 09:12:11

消息队列的相关文章

Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执

上篇博文中我们介绍了Azure Messaging的重复消息机制.At most once 和At least once. Azure Messaging-ServiceBus Messaging消息队列技术系列5-重复消息:at-least-once at-most-once 本文中我们主要研究并介绍Azure Messaging的消息回执机制:实际应用场景: 同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站

【转】MSMQ 微软消息队列 简单 示例

MSMQ它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中:本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理. 我个人的理解,你可以把他当做一种,把数据打包后,发送到一个地方,程序也可以去取到这个打包的程序,队列的机制就不讲了,并发问题荡然无存.呵呵. 上代码: 首先 using System.Messaging; public class MsmqManagerHe

消息队列(msg)

一.消息队列:从一个进程向另一个进程发送数据块,读取不一定是先入先出. 管道与消息队列区别:管道基于字节流的,消息队列基于消息: 管道只能发送字符串,消息队列有类型: 管道随进程,消息队列随内核. 二.创建函数原型:int msgget(key_t key, int msgflg);    //key由ftok生成,IPC_CREAT|IPC_EXCL 接收消息:ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, in

第15章 进程间通行 15.6 XSI IPC 15.7 消息队列

15.6 XSI IPC (1)3种称作XSI IPC的IPC是: 1)消息队列 2)信号量 3)共享存储器 (2)标识符和键 1)标识符:是一个非负整数,用于引用IPC结构.是IPC对象的内部名. 2)键:IPC对象的外部名.可使多个合作进程能够在同一IPC对象上汇聚. (3)IPC_PRIVATE键: 用于创建一个新的IPC结构.不能指定此键来引用一个现有的IPC结构. (4)ftok函数: 由一个路径名和项目ID产生一个键. (5)ipc_perm结构体 规定了ipc结构的权限和所有者.

XSI进程间通信-----消息队列

1. 基本特点 1) 消息队列是一个由系统内核负责存储和管理,并通过消息队列标识引用的数据链表,消息队列 和有名管道fifo的区别在: 后者一次只能放一个包,而前者则可以放很多包,这样就能处理发包快,哪包慢的问题 2) 可以通过msgget函数创建一个新的消息队列, 或获取一个已有的消息队列. 通过msgsnd函数 (send)向消息队列的后端追加消息, 通过msgrcv(receive)函数从消息队列的前端提取消息. 3) 消息队列中的每个消息单元除包含消息数据外,还包含消息类型和数据长度.消

android 中使用View的消息队列api更新数据

基本上只要继承自View的控件,都具有消息队列或者handler的一些处理方法,下面是一些handler方法以及被View封装了的方法,其底层用的基本都是handler的api. 我么开一下postDelay的定义 android.view.View  public boolean postDelayed(Runnable action, long delayMillis) {         final AttachInfo attachInfo = mAttachInfo;         

消息队列实现订单异步提交

what MSMQ(Microsoft Message Queue),微软消息队列,用于应用程序之间相互通信的一种异步传输模式.应用程序可以分布在同台机器上,也可以分布于互联的网络中的任意位置.基本原理:消息发送者把要发送的消息放入容器,也就是Message(消息),然后保存到系统公用空间的消息队列中(Message Queue)中,本地或互联位置上的消息接收程序再从队列中取出发给它的消息进行处理.消息类型可以是文本,图像,自定义对象等.消息队列分为公共队列和私有队列. why 一.用于进程间的

Windows消息队列

一 Windows中有一个系统消息队列,对于每一个正在执行的Windows应用程序,系统为其建立一个"消息队列",即应用程序队列,用来存放该程序可能 创建的各种窗口的消息.应用程序中含有一段称作"消息循环"的代码,用来从消息队列中检索这些消息并把它们分发到相应的窗口函数中.  二 Windows为当前执行的每个Windows程序维护一个「消息队列」.在发生输入事件之后,Windows将事件转换为一个「消息」并将消息放入程序的消息队列中.程序通过执行一块称之为「消息循

消息队列编程

消息队列:就是一个消息的链表.而一条消息则可看作一个记录,具有特定的格式.进程可以向中按照一定的规则添加新消息:另一些进程则可以从消息队列中读走消息 发送消息队列: #include<sys/types.h>#include<sys/msg.h>#include<sys/ipc.h>#include<stdio.h> struct msgt{ long msgtype; char msgtext[1024]; };int msg_type;char str[

Freertos-事件标志组,消息队列,信号量,二值信号量,互斥信号量

任务间的通信和同步机制  在裸机编程时,使用全局变量的确比较方便,但是在加上 RTOS 后就是另一种情况了. 使用全局变量相比事件标志组主要有如下三个问题: 1.使用事件标志组可以让 RTOS 内核有效地管理任务,而全局变量是无法做到的,任务的超时等机制需要用户自己去实现.2.使用了全局变量就要防止多任务的访问冲突,而使用事件标志组则处理好了这个问题,用户无需担心.3.使用事件标志组可以有效地解决中断服务程序和任务之间的同步问题. 事件标志组:事件标志组是实现多任务同步的有效机制之一. 每创建一