ActiveMq-拦截创建消息队列

ActiveMQ拦截客户端创建/接收消息队列

1.创建插件

public class AuthPlugin implements BrokerPlugin{
    private String mqName;//本MQ服务器名称
    private JdbcTemplate jdbcTemplate;//数据库操作类

    public AuthPlugin(JdbcTemplate jdbcTemplate,String mqName) {
        this.jdbcTemplate=jdbcTemplate;
        this.mqName=mqName;
    }

    @Override
    public Broker installPlugin(Broker broker) throws Exception {
        return new AuthBroker(broker,jdbcTemplate,mqName);
    }
}

2.修改apache-activemq\conf\activemq.xml

<!--broker节点下-->
<plugins>
    <bean xmlns="http://www.springframework.org/schema/beans" id="ehlPlugin" class="com.ehl.plugin.AuthPlugin">
        <constructor-arg index="0">
            <ref bean="jdbcTemplate"/>
        </constructor-arg>
        <constructor-arg index="1" value="MQName"/><!--本消息队列的名称-->
    </bean>
</plugins>

3.创建插件类

public class AuthBroker extends AbstractAuthenticationBroker{
    private static Log log = LogFactory.getLog(AuthBroker.class);

    private static final String ACTIVEMQ_ADVISORY_PRODUCER_QUEUE="ActiveMQ.Advisory.Producer.Queue.";//消息生产者前缀
    private static final String ACTIVEMQ_ADVISORY_CONSUMER_QUEUE="ActiveMQ.Advisory.Consumer.Queue.";//消息消费者前缀
    private JdbcTemplate jdbcTemplate;//数据库操作
    private String mqName;//MQ服务器名称
    public AuthBroker(Broker next,JdbcTemplate jdbcTemplate,String mqName) {
        super(next);
        this.jdbcTemplate=jdbcTemplate;
        this.mqName=mqName;
    }
        /**
     * 连接拦截器
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        log.info("用户["+info.getUserName()+"]请求连接["+mqName+"]!");
        SecurityContext securityContext = context.getSecurityContext();
        if (securityContext == null) {
            securityContext = authenticate(info.getUserName(), info.getPassword(), null);
            context.setSecurityContext(securityContext);
            securityContexts.add(securityContext);
        }

        try {
            super.addConnection(context, info);
        } catch (Exception e) {
            securityContexts.remove(securityContext);
            context.setSecurityContext(null);
            throw e;
        }
    }
        /**
     * 认证
     * <p>Title: authenticate</p>
     */
    @Override
    public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
        SecurityContext securityContext = null;
        Com com=getCom(username,password);
        //验证用户信息
        if(com!=null&&com.getId()!=null){
             securityContext = new SecurityContext(username) {
                    @Override
                    public Set<Principal> getPrincipals() {
                        Set<Principal> groups = new HashSet<Principal>();
                        groups.add(new GroupPrincipal("users"));//默认加入了users的组
                        return groups;
                    }
                };
//                log.info("用户:"+username+"验证成功!");
        }else{
            log.error("用户:"+username+"验证失败!");
            throw new SecurityException("验证失败");
        }
        return securityContext;
    }
        /**
     * 添加一个目标
     * <p>Title: addDestination</p>
     * @see org.apache.activemq.broker.BrokerFilter#addDestination(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination, boolean)
     */
    @Override
    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean createIfTemporary) throws Exception {
        boolean destStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, 0,ACTIVEMQ_ADVISORY_PRODUCER_QUEUE.length());
        //发送消息者
        if(destStats){
            if(context.getSecurityContext()!=null){
                //判断不是默认用户
                if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){
                    String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_PRODUCER_QUEUE, "");//得到消息队列名
                    if(powers.containsKey(context.getSecurityContext().getUserName())){//判断该用户是否有权限
                        Map<String,ViewProjectMqQueuesCom> map=powers.get(context.getSecurityContext().getUserName());
                        if(map!=null&&map.containsKey(queuesName)){//判断是否有发送的权限
                            if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){
                                if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.SEND.getValue().intValue()){
                                    return super.addDestination(context, destination, createIfTemporary);
                                }
                            }
                        }
                        throw new Exception("["+mqName+"-"+context.getUserName()+"]对消息队列["+queuesName+"]没有发送消息的权限");
                    }else{
                        throw new Exception("请登录后再操作!");
                    }
                }
            }
        }else{
            boolean consumerStats = destination.getPhysicalName().regionMatches(true, 0, ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, 0,ACTIVEMQ_ADVISORY_CONSUMER_QUEUE.length());
            //消息接收者
            if(consumerStats){
                if(context.getSecurityContext()!=null){
                    //判断不是默认用户
                    if(!context.getSecurityContext().getUserName().equals(SecurityContext.BROKER_SECURITY_CONTEXT.getUserName())){
                        String queuesName=destination.getPhysicalName().replace(ACTIVEMQ_ADVISORY_CONSUMER_QUEUE, "");//得到消息队列名称
                        if(powers.containsKey(context.getSecurityContext().getUserName())){//判断用户是否有对应的权限
                            Map<String,ViewProjectMqQueuesCom> map=powers.get(context.getSecurityContext().getUserName());
                            if(map!=null&&map.containsKey(queuesName)){
                                if(map.get(queuesName).getBindId()!=null&&map.get(queuesName).getComQueuesType()!=null){
                                    if(map.get(queuesName).getComQueuesType().intValue()==QueuesComType.BOTH.getValue().intValue()||map.get(queuesName).getComQueuesType().intValue()==QueuesComType.RECEIVE.getValue().intValue()){
                                        return super.addDestination(context, destination, createIfTemporary);
                                    }
                                }
                            }
                            throw new Exception("["+mqName+"-"+context.getUserName()+"]对消息队列["+queuesName+"]没有获取消息的权限");
                        }else{
                            throw new Exception("请登录后再操作!");
                        }
                    }
                }
            }
        }
        return super.addDestination(context, destination, createIfTemporary);
    }
        /**
     * 监控发送消息
     * <p>Title: send</p>
     * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange, org.apache.activemq.command.Message)
     */
    @Override
    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
        String userName=producerExchange.getConnectionContext().getUserName();
        ActiveMQDestination msgDest = messageSend.getDestination();
        String physicalName = msgDest.getPhysicalName();
    }
/**
     * 监控消息接收者
     * <p>Title: acknowledge</p>
     * @see org.apache.activemq.broker.BrokerFilter#acknowledge(org.apache.activemq.broker.ConsumerBrokerExchange, org.apache.activemq.command.MessageAck)
     */
    @Override
    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
        String userName=consumerExchange.getConnectionContext().getUserName();
        String queues=ack.getDestination().getPhysicalName();
    }
}
时间: 2024-11-10 00:14:36

ActiveMq-拦截创建消息队列的相关文章

ActiveMq C#客户端 消息队列的使用(存和取)

1.准备工具 VS2013Apache.NMS.ActiveMQ-1.7.2-bin.zipapache-activemq-5.14.0-bin.zip 2.开始项目 VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug

实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)

引言: 最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情.最后研究说用消息队列,当有需

MSMQ创建消息队列出现“工作组安装计算机不支持该操作”

[sceislqzw]:你在创建公有队列,而你的机器不属于任何域.一般工作组安装的计算机只能创建私有队列. System.Messaging.MessageQueue QueueReceive = new System.Messaging.MessageQueue(@".\Private$\MSMQDemo");这样应该不会出错. [erdgzw]:创建专用队列当然可以.难到创建公有队列必须要在域上啊? [flyaqiao]:if (MessageQueue.Exists(path))

AMQP 与ActiveMQ,JMS消息队列之间的比较

http://blog.csdn.net/kimmking/article/details/8253549 http://www.csdn123.com/html/mycsdn20140110/8f/8f42bb0680685c547107a0079e557686.html http://blog.sina.com.cn/s/blog_999d1f4c01010dpx.html

JMS消息队列ActiveMQ(点对点模式)

生产者(producer)->消息队列(message queue) package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session

JMS消息队列ActiveMQ(发布/订阅模式)

消费者1(Consumer)--订阅(subcribe)-->主题(Topic) package com.java1234.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.

java JMS消息队列

http://blog.csdn.net/shirdrn/article/details/6362792 http://haohaoxuexi.iteye.com/blog/1893038 http://afreon.blog.163.com/blog/static/223794094201431422654237/ http://www.cnblogs.com/huang0925/p/3558690.html ActiveMQ第二弹:使用Spring JMS与ActiveMQ通讯 本文章的完整

XSI进程间通信-----消息队列

1. 基本特点 1) 消息队列是一个由系统内核负责存储和管理,并通过消息队列标识引用的数据链表,消息队列 和有名管道fifo的区别在: 后者一次只能放一个包,而前者则可以放很多包,这样就能处理发包快,哪包慢的问题 2) 可以通过msgget函数创建一个新的消息队列, 或获取一个已有的消息队列. 通过msgsnd函数 (send)向消息队列的后端追加消息, 通过msgrcv(receive)函数从消息队列的前端提取消息. 3) 消息队列中的每个消息单元除包含消息数据外,还包含消息类型和数据长度.消

进程间通信之消息队列通信

概念 消息队列 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值 消息队列也有管道一样的不足,就是每条消息的最大长度是有上限的(MSGMAX),每个消息队列的总字节数(内核缓冲上限)是有上限的(MSGMNB),系统上消息队列的总数(消息条目数)也有一个上限(MSGMNI) 对比: 管道 消息 流管道 有边界 先进先出 可以后进入.先出来 消息大小三大限制 cat /proc/sys/kernel/msgmax最