redis教程(二)-----redis事务、记录日志到redis、分布式锁

redis事务

Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:

  • 批量操作在发送 EXEC 命令前被放入队列缓存。
  • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务。
  • 命令入队。
  • 执行事务。

示例:

//根据multi开启事务redis 127.0.0.1:6379> MULTI
OK

redis 127.0.0.1:6379> SET book-name "Mastering C++ in 21 days"
QUEUED

redis 127.0.0.1:6379> GET book-name
QUEUED

redis 127.0.0.1:6379> SADD tag "C++" "Programming" "Mastering Series"
QUEUED

redis 127.0.0.1:6379> SMEMBERS tag
QUEUED
//触发事务
redis 127.0.0.1:6379> EXEC
1) OK
2) "Mastering C++ in 21 days"
3) (integer) 3
4) 1) "Mastering Series"
   2) "C++"
   3) "Programming"

注:

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

redis 127.0.0.1:7000> multi
OK
redis 127.0.0.1:7000> set a aaa
QUEUED
redis 127.0.0.1:7000> set b bbb
QUEUED
redis 127.0.0.1:7000> set c ccc
QUEUED
redis 127.0.0.1:7000> exec
1) OK
2) OK
3) OK

如果在 set b bbb 处失败,set a 已成功不会回滚,set c 还会继续执行。

redis事务命令

multi:标记一个事务块的开始。
exec:执行所有事务块内的命令。
discard:取消事务,放弃执行事务块内的所有命令。
watch:监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
unwatch:取消 WATCH 命令对所有 key 的监视。

将日志记录到redis

1、需求

  • 获取日志的产生的线程名称,记录器名称,上下文产生时间,日志发生时间,自定义日志的信息
  • 将获取的信息以json的形式保存到redis中

2、思路

  • 配置logback使用自定义Appender实现,来获取对应的日志信息
  • 配置一个单列的redis工具类(不影响其他业务),将获取的日志信息保存起来

3、配置依赖

<!-- 日志:slf4j是接口,log4j是具体实现 -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.12</version>
</dependency>
<dependency>
  <groupId>ch.qos.logback</groupId>
  <artifactId>logback-core</artifactId>
  <version>1.1.1</version>
</dependency>
<!-- 实现slf4j接口并整合 -->
<dependency>
  <groupId>ch.qos.logback</groupId>
  <artifactId>logback-classic</artifactId>
  <version>1.1.1</version>
</dependency>
<!--redis-->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.7.3</version>
</dependency>

4、配置redis以及logback

a、logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--自定义日志-->
    <appender name="MyAppender" class="log.RedisAppender" />
    <root level="info">
        <appender-ref ref="MyAppender" />
    </root>
</configuration>

b、redis.properties

maxIdle=100
maxWait=3000
testOnBorrow=true
host=127.0.0.1
port=6379
timeout=3000
pass=你的密码

5、MyAppender

package log;

import ch.qos.logback.classic.spi.LoggingEvent;
import ch.qos.logback.core.AppenderBase;
import dto.log.LogData;
import redis.clients.jedis.Jedis;
import tool.JsonBuilder;
import tool.RedisBuilder;

/**
 * 自定义日志处理
 */
public class RedisAppender extends AppenderBase<LoggingEvent> {

    @Override
    protected void append(LoggingEvent loggingEvent) {
        //获取日志数据
        LogData logData=new LogData(loggingEvent);
        //设置日志保存数据库
        Jedis jedis=RedisBuilder.getSingleJedis(2);
        //设置日志的key
        String key="logData";
        //获取日志条数
        Integer index=RedisBuilder.getRedisIndexByName(key);
        //保存日志
        jedis.set(key+index, JsonBuilder.getString(logData));
        //关闭链接
        jedis.close();

    }
}

6、LogData

package dto.log;

import ch.qos.logback.classic.spi.LoggingEvent;
import tool.DataFormatBuilder;

/**
 * Created by yuyu on 2018/3/15.
 * 用于保存日志数据
 */
public class LogData {

    private String message;//日志的信息
    private String loggerTime;//上下文产生时间
    private String loggerName;//记录器名称
    private String threadName;//线程名称
    private String happenStamp;//日志发生时间

    public LogData() {
    }

    public LogData(LoggingEvent loggingEvent) {

        this.message=loggingEvent.toString();
        this.loggerTime=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getContextBirthTime());
        this.loggerName=loggingEvent.getLoggerName();
        this.threadName=loggingEvent.getThreadName();
        this.happenStamp=DataFormatBuilder.getTimeStampFormat(null, loggingEvent.getTimeStamp());
    }
    //getter,setter略
}

7、RedisBuilder

package tool;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Properties;

/**
 * Created by yuyu on 2018/3/15.
 * redis相关的操作,获取一个单例的连接池
 */
public class RedisBuilder {

    private static JedisPool jedisPool;

    private static RedisBuilder singleRedisBuilder=new RedisBuilder();

    //单利模式
    private RedisBuilder(){
        //获取配置信息
        Properties properties=PropertiesGetter.get("redis");
        //设置连接池参数
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxIdle(Integer.parseInt(properties.getProperty("maxIdle")));//最大的jedis实例
        config.setMaxWaitMillis(Integer.parseInt(properties.getProperty("maxWait")));//最大等待时间(毫秒)
        config.setTestOnBorrow(Boolean.parseBoolean(properties.getProperty("testOnBorrow")));//借用时测试
        //redis链接
        jedisPool=new JedisPool(config, properties.getProperty("host")
                , Integer.parseInt(properties.getProperty("port"))
                , Integer.parseInt(properties.getProperty("timeout"))
                ,  properties.getProperty("pass"));
    }

    /**
     * 从连接池中获取一个Jedis对象
     * @param db 数据库[0,15]
     * @return
     */
    public static Jedis getSingleJedis(Integer db) {
        if (db==null||(db<0&&db>15)){
            return null;
        }
        Jedis back=jedisPool.getResource();
        back.select(db);
        return back;
    }

    /**
     *传入名称获取保存在redis中的Index号码
     * @param name
     * @return
     */
    public static Integer getRedisIndexByName(String name){
        if (name==null){
            return null;
        }
        Jedis jedis=RedisBuilder.getSingleJedis(15);
        Integer index;
        String value=jedis.get(name);
        //获取保存的index数据,没有的时候取0
        if (null==value){
            index=0;
        }else{
            index =Integer.parseInt(value)+1;
        }
        //将index保存
        jedis.set(name,index.toString());
        jedis.close();
        return index;
    }
}

8、DateFormatBuilder

package tool;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by yuyu on 2018/3/15.
 * 用于日期处理相关函数
 */
public class DataFormatBuilder {

    /**
     * 根据传进来的时间戳 获取对应的时间格式
     * @param format 时间格式
     * @param stamp 时间戳
     * @return
     */
    public static  String getTimeStampFormat(String format,Long stamp){
        if (stamp==null){
            return null;
        }
        if (format==null){
            format="yyyy-MM-dd HH:mm:ss/SSS";
        }
        SimpleDateFormat df = new SimpleDateFormat(format);//设置日期格式
        return df.format(stamp);//传进来的时间戳为获取当前系统时间
    }
}

9、PropertiesGetter

package tool;

import exception.DobeoneException;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by yuyu on 2018/3/12.
 * 用于参数配置文件中的数据获取
 */
public class PropertiesGetter {
    /**
     * 获取配置文件的配置信息
     * @param name
     * @return
     */
    public synchronized static Properties get(String name){
        String file="/properties/"+name+".properties";
        Properties prop = new Properties();
        InputStream in = PropertiesGetter.class.getResourceAsStream(file);
        try {
            prop.load(in);
        } catch (IOException e) {
            throw new DobeoneException("获取配置文件异常!-file-"+file,e);
        }
        return prop;
    }
}

10、JsonBuilder

package tool;

import com.fasterxml.jackson.databind.ObjectMapper;
import exception.DobeoneException;

/**
 * Created by yuyu on 2018/3/15.
 * json相关的函数
 */
public class JsonBuilder {
    /**
     * 将一个实体类转换成json字符串
     * @param object
     * @return
     */
    public static String getString(Object object){
        //安全判断
        if (object==null){
            return null;
        }
        ObjectMapper mapper = new ObjectMapper();
        String back;
        try{
            back=mapper.writeValueAsString(object);
        }catch (Exception e){
            //抛出一个自定义异常
            throw new DobeoneException("json字符转换失败!-object-"+object,e);
        }
        return back;
    }
}

测试:

package tool;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by yuyu on 2018/3/15.
 * 测试日志到redis
 */
public class TestLogger {

    private Logger logger= LoggerFactory.getLogger(this.getClass());

    @Test
    public void testLogger(){
        logger.info("hahha");
    }
}

redis分布式锁

一般生产环境都是服务器集群,那么如果希望某个操作只能由一台服务器去运行,那么如何实现呢?例如本人之前做过的一个需求,通过kettle将数据同步到我们的数据库中,这个同步是每天凌晨3点,每天一次,数据到达我们数据库之后我们要进行分发,并做后续的处理。因此这个分发就只能由一台服务器去操作,如果同时有多台服务器分发,就会出现任务重复的问题。

1、分布式锁要求

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

2、分布式锁

a、添加依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

b、代码实现

public class RedisTool {

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        //第一个为key,我们使用key来当锁,因为key是唯一的;             //第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,      通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
        //第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
        //第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
        //第五个为time,与第四个参数相呼应,代表key的过期时间。
        //总结:利用redis单个操作的原子性,将加锁操作放在一条命令中
        //1、String set(String key, String value)
        //2、String set(String key, String value, String nxxx)
        //3、String set(String key, String value, String nxxx, String expx, int time)
        //4、String set(String key, String value, String nxxx, String expx, long time)
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

}

关于分布式锁的各种问题可以参考这篇文章:Redis分布式锁的正确实现方式(Java版)

原文地址:https://www.cnblogs.com/alimayun/p/10896906.html

时间: 2024-10-13 15:27:05

redis教程(二)-----redis事务、记录日志到redis、分布式锁的相关文章

C#中使用Redis学习二 .NET4.5中使用redis hash操作

上一篇>> 摘要 上一篇讲述了安装redis客户端和服务器端,也大体地介绍了一下redis.本篇着重讲解.NET4.0 和 .NET4.5中如何使用redis和C# redis操作哈希表.并且会将封装的一些代码贴一下.在讲解的过程中,我打算结合redis操作命令一起叙述,算是作为对比吧.这样也能让读者清楚了解,所分装的代码对应的redis的哪一些操作命令. hash哈希表简介 如何在.NET4.0/4.5中安装redis组件? 在上一篇博文中,安装好的redis服务器端,要记得开启服务.然后再

Redis教程(八):事务详解

转载于:http://www.itxuexiwang.com/a/shujukujishu/redis/2016/0216/135.html?1455806987 一.概述: 和众多其它数据库一样,Redis作为NoSQL数据库也同样提供了事务机制.在Redis中,MULTI/EXEC/DISCARD/WATCH这四个命令是我们实现事务的基石.相信对有关系型数据库开发经验的开发者而言这一概念并不陌生,即便如此,我们还是会简要的列出Redis中事务的实现特征: 1). 在事务中的所有命令都将会被串

Redis教程(二):String数据类型

转载于:http://www.itxuexiwang.com/a/shujukujishu/redis/2016/0216/131.html?1455808279 一.概述: 字符串类型是Redis中最为基础的数据存储类型,它在Redis中是二进制安全的,这便意味着该类型可以接受任何格式的数据,如JPEG图像数据或Json对象描述信息等.在Redis中字符串类型的Value最多可以容纳的数据长度是512M. 二.相关命令列表: 命令原型 时间复杂度 命令描述 返回值 APPENDkeyvalue

Java之——redis并发读写锁,使用Redisson实现分布式锁

原文:http://blog.csdn.net/l1028386804/article/details/73523810 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁. [java] view plain copy public void testReentrantLock(RedissonClient redisson){ RLock lo

Redis的Java客户端Jedis的八种调用方式(事务、管道、分布式)介绍

jedis是一个著名的key-value存储系统,而作为其官方推荐的java版客户端jedis也非常强大和稳定,支持事务.管道及有jedis自身实现的分布式. 在这里对jedis关于事务.管道和分布式的调用方式做一个简单的介绍和对比: 一.普通同步方式 最简单和基础的调用方式: 1 @Test 2 public void test1Normal() { 3 Jedis jedis = new Jedis("localhost"); 4 long start = System.curre

phpredis 中文手册和redis 教程

phpredis 中文手册  :   http://www.cnblogs.com/zcy_soft/archive/2012/09/21/2697006.html 手册: http://www.cnblogs.com/weafer/archive/2011/09/21/2184059.html redis教程:  http://www.yiibai.com/redis/redis_quick_guide.html

分布式锁的几种使用方式(redis、zookeeper、数据库)

Q:一个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费synchronizedlockdb lockQ:两个业务服务器,一个数据库,操作:查询用户当前余额,扣除当前余额的3%作为手续费分布式锁我们需要怎么样的分布式锁?可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行. 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条) 这把锁最好是一把公平锁(根据业务需求考虑要不要这条) 有高可用的获取锁和释

jedisLock—redis分布式锁实现

一.使用分布式锁要满足的几个条件: 系统是一个分布式系统(关键是分布式,单机的可以使用ReentrantLock或者synchronized代码块来实现) 共享资源(各个系统访问同一个资源,资源的载体可能是传统关系型数据库或者NoSQL) 同步访问(即有很多个进程同事访问同一个共享资源.没有同步访问,谁管你资源竞争不竞争) 二.应用的场景例子 管理后台的部署架构(多台tomcat服务器+redis[多台tomcat服务器访问一台redis]+mysql[多台tomcat服务器访问一台服务器上的m

如何优雅地用Redis实现分布式锁

https://mp.weixin.qq.com/s?__biz=MzAxNjM2MTk0Ng==&mid=2247484976&idx=2&sn=a0b6771f0b4e471c710f8cd51c243971&chksm=9bf4b685ac833f936f3722a795ae202a3be37a3fb57332393e2eec3bbf8b34c4705d5b14a964&mpshare=1&scene=1&srcid=0919f7t2duWiu