事务
Jedis中事务的写法是将redis操作写在事物代码块中,如下所示,multi与exec之间为具体的事务。
jedis.watch (key1, key2, ...); Transaction t = jedis.multi(); t.set("foo", "bar"); t.exec();
另外,在事务内部,是不能通过Jedis对象去获取值的,不过可以通过Transaction对象去获取,如下写法:
package cn.edu.hdu.jedisdemo; import java.util.Set; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.Response; import redis.clients.jedis.Transaction; public class App { public static void main(String[] args) { JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost"); // / Jedis implements Closable. Hence, the jedis instance will be // auto-closed after the last statement. try (Jedis jedis = pool.getResource()) { // / ... do stuff here ... for example Transaction t = jedis.multi(); t.set("fool", "bar"); //jedis.get("fool");//报错,Cannot use Jedis when in Multi. Please use Transation or reset jedis state. Response<String> result1 = t.get("fool"); t.zadd("foo", 1, "barowitch"); t.zadd("foo", 0, "barinsky"); t.zadd("foo", 0, "barikoviev"); Response<Set<String>> sose = t.zrange("foo", 0, -1); t.exec(); // dont forget it String foolbar = result1.get(); // use Response.get() to retrieve things from a Response int soseSize = sose.get().size(); // on sose.get() you can directly call Set methods! // List<Object> allResults = t.exec(); // you could still get all // results at once, as before System.out.println(foolbar); System.out.println(soseSize); } // / ... when closing your application: pool.close(); } }
注意:Response对象的get方法要在事务exec方法执行之后调用,在t.exec()方法执行之前,Response对象是不包含结果的
管道
有时候,我们需要发送多个命令,一个高效的方法是使用管道;
通过管道可以一次性发送多条命令并在执行完后一次性将结果返回,当一组命令中每条命令都不依赖之前命令的执行结果时就可以将这组命令一起通过管道发出;
管道可以减少客户端与redis的通信次数,提供性能。
如下示例:
Pipeline p = jedis.pipelined(); p.set("fool", "bar"); p.zadd("foo", 1, "barowitch"); p.zadd("foo", 0, "barinsky"); p.zadd("foo", 0, "barikoviev"); Response<String> pipeString = p.get("fool"); Response<Set<String>> sose = p.zrange("foo", 0, -1); p.sync(); int soseSize = sose.get().size(); Set<String> setBack = sose.get();
发布/订阅
订阅消息
先创建一个JedisPubSub对象,然后使用Jedis对象调用subscribe方法即可,该方法需要传入JedisPubSub对象;
MyListener listener = new MyListener(); //listener为JedisPubSub对象,监听类 jedis.subscribe(listener, "channel01");
发布消息
发布消息调用Jedis的publish方法即可;
jedis.publish(channel, message)
注意:subscribe方法是一个阻塞的操作,且发布和订阅不能使用同一个Jedis对象
一个完整的例子
监听事件类:
package cn.edu.hdu.jedisdemo; import redis.clients.jedis.JedisPubSub; class MyListener extends JedisPubSub { public void onMessage(String channel, String message) { System.out.println("get a msg: " + "channel=" + channel + ", message=" + message); } public void onSubscribe(String channel, int subscribedChannels) { System.out.println("channel:" + channel + ", subscribedChannels:" + subscribedChannels); } public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("channel:" + channel + ", subscribedChannels:" + subscribedChannels); } public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println("pattern:" + pattern + ", subscribedChannels:" + subscribedChannels); } public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println("pattern:" + pattern + ", subscribedChannels:" + subscribedChannels); } public void onPMessage(String pattern, String channel, String message) { System.out.println("pattern:" + pattern + ", channel:" + channel + ", message:" + message); } }
main方法:
package cn.edu.hdu.jedisdemo; import java.util.Date; import java.util.concurrent.TimeUnit; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class App { public static void main(String[] args) { final JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost"); //两个订阅者 new Thread(new Runnable() { @Override public void run() { Jedis jedis = pool.getResource(); MyListener listener = new MyListener(); jedis.subscribe(listener, "channel01"); } }).start(); new Thread(new Runnable() { @Override public void run() { Jedis jedis = pool.getResource(); MyListener listener = new MyListener(); //listener为JedisPubSub对象,监听类 jedis.subscribe(listener, "channel01"); } }).start(); //一个发布者 new Thread(new Runnable() { @Override public void run() { //注意,订阅使用的jedis对象与发布使用的jedis对象不能相同,否者运行时会报错 Jedis jedis = pool.getResource(); while(true){ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } String msg = new Date().toLocaleString(); System.out.println("publish a msg:" + msg); jedis.publish("channel01", msg); } } }).start(); //pool.close(); } }
执行结果:
channel:channel01, subscribedChannels:1
channel:channel01, subscribedChannels:1
publish a msg:2016-7-20 10:03:42
get a msg: channel=channel01, message=2016-7-20 10:03:42
get a msg: channel=channel01, message=2016-7-20 10:03:42
publish a msg:2016-7-20 10:03:43
get a msg: channel=channel01, message=2016-7-20 10:03:43
get a msg: channel=channel01, message=2016-7-20 10:03:43
publish a msg:2016-7-20 10:03:44
get a msg: channel=channel01, message=2016-7-20 10:03:44
get a msg: channel=channel01, message=2016-7-20 10:03:44
分布式ShardedJedis
ShardedJedis里面采用了一致性哈希的算法,来决定每个key的保存位置,主要用于分摊服务器的压力。
举个简单的例子:
首先,开启三个redis服务器,分别使用不同的端口(拷贝三份,使用不同的配置文件即可):6379、6380、6381
然后,编写测试代码:
package cn.edu.hdu.jedisdemo; import java.util.Arrays; import java.util.List; import redis.clients.jedis.Client; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisShardInfo; import redis.clients.jedis.ShardedJedis; import redis.clients.jedis.ShardedJedisPool; public class App { public static void main(String[] args) { // 设置连接池的相关配置 JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(2); poolConfig.setMaxIdle(1); poolConfig.setMaxWaitMillis(2000); poolConfig.setTestOnBorrow(false); poolConfig.setTestOnReturn(false); //设置Redis信息 String host = "127.0.0.1"; JedisShardInfo shardInfo1 = new JedisShardInfo(host, 6379, 500); JedisShardInfo shardInfo2 = new JedisShardInfo(host, 6380, 500); JedisShardInfo shardInfo3 = new JedisShardInfo(host, 6381, 500); //初始化ShardedJedisPool List<JedisShardInfo> infoList = Arrays.asList(shardInfo1, shardInfo2, shardInfo3); ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList); //一些基本操作 try(ShardedJedis jedis = jedisPool.getResource()) { jedis.set("test", "test"); String test = jedis.get("test"); System.out.println(test); } //查看 ShardedJedis jedis = jedisPool.getResource(); jedis.set("1111", "1111"); jedis.set("2222", "2222"); jedis.set("3333", "3333"); jedis.set("4444", "4444"); Client client1 = jedis.getShard("1111").getClient(); Client client2 = jedis.getShard("2222").getClient(); Client client3 = jedis.getShard("3333").getClient(); Client client4 = jedis.getShard("4444").getClient(); //打印key在哪个server中 System.out.println("1111 in server:" + client1.getHost() + " and port is:" + client1.getPort()); System.out.println("2222 in server:" + client2.getHost() + " and port is:" + client2.getPort()); System.out.println("3333 in server:" + client3.getHost() + " and port is:" + client3.getPort()); System.out.println("4444 in server:" + client4.getHost() + " and port is:" + client4.getPort()); jedis.close(); jedisPool.close(); } }
查看输出结果:
test
1111 in server:127.0.0.1 and port is:6380
2222 in server:127.0.0.1 and port is:6379
3333 in server:127.0.0.1 and port is:6381
4444 in server:127.0.0.1 and port is:6381
参考内容:
https://github.com/xetorthio/jedis/wiki/AdvancedUsage
http://www.cnblogs.com/coder2012/p/4401153.html
http://blog.csdn.net/lihao21/article/details/48370687
http://www.cnblogs.com/vhua/p/redis_1.html