【转】消息系统之Redis

本例包括

jedis_demo:入口类

jedis_control:jedis控制器(jedis的连接池)

jedis_pub_sub_listener:订阅的监听器

singleton_agent:单例的代理类(连接池配置)

package com.larry.jedis;  

import redis.clients.jedis.Jedis;  

/**
 * 入口类 */
public class jedis_demo {
    jedis_control redis_util = jedis_control.get_singleton();  

    public static void main(String[] args) {
        jedis_demo jedis_demo = new jedis_demo();  

        new Thread(new Runnable(){
            @Override
            public void run() {
                jedis_control redis_util = jedis_control.get_singleton();
                Jedis jedis = redis_util.get_connection();
                jedis_pub_sub_listener pub_sub_listener = new jedis_pub_sub_listener();
                // 可以订阅多个频道
                // 订阅得到信息在lister的onMessage(...)方法中进行处理
                // jedis.subscribe(listener, "news.share", "news.log");
                // jedis.subscribe(listener, new String[]{"news.share","news.log"});
                jedis.psubscribe(pub_sub_listener, new String[] { "news.share" });// 使用模式匹配的方式设置频道
            }
        }).start();  

        jedis_demo.publish();
    }  

    /**
     * 发布
     */
    public void publish() {
        Jedis jedis = redis_util.get_connection();
        jedis.publish("news.share", "ok");
        jedis.publish("news.share", "hello word");
    }
}  

package com.larry.jedis;  

import redis.clients.jedis.Jedis;  

/**
 * jedis控制器
 * @author 吕桂强
 * @email [email protected]
 * @version 创建时间:2012-3-28 下午12:03:40
 */
public final class jedis_control {
    //单例
    private static jedis_control _jedis_control;
    public static jedis_control get_singleton(){
        if(_jedis_control == null){
            _jedis_control = new jedis_control();
        }
        return _jedis_control;
    }  

    /**
     * 获取连接实例
     * @return jedis
     */
    public Jedis get_connection() {
        Jedis jedis = null;
        try {
            jedis = singleton_agent.get_jedispool().getResource();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return jedis;
    }     

    /**
     * 释放数据库连接
     * @param conn
     */
    public void close_connection(Jedis jedis) {
        if (null != jedis) {
            try {
                singleton_agent.get_jedispool().returnResource(jedis);
            } catch (Exception e) {
                    e.printStackTrace();
            }
        }
    }
}

package com.larry.jedis;  

import redis.clients.jedis.JedisPubSub;  

/**
 * 监听订阅事件
 **/
public class jedis_pub_sub_listener extends JedisPubSub {
    // 取得订阅的消息后的处理
    public void onMessage(String channel, String message) {
        System.out.println(channel + "=" + message);
    }  

    // 初始化订阅时候的处理
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(channel + "=" + subscribedChannels);
    }  

    // 取消订阅时候的处理
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(channel + "=" + subscribedChannels);
    }  

    // 初始化按表达式的方式订阅时候的处理
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println(pattern + "=" + subscribedChannels);
    }  

    // 取消按表达式的方式订阅时候的处理
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println(pattern + "=" + subscribedChannels);
    }  

    // 取得按表达式的方式订阅的消息后的处理
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println(pattern + "=" + channel + "=" + message);
    }
}  

[java] view plain copy
package com.larry.jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;  

/**
 * 所有单例的代理类 */
public class singleton_agent {
    //****************单例一个连接池***************
    private static JedisPool jedispool = null;
    /**
     * 获取连接池
     * @return 数据源
     */
    public static JedisPool get_jedispool() {
        if(jedispool == null){
            JedisPoolConfig jedispool_config = new JedisPoolConfig();
            jedispool_config.maxActive = 20;
            jedispool_config.maxIdle = 0;
            jedispool_config.maxWait = 1000;
            jedispool_config.testOnBorrow = true;
            jedispool = new JedisPool(jedispool_config, "localhost", 6379);
        }
        return jedispool;
    }
    //end****************单例一个连接池***************
}  
时间: 2024-08-07 00:16:12

【转】消息系统之Redis的相关文章

Sprint Boot如何基于Redis发布订阅实现异步消息系统的同步调用?

前言 在很多互联网应用系统中,请求处理异步化是提升系统性能一种常用的手段,而基于消息系统的异步处理由于具备高可靠性.高吞吐量的特点,因而在并发请求量比较高的互联网系统中被广泛应用.与此同时,这种方案也带来了调用链路处理上的问题,因为大部分应用请求都会要求同步响应实时处理结果,而由于请求的处理过程已经通过消息异步解耦,所以整个调用链路就变成了异步链路,此时请求链路的发起者如何同步拿到响应结果,就需要进行额外的系统设计考虑. 为了更清晰地理解这个问题,小码哥以最近正在做的共享单车的IOT系统为例,给

常见开源消息系统

消息系统的作用:异步处理.削减峰值.减少组件之间的耦合. 选择消息系统根据业务需要需要考虑以下几个方面: 是否持久化 吞吐能力 高可用 分布式扩展能力 兼容现有协议 易于维护 其他,如消息丢失和重复的处理 避免单点故障 负载均衡 常见消息系统协议: STOMP AMQP 类似 MEMCACHE 的协议 HTTP 自定格式 1.2 是不错的可选开源组件: 1. Kafka/MetaQ: 广泛用于 Linkedin 内部 (类似有 Java 版本的国产 MetaQ) 由于优先考虑吞吐,更加适合大数据

系统学习redis之一——基础概念

前言: redis集群搭建过很多次,也用过一些基础的命令,能解决一些常规的异常.但是因为平时对数据这块用得不多,一直没有很系统的学习过redis.这次将redis的知识好好学习了一下,记录为学习笔记<系统学习redis>系列. NoSQL介绍 非关系型数据库:NoSQL关系型数据库:MySQL.SQLserver.Oracle 关系型数据库在web2.0上对高并发,暴露出了一些性能问题 NoSQL 是以key-value形式存储,和传统的关系型数据库不一样,不一定遵循传统数据库的一些基本要求,

分布式开放消息系统(RocketMQ)的原理与实践

分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.顺序消息 消息有序指的是可以按照消息的发送顺序来消费.例如:一笔订单产生了 3 条消息,分别是订单创建.订单付款.订单完成.消费时,要按照顺序依次消费才有意

消息系统之Apache ActiveMQ

一.下载运行MQ服务 1.下载ActiveMQ :http://activemq.apache.org/ 2.解压缩: 进入bin目录 win32和win64对应不同位的操作系统,选择进入 点击activemq.bat 运行即可启动ActiveMQ服务. 在浏览器输入ActiveMQ 服务地址:http://127.0.0.1:8161/admin/         默认用户名/密码 admin/admin 二.开发 jar:activemq-all-5.11.1.jar   在ActiveMQ

Kafka是分布式发布-订阅消息系统

https://www.biaodianfu.com/kafka.html Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转.传统的企业消息系统并不是非常适合大规模的数据处理.为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志

分布式消息系统:Kafka

1.为什么要有Kafka?  [出自 Hrq] Kafka是一个消息系统, 原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在主要用作数据管道(data pipeline)和消息系统 Kafka出现的原因: l  传统的日志文件统计分析对离线处理(如报表和批处理)不错,但对于实时处理来说其时延太大,而且还具有较高的运营复杂度. l  现有的消息队列系统虽很适合于在实时或近实时(near-real-time)

分布式发布订阅消息系统Kafka架构设计

我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部分.活动数据包括页面访问量(page view).被查看内容方面的信息以及搜索情况等内容.这种数据通常的处理方式是先把各种活动以日志的形式写入某种

结合windows消息系统理解C#中WndProc函数和DefWndProc函数

Windows消息系统由3部分组成: 1.消息队列.Windows应用程序的消息是由Windows统一在一个消息队列中管理的. 2.消息循环.应用程序从Windows消息队列中获得自己的消息,并将其分配给窗体函数进行处理. 3.窗口过程.负责处理接收到的消息,每个窗口都有对应的窗口过程,负责截获消息并响应.WndProc是窗口过程函数,负责处理接收到的消息,在我们写代码时,不会注意到有这个函数,这是因为开发环境自动为我们生成了.WndProc函数通过switch...case...判断并处理消息