Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁

Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁

    最近项目中使用redis,学习了一下,client端使用jedis-2.1.0

首先是一个redis实现的跨jvm的lock,

接着是一个简单封装的工具类,也对pipeline处理进行了几个常用的封装

然后是对应Spring的相关配置

Java代码  

  1. public class RedisLock {
  2. /** 加锁标志 */
  3. public static final String LOCKED = "TRUE";
  4. /** 毫秒与毫微秒的换算单位 1毫秒 = 1000000毫微秒 */
  5. public static final long MILLI_NANO_CONVERSION = 1000 * 1000L;
  6. /** 默认超时时间(毫秒) */
  7. public static final long DEFAULT_TIME_OUT = 1000;
  8. public static final Random RANDOM = new Random();
  9. /** 锁的超时时间(秒),过期删除 */
  10. public static final int EXPIRE = 3 * 60;
  11. private ShardedJedisPool shardedJedisPool;
  12. private ShardedJedis jedis;
  13. private String key;
  14. // 锁状态标志
  15. private boolean locked = false;
  16. /**
  17. * This creates a RedisLock
  18. * @param key key
  19. * @param shardedJedisPool 数据源
  20. */
  21. public RedisLock(String key, ShardedJedisPool shardedJedisPool) {
  22. this.key = key + "_lock";
  23. this.shardedJedisPool = shardedJedisPool;
  24. this.jedis = this.shardedJedisPool.getResource();
  25. }
  26. /**
  27. * 加锁
  28. * 应该以:
  29. * lock();
  30. * try {
  31. *      doSomething();
  32. * } finally {
  33. *      unlock();
  34. * }
  35. * 的方式调用
  36. * @param timeout 超时时间
  37. * @return 成功或失败标志
  38. */
  39. public boolean lock(long timeout) {
  40. long nano = System.nanoTime();
  41. timeout *= MILLI_NANO_CONVERSION;
  42. try {
  43. while ((System.nanoTime() - nano) < timeout) {
  44. if (this.jedis.setnx(this.key, LOCKED) == 1) {
  45. this.jedis.expire(this.key, EXPIRE);
  46. this.locked = true;
  47. return this.locked;
  48. }
  49. // 短暂休眠,避免出现活锁
  50. Thread.sleep(3, RANDOM.nextInt(500));
  51. }
  52. } catch (Exception e) {
  53. throw new RuntimeException("Locking error", e);
  54. }
  55. return false;
  56. }
  57. /**
  58. * 加锁
  59. * 应该以:
  60. * lock();
  61. * try {
  62. *      doSomething();
  63. * } finally {
  64. *      unlock();
  65. * }
  66. * 的方式调用
  67. * @param timeout 超时时间
  68. * @param expire 锁的超时时间(秒),过期删除
  69. * @return 成功或失败标志
  70. */
  71. public boolean lock(long timeout, int expire) {
  72. long nano = System.nanoTime();
  73. timeout *= MILLI_NANO_CONVERSION;
  74. try {
  75. while ((System.nanoTime() - nano) < timeout) {
  76. if (this.jedis.setnx(this.key, LOCKED) == 1) {
  77. this.jedis.expire(this.key, expire);
  78. this.locked = true;
  79. return this.locked;
  80. }
  81. // 短暂休眠,避免出现活锁
  82. Thread.sleep(3, RANDOM.nextInt(500));
  83. }
  84. } catch (Exception e) {
  85. throw new RuntimeException("Locking error", e);
  86. }
  87. return false;
  88. }
  89. /**
  90. * 加锁
  91. * 应该以:
  92. * lock();
  93. * try {
  94. *      doSomething();
  95. * } finally {
  96. *      unlock();
  97. * }
  98. * 的方式调用
  99. * @return 成功或失败标志
  100. */
  101. public boolean lock() {
  102. return lock(DEFAULT_TIME_OUT);
  103. }
  104. /**
  105. * 解锁
  106. * 无论是否加锁成功,都需要调用unlock
  107. * 应该以:
  108. * lock();
  109. * try {
  110. *      doSomething();
  111. * } finally {
  112. *      unlock();
  113. * }
  114. * 的方式调用
  115. */
  116. public void unlock() {
  117. try {
  118. if (this.locked) {
  119. this.jedis.del(this.key);
  120. }
  121. } finally {
  122. this.shardedJedisPool.returnResource(this.jedis);
  123. }
  124. }
  125. }

Java代码  

  1. /**
  2. * 内存数据库Redis的辅助类,负责对内存数据库的所有操作
  3. * @version V1.0
  4. * @author fengjc
  5. */
  6. public class RedisUtil {
  7. // 数据源
  8. private ShardedJedisPool shardedJedisPool;
  9. /**
  10. * 执行器,{@link com.futurefleet.framework.base.redis.RedisUtil}的辅助类,
  11. * 它保证在执行操作之后释放数据源returnResource(jedis)
  12. * @version V1.0
  13. * @author fengjc
  14. * @param <T>
  15. */
  16. abstract class Executor<T> {
  17. ShardedJedis jedis;
  18. ShardedJedisPool shardedJedisPool;
  19. public Executor(ShardedJedisPool shardedJedisPool) {
  20. this.shardedJedisPool = shardedJedisPool;
  21. jedis = this.shardedJedisPool.getResource();
  22. }
  23. /**
  24. * 回调
  25. * @return 执行结果
  26. */
  27. abstract T execute();
  28. /**
  29. * 调用{@link #execute()}并返回执行结果
  30. * 它保证在执行{@link #execute()}之后释放数据源returnResource(jedis)
  31. * @return 执行结果
  32. */
  33. public T getResult() {
  34. T result = null;
  35. try {
  36. result = execute();
  37. } catch (Throwable e) {
  38. throw new RuntimeException("Redis execute exception", e);
  39. } finally {
  40. if (jedis != null) {
  41. shardedJedisPool.returnResource(jedis);
  42. }
  43. }
  44. return result;
  45. }
  46. }
  47. /**
  48. * 删除模糊匹配的key
  49. * @param likeKey 模糊匹配的key
  50. * @return 删除成功的条数
  51. */
  52. public long delKeysLike(final String likeKey) {
  53. return new Executor<Long>(shardedJedisPool) {
  54. @Override
  55. Long execute() {
  56. Collection<Jedis> jedisC = jedis.getAllShards();
  57. Iterator<Jedis> iter = jedisC.iterator();
  58. long count = 0;
  59. while (iter.hasNext()) {
  60. Jedis _jedis = iter.next();
  61. Set<String> keys = _jedis.keys(likeKey + "*");
  62. count += _jedis.del(keys.toArray(new String[keys.size()]));
  63. }
  64. return count;
  65. }
  66. }.getResult();
  67. }
  68. /**
  69. * 删除
  70. * @param key 匹配的key
  71. * @return 删除成功的条数
  72. */
  73. public Long delKey(final String key) {
  74. return new Executor<Long>(shardedJedisPool) {
  75. @Override
  76. Long execute() {
  77. return jedis.del(key);
  78. }
  79. }.getResult();
  80. }
  81. /**
  82. * 删除
  83. * @param keys 匹配的key的集合
  84. * @return 删除成功的条数
  85. */
  86. public Long delKeys(final String[] keys) {
  87. return new Executor<Long>(shardedJedisPool) {
  88. @Override
  89. Long execute() {
  90. Collection<Jedis> jedisC = jedis.getAllShards();
  91. Iterator<Jedis> iter = jedisC.iterator();
  92. long count = 0;
  93. while (iter.hasNext()) {
  94. Jedis _jedis = iter.next();
  95. count += _jedis.del(keys);
  96. }
  97. return count;
  98. }
  99. }.getResult();
  100. }
  101. /**
  102. * 为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。
  103. * 在 Redis 中,带有生存时间的 key 被称为『可挥发』(volatile)的。
  104. * @param key key
  105. * @param expire 生命周期,单位为秒
  106. * @return 1: 设置成功 0: 已经超时或key不存在
  107. */
  108. public Long expire(final String key, final int expire) {
  109. return new Executor<Long>(shardedJedisPool) {
  110. @Override
  111. Long execute() {
  112. return jedis.expire(key, expire);
  113. }
  114. }.getResult();
  115. }
  116. /**
  117. * 一个跨jvm的id生成器,利用了redis原子性操作的特点
  118. * @param key id的key
  119. * @return 返回生成的Id
  120. */
  121. public long makeId(final String key) {
  122. return new Executor<Long>(shardedJedisPool) {
  123. @Override
  124. Long execute() {
  125. long id = jedis.incr(key);
  126. if ((id + 75807) >= Long.MAX_VALUE) {
  127. // 避免溢出,重置,getSet命令之前允许incr插队,75807就是预留的插队空间
  128. jedis.getSet(key, "0");
  129. }
  130. return id;
  131. }
  132. }.getResult();
  133. }
  134. /* ======================================Strings====================================== */
  135. /**
  136. * 将字符串值 value 关联到 key 。
  137. * 如果 key 已经持有其他值, setString 就覆写旧值,无视类型。
  138. * 对于某个原本带有生存时间(TTL)的键来说, 当 setString 成功在这个键上执行时, 这个键原有的 TTL 将被清除。
  139. * 时间复杂度:O(1)
  140. * @param key key
  141. * @param value string value
  142. * @return 在设置操作成功完成时,才返回 OK 。
  143. */
  144. public String setString(final String key, final String value) {
  145. return new Executor<String>(shardedJedisPool) {
  146. @Override
  147. String execute() {
  148. return jedis.set(key, value);
  149. }
  150. }.getResult();
  151. }
  152. /**
  153. * 将值 value 关联到 key ,并将 key 的生存时间设为 expire (以秒为单位)。
  154. * 如果 key 已经存在, 将覆写旧值。
  155. * 类似于以下两个命令:
  156. * SET key value
  157. * EXPIRE key expire # 设置生存时间
  158. * 不同之处是这个方法是一个原子性(atomic)操作,关联值和设置生存时间两个动作会在同一时间内完成,在 Redis 用作缓存时,非常实用。
  159. * 时间复杂度:O(1)
  160. * @param key key
  161. * @param value string value
  162. * @param expire 生命周期
  163. * @return 设置成功时返回 OK 。当 expire 参数不合法时,返回一个错误。
  164. */
  165. public String setString(final String key, final String value, final int expire) {
  166. return new Executor<String>(shardedJedisPool) {
  167. @Override
  168. String execute() {
  169. return jedis.setex(key, expire, value);
  170. }
  171. }.getResult();
  172. }
  173. /**
  174. * 将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 setStringIfNotExists 不做任何动作。
  175. * 时间复杂度:O(1)
  176. * @param key key
  177. * @param value string value
  178. * @return 设置成功,返回 1 。设置失败,返回 0 。
  179. */
  180. public Long setStringIfNotExists(final String key, final String value) {
  181. return new Executor<Long>(shardedJedisPool) {
  182. @Override
  183. Long execute() {
  184. return jedis.setnx(key, value);
  185. }
  186. }.getResult();
  187. }
  188. /**
  189. * 返回 key 所关联的字符串值。如果 key 不存在那么返回特殊值 nil 。
  190. * 假如 key 储存的值不是字符串类型,返回一个错误,因为 getString 只能用于处理字符串值。
  191. * 时间复杂度: O(1)
  192. * @param key key
  193. * @return 当 key 不存在时,返回 nil ,否则,返回 key 的值。如果 key 不是字符串类型,那么返回一个错误。
  194. */
  195. public String getString(final String key) {
  196. return new Executor<String>(shardedJedisPool) {
  197. @Override
  198. String execute() {
  199. return jedis.get(key);
  200. }
  201. }.getResult();
  202. }
  203. /**
  204. * 批量的 {@link #setString(String, String)}
  205. * @param pairs 键值对数组{数组第一个元素为key,第二个元素为value}
  206. * @return 操作状态的集合
  207. */
  208. public List<Object> batchSetString(final List<Pair<String, String>> pairs) {
  209. return new Executor<List<Object>>(shardedJedisPool) {
  210. @Override
  211. List<Object> execute() {
  212. ShardedJedisPipeline pipeline = jedis.pipelined();
  213. for (Pair<String, String> pair : pairs) {
  214. pipeline.set(pair.getKey(), pair.getValue());
  215. }
  216. return pipeline.syncAndReturnAll();
  217. }
  218. }.getResult();
  219. }
  220. /**
  221. * 批量的 {@link #getString(String)}
  222. * @param keys key数组
  223. * @return value的集合
  224. */
  225. public List<String> batchGetString(final String[] keys) {
  226. return new Executor<List<String>>(shardedJedisPool) {
  227. @Override
  228. List<String> execute() {
  229. ShardedJedisPipeline pipeline = jedis.pipelined();
  230. List<String> result = new ArrayList<String>(keys.length);
  231. List<Response<String>> responses = new ArrayList<Response<String>>(keys.length);
  232. for (String key : keys) {
  233. responses.add(pipeline.get(key));
  234. }
  235. pipeline.sync();
  236. for (Response<String> resp : responses) {
  237. result.add(resp.get());
  238. }
  239. return result;
  240. }
  241. }.getResult();
  242. }
  243. /* ======================================Hashes====================================== */
  244. /**
  245. * 将哈希表 key 中的域 field 的值设为 value 。
  246. * 如果 key 不存在,一个新的哈希表被创建并进行 hashSet 操作。
  247. * 如果域 field 已经存在于哈希表中,旧值将被覆盖。
  248. * 时间复杂度: O(1)
  249. * @param key key
  250. * @param field 域
  251. * @param value string value
  252. * @return 如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1 。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0 。
  253. */
  254. public Long hashSet(final String key, final String field, final String value) {
  255. return new Executor<Long>(shardedJedisPool) {
  256. @Override
  257. Long execute() {
  258. return jedis.hset(key, field, value);
  259. }
  260. }.getResult();
  261. }
  262. /**
  263. * 将哈希表 key 中的域 field 的值设为 value 。
  264. * 如果 key 不存在,一个新的哈希表被创建并进行 hashSet 操作。
  265. * 如果域 field 已经存在于哈希表中,旧值将被覆盖。
  266. * @param key key
  267. * @param field 域
  268. * @param value string value
  269. * @param expire 生命周期,单位为秒
  270. * @return 如果 field 是哈希表中的一个新建域,并且值设置成功,返回 1 。如果哈希表中域 field 已经存在且旧值已被新值覆盖,返回 0 。
  271. */
  272. public Long hashSet(final String key, final String field, final String value, final int expire) {
  273. return new Executor<Long>(shardedJedisPool) {
  274. @Override
  275. Long execute() {
  276. Pipeline pipeline = jedis.getShard(key).pipelined();
  277. Response<Long> result = pipeline.hset(key, field, value);
  278. pipeline.expire(key, expire);
  279. pipeline.sync();
  280. return result.get();
  281. }
  282. }.getResult();
  283. }
  284. /**
  285. * 返回哈希表 key 中给定域 field 的值。
  286. * 时间复杂度:O(1)
  287. * @param key key
  288. * @param field 域
  289. * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil 。
  290. */
  291. public String hashGet(final String key, final String field) {
  292. return new Executor<String>(shardedJedisPool) {
  293. @Override
  294. String execute() {
  295. return jedis.hget(key, field);
  296. }
  297. }.getResult();
  298. }
  299. /**
  300. * 返回哈希表 key 中给定域 field 的值。 如果哈希表 key 存在,同时设置这个 key 的生存时间
  301. * @param key key
  302. * @param field 域
  303. * @param expire 生命周期,单位为秒
  304. * @return 给定域的值。当给定域不存在或是给定 key 不存在时,返回 nil 。
  305. */
  306. public String hashGet(final String key, final String field, final int expire) {
  307. return new Executor<String>(shardedJedisPool) {
  308. @Override
  309. String execute() {
  310. Pipeline pipeline = jedis.getShard(key).pipelined();
  311. Response<String> result = pipeline.hget(key, field);
  312. pipeline.expire(key, expire);
  313. pipeline.sync();
  314. return result.get();
  315. }
  316. }.getResult();
  317. }
  318. /**
  319. * 同时将多个 field-value (域-值)对设置到哈希表 key 中。
  320. * 时间复杂度: O(N) (N为fields的数量)
  321. * @param key key
  322. * @param hash field-value的map
  323. * @return 如果命令执行成功,返回 OK 。当 key 不是哈希表(hash)类型时,返回一个错误。
  324. */
  325. public String hashMultipleSet(final String key, final Map<String, String> hash) {
  326. return new Executor<String>(shardedJedisPool) {
  327. @Override
  328. String execute() {
  329. return jedis.hmset(key, hash);
  330. }
  331. }.getResult();
  332. }
  333. /**
  334. * 同时将多个 field-value (域-值)对设置到哈希表 key 中。同时设置这个 key 的生存时间
  335. * @param key key
  336. * @param hash field-value的map
  337. * @param expire 生命周期,单位为秒
  338. * @return 如果命令执行成功,返回 OK 。当 key 不是哈希表(hash)类型时,返回一个错误。
  339. */
  340. public String hashMultipleSet(final String key, final Map<String, String> hash, final int expire) {
  341. return new Executor<String>(shardedJedisPool) {
  342. @Override
  343. String execute() {
  344. Pipeline pipeline = jedis.getShard(key).pipelined();
  345. Response<String> result = pipeline.hmset(key, hash);
  346. pipeline.expire(key, expire);
  347. pipeline.sync();
  348. return result.get();
  349. }
  350. }.getResult();
  351. }
  352. /**
  353. * 返回哈希表 key 中,一个或多个给定域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。
  354. * 时间复杂度: O(N) (N为fields的数量)
  355. * @param key key
  356. * @param fields field的数组
  357. * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样。
  358. */
  359. public List<String> hashMultipleGet(final String key, final String... fields) {
  360. return new Executor<List<String>>(shardedJedisPool) {
  361. @Override
  362. List<String> execute() {
  363. return jedis.hmget(key, fields);
  364. }
  365. }.getResult();
  366. }
  367. /**
  368. * 返回哈希表 key 中,一个或多个给定域的值。如果给定的域不存在于哈希表,那么返回一个 nil 值。
  369. * 同时设置这个 key 的生存时间
  370. * @param key key
  371. * @param fields field的数组
  372. * @param expire 生命周期,单位为秒
  373. * @return 一个包含多个给定域的关联值的表,表值的排列顺序和给定域参数的请求顺序一样。
  374. */
  375. public List<String> hashMultipleGet(final String key, final int expire, final String... fields) {
  376. return new Executor<List<String>>(shardedJedisPool) {
  377. @Override
  378. List<String> execute() {
  379. Pipeline pipeline = jedis.getShard(key).pipelined();
  380. Response<List<String>> result = pipeline.hmget(key, fields);
  381. pipeline.expire(key, expire);
  382. pipeline.sync();
  383. return result.get();
  384. }
  385. }.getResult();
  386. }
  387. /**
  388. * 批量的{@link #hashMultipleSet(String, Map)},在管道中执行
  389. * @param pairs 多个hash的多个field
  390. * @return 操作状态的集合
  391. */
  392. public List<Object> batchHashMultipleSet(final List<Pair<String, Map<String, String>>> pairs) {
  393. return new Executor<List<Object>>(shardedJedisPool) {
  394. @Override
  395. List<Object> execute() {
  396. ShardedJedisPipeline pipeline = jedis.pipelined();
  397. for (Pair<String, Map<String, String>> pair : pairs) {
  398. pipeline.hmset(pair.getKey(), pair.getValue());
  399. }
  400. return pipeline.syncAndReturnAll();
  401. }
  402. }.getResult();
  403. }
  404. /**
  405. * 批量的{@link #hashMultipleSet(String, Map)},在管道中执行
  406. * @param data Map<String, Map<String, String>>格式的数据
  407. * @return 操作状态的集合
  408. */
  409. public List<Object> batchHashMultipleSet(final Map<String, Map<String, String>> data) {
  410. return new Executor<List<Object>>(shardedJedisPool) {
  411. @Override
  412. List<Object> execute() {
  413. ShardedJedisPipeline pipeline = jedis.pipelined();
  414. for (Map.Entry<String, Map<String, String>> iter : data.entrySet()) {
  415. pipeline.hmset(iter.getKey(), iter.getValue());
  416. }
  417. return pipeline.syncAndReturnAll();
  418. }
  419. }.getResult();
  420. }
  421. /**
  422. * 批量的{@link #hashMultipleGet(String, String...)},在管道中执行
  423. * @param pairs 多个hash的多个field
  424. * @return 执行结果的集合
  425. */
  426. public List<List<String>> batchHashMultipleGet(final List<Pair<String, String[]>> pairs) {
  427. return new Executor<List<List<String>>>(shardedJedisPool) {
  428. @Override
  429. List<List<String>> execute() {
  430. ShardedJedisPipeline pipeline = jedis.pipelined();
  431. List<List<String>> result = new ArrayList<List<String>>(pairs.size());
  432. List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(pairs.size());
  433. for (Pair<String, String[]> pair : pairs) {
  434. responses.add(pipeline.hmget(pair.getKey(), pair.getValue()));
  435. }
  436. pipeline.sync();
  437. for (Response<List<String>> resp : responses) {
  438. result.add(resp.get());
  439. }
  440. return result;
  441. }
  442. }.getResult();
  443. }
  444. /**
  445. * 返回哈希表 key 中,所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。
  446. * 时间复杂度: O(N)
  447. * @param key key
  448. * @return 以列表形式返回哈希表的域和域的值。若 key 不存在,返回空列表。
  449. */
  450. public Map<String, String> hashGetAll(final String key) {
  451. return new Executor<Map<String, String>>(shardedJedisPool) {
  452. @Override
  453. Map<String, String> execute() {
  454. return jedis.hgetAll(key);
  455. }
  456. }.getResult();
  457. }
  458. /**
  459. * 返回哈希表 key 中,所有的域和值。在返回值里,紧跟每个域名(field name)之后是域的值(value),所以返回值的长度是哈希表大小的两倍。
  460. * 同时设置这个 key 的生存时间
  461. * @param key key
  462. * @param expire 生命周期,单位为秒
  463. * @return 以列表形式返回哈希表的域和域的值。若 key 不存在,返回空列表。
  464. */
  465. public Map<String, String> hashGetAll(final String key, final int expire) {
  466. return new Executor<Map<String, String>>(shardedJedisPool) {
  467. @Override
  468. Map<String, String> execute() {
  469. Pipeline pipeline = jedis.getShard(key).pipelined();
  470. Response<Map<String, String>> result = pipeline.hgetAll(key);
  471. pipeline.expire(key, expire);
  472. pipeline.sync();
  473. return result.get();
  474. }
  475. }.getResult();
  476. }
  477. /**
  478. * 批量的{@link #hashGetAll(String)}
  479. * @param keys key的数组
  480. * @return 执行结果的集合
  481. */
  482. public List<Map<String, String>> batchHashGetAll(final String... keys) {
  483. return new Executor<List<Map<String, String>>>(shardedJedisPool) {
  484. @Override
  485. List<Map<String, String>> execute() {
  486. ShardedJedisPipeline pipeline = jedis.pipelined();
  487. List<Map<String, String>> result = new ArrayList<Map<String, String>>(keys.length);
  488. List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length);
  489. for (String key : keys) {
  490. responses.add(pipeline.hgetAll(key));
  491. }
  492. pipeline.sync();
  493. for (Response<Map<String, String>> resp : responses) {
  494. result.add(resp.get());
  495. }
  496. return result;
  497. }
  498. }.getResult();
  499. }
  500. /**
  501. * 批量的{@link #hashMultipleGet(String, String...)},与{@link #batchHashGetAll(String...)}不同的是,返回值为Map类型
  502. * @param keys key的数组
  503. * @return 多个hash的所有filed和value
  504. */
  505. public Map<String, Map<String, String>> batchHashGetAllForMap(final String... keys) {
  506. return new Executor<Map<String, Map<String, String>>>(shardedJedisPool) {
  507. @Override
  508. Map<String, Map<String, String>> execute() {
  509. ShardedJedisPipeline pipeline = jedis.pipelined();
  510. // 设置map容量防止rehash
  511. int capacity = 1;
  512. while ((int) (capacity * 0.75) <= keys.length) {
  513. capacity <<= 1;
  514. }
  515. Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(capacity);
  516. List<Response<Map<String, String>>> responses = new ArrayList<Response<Map<String, String>>>(keys.length);
  517. for (String key : keys) {
  518. responses.add(pipeline.hgetAll(key));
  519. }
  520. pipeline.sync();
  521. for (int i = 0; i < keys.length; ++i) {
  522. result.put(keys[i], responses.get(i).get());
  523. }
  524. return result;
  525. }
  526. }.getResult();
  527. }
  528. /* ======================================List====================================== */
  529. /**
  530. * 将一个或多个值 value 插入到列表 key 的表尾(最右边)。
  531. * @param key key
  532. * @param values value的数组
  533. * @return 执行 listPushTail 操作后,表的长度
  534. */
  535. public Long listPushTail(final String key, final String... values) {
  536. return new Executor<Long>(shardedJedisPool) {
  537. @Override
  538. Long execute() {
  539. return jedis.rpush(key, values);
  540. }
  541. }.getResult();
  542. }
  543. /**
  544. * 将一个或多个值 value 插入到列表 key 的表头
  545. * @param key key
  546. * @param value string value
  547. * @return 执行 listPushHead 命令后,列表的长度。
  548. */
  549. public Long listPushHead(final String key, final String value) {
  550. return new Executor<Long>(shardedJedisPool) {
  551. @Override
  552. Long execute() {
  553. return jedis.lpush(key, value);
  554. }
  555. }.getResult();
  556. }
  557. /**
  558. * 将一个或多个值 value 插入到列表 key 的表头, 当列表大于指定长度是就对列表进行修剪(trim)
  559. * @param key key
  560. * @param value string value
  561. * @param size 链表超过这个长度就修剪元素
  562. * @return 执行 listPushHeadAndTrim 命令后,列表的长度。
  563. */
  564. public Long listPushHeadAndTrim(final String key, final String value, final long size) {
  565. return new Executor<Long>(shardedJedisPool) {
  566. @Override
  567. Long execute() {
  568. Pipeline pipeline = jedis.getShard(key).pipelined();
  569. Response<Long> result = pipeline.lpush(key, value);
  570. // 修剪列表元素, 如果 size - 1 比 end 下标还要大,Redis将 size 的值设置为 end 。
  571. pipeline.ltrim(key, 0, size - 1);
  572. pipeline.sync();
  573. return result.get();
  574. }
  575. }.getResult();
  576. }
  577. /**
  578. * 批量的{@link #listPushTail(String, String...)},以锁的方式实现
  579. * @param key key
  580. * @param values value的数组
  581. * @param delOld 如果key存在,是否删除它。true 删除;false: 不删除,只是在行尾追加
  582. */
  583. public void batchListPushTail(final String key, final String[] values, final boolean delOld) {
  584. new Executor<Object>(shardedJedisPool) {
  585. @Override
  586. Object execute() {
  587. if (delOld) {
  588. RedisLock lock = new RedisLock(key, shardedJedisPool);
  589. lock.lock();
  590. try {
  591. Pipeline pipeline = jedis.getShard(key).pipelined();
  592. pipeline.del(key);
  593. for (String value : values) {
  594. pipeline.rpush(key, value);
  595. }
  596. pipeline.sync();
  597. } finally {
  598. lock.unlock();
  599. }
  600. } else {
  601. jedis.rpush(key, values);
  602. }
  603. return null;
  604. }
  605. }.getResult();
  606. }
  607. /**
  608. * 同{@link #batchListPushTail(String, String[], boolean)},不同的是利用redis的事务特性来实现
  609. * @param key key
  610. * @param values value的数组
  611. * @return null
  612. */
  613. public Object updateListInTransaction(final String key, final List<String> values) {
  614. return new Executor<Object>(shardedJedisPool) {
  615. @Override
  616. Object execute() {
  617. Transaction transaction = jedis.getShard(key).multi();
  618. transaction.del(key);
  619. for (String value : values) {
  620. transaction.rpush(key, value);
  621. }
  622. transaction.exec();
  623. return null;
  624. }
  625. }.getResult();
  626. }
  627. /**
  628. * 在key对应list的尾部部添加字符串元素,如果key存在,什么也不做
  629. * @param key key
  630. * @param values value的数组
  631. * @return 执行insertListIfNotExists后,表的长度
  632. */
  633. public Long insertListIfNotExists(final String key, final String[] values) {
  634. return new Executor<Long>(shardedJedisPool) {
  635. @Override
  636. Long execute() {
  637. RedisLock lock = new RedisLock(key, shardedJedisPool);
  638. lock.lock();
  639. try {
  640. if (!jedis.exists(key)) {
  641. return jedis.rpush(key, values);
  642. }
  643. } finally {
  644. lock.unlock();
  645. }
  646. return 0L;
  647. }
  648. }.getResult();
  649. }
  650. /**
  651. * 返回list所有元素,下标从0开始,负值表示从后面计算,-1表示倒数第一个元素,key不存在返回空列表
  652. * @param key key
  653. * @return list所有元素
  654. */
  655. public List<String> listGetAll(final String key) {
  656. return new Executor<List<String>>(shardedJedisPool) {
  657. @Override
  658. List<String> execute() {
  659. return jedis.lrange(key, 0, -1);
  660. }
  661. }.getResult();
  662. }
  663. /**
  664. * 返回指定区间内的元素,下标从0开始,负值表示从后面计算,-1表示倒数第一个元素,key不存在返回空列表
  665. * @param key key
  666. * @param beginIndex 下标开始索引(包含)
  667. * @param endIndex 下标结束索引(不包含)
  668. * @return 指定区间内的元素
  669. */
  670. public List<String> listRange(final String key, final long beginIndex, final long endIndex) {
  671. return new Executor<List<String>>(shardedJedisPool) {
  672. @Override
  673. List<String> execute() {
  674. return jedis.lrange(key, beginIndex, endIndex - 1);
  675. }
  676. }.getResult();
  677. }
  678. /**
  679. * 一次获得多个链表的数据
  680. * @param keys key的数组
  681. * @return 执行结果
  682. */
  683. public Map<String, List<String>> batchGetAllList(final List<String> keys) {
  684. return new Executor<Map<String, List<String>>>(shardedJedisPool) {
  685. @Override
  686. Map<String, List<String>> execute() {
  687. ShardedJedisPipeline pipeline = jedis.pipelined();
  688. Map<String, List<String>> result = new HashMap<String, List<String>>();
  689. List<Response<List<String>>> responses = new ArrayList<Response<List<String>>>(keys.size());
  690. for (String key : keys) {
  691. responses.add(pipeline.lrange(key, 0, -1));
  692. }
  693. pipeline.sync();
  694. for (int i = 0; i < keys.size(); ++i) {
  695. result.put(keys.get(i), responses.get(i).get());
  696. }
  697. return result;
  698. }
  699. }.getResult();
  700. }
  701. /* ======================================Pub/Sub====================================== */
  702. /**
  703. * 将信息 message 发送到指定的频道 channel。
  704. * 时间复杂度:O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量。
  705. * @param channel 频道
  706. * @param message 信息
  707. * @return 接收到信息 message 的订阅者数量。
  708. */
  709. public Long publish(final String channel, final String message) {
  710. return new Executor<Long>(shardedJedisPool) {
  711. @Override
  712. Long execute() {
  713. Jedis _jedis = jedis.getShard(channel);
  714. return _jedis.publish(channel, message);
  715. }
  716. }.getResult();
  717. }
  718. /**
  719. * 订阅给定的一个频道的信息。
  720. * @param jedisPubSub 监听器
  721. * @param channel 频道
  722. */
  723. public void subscribe(final JedisPubSub jedisPubSub, final String channel) {
  724. new Executor<Object>(shardedJedisPool) {
  725. @Override
  726. Object execute() {
  727. Jedis _jedis = jedis.getShard(channel);
  728. // 注意subscribe是一个阻塞操作,因为当前线程要轮询Redis的响应然后调用subscribe
  729. _jedis.subscribe(jedisPubSub, channel);
  730. return null;
  731. }
  732. }.getResult();
  733. }
  734. /**
  735. * 取消订阅
  736. * @param jedisPubSub 监听器
  737. */
  738. public void unSubscribe(final JedisPubSub jedisPubSub) {
  739. jedisPubSub.unsubscribe();
  740. }
  741. /* ======================================Sorted set================================= */
  742. /**
  743. * 将一个 member 元素及其 score 值加入到有序集 key 当中。
  744. * @param key key
  745. * @param score score 值可以是整数值或双精度浮点数。
  746. * @param member 有序集的成员
  747. * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。
  748. */
  749. public Long addWithSortedSet(final String key, final double score, final String member) {
  750. return new Executor<Long>(shardedJedisPool) {
  751. @Override
  752. Long execute() {
  753. return jedis.zadd(key, score, member);
  754. }
  755. }.getResult();
  756. }
  757. /**
  758. * 将多个 member 元素及其 score 值加入到有序集 key 当中。
  759. * @param key key
  760. * @param scoreMembers score、member的pair
  761. * @return 被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员。
  762. */
  763. public Long addWithSortedSet(final String key, final Map<Double, String> scoreMembers) {
  764. return new Executor<Long>(shardedJedisPool) {
  765. @Override
  766. Long execute() {
  767. return jedis.zadd(key, scoreMembers);
  768. }
  769. }.getResult();
  770. }
  771. /**
  772. * 返回有序集 key 中, score 值介于 max 和 min 之间(默认包括等于 max 或 min )的所有的成员。
  773. * 有序集成员按 score 值递减(从大到小)的次序排列。
  774. * @param key key
  775. * @param max score最大值
  776. * @param min score最小值
  777. * @return 指定区间内,带有 score 值(可选)的有序集成员的列表
  778. */
  779. public Set<String> revrangeByScoreWithSortedSet(final String key, final double max, final double min) {
  780. return new Executor<Set<String>>(shardedJedisPool) {
  781. @Override
  782. Set<String> execute() {
  783. return jedis.zrevrangeByScore(key, max, min);
  784. }
  785. }.getResult();
  786. }
  787. /* ======================================Other====================================== */
  788. /**
  789. * 设置数据源
  790. * @param shardedJedisPool 数据源
  791. */
  792. public void setShardedJedisPool(ShardedJedisPool shardedJedisPool) {
  793. this.shardedJedisPool = shardedJedisPool;
  794. }
  795. /**
  796. * 构造Pair键值对
  797. * @param key key
  798. * @param value value
  799. * @return 键值对
  800. */
  801. public <K, V> Pair<K, V> makePair(K key, V value) {
  802. return new Pair<K, V>(key, value);
  803. }
  804. /**
  805. * 键值对
  806. * @version V1.0
  807. * @author fengjc
  808. * @param <K> key
  809. * @param <V> value
  810. */
  811. public class Pair<K, V> {
  812. private K key;
  813. private V value;
  814. public Pair(K key, V value) {
  815. this.key = key;
  816. this.value = value;
  817. }
  818. public K getKey() {
  819. return key;
  820. }
  821. public void setKey(K key) {
  822. this.key = key;
  823. }
  824. public V getValue() {
  825. return value;
  826. }
  827. public void setValue(V value) {
  828. this.value = value;
  829. }
  830. }
  831. }

Spring配置文件:

Java代码  

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  7. http://www.springframework.org/schema/context
  8. http://www.springframework.org/schema/context/spring-context-3.0.xsd">
  9. <!-- POOL配置 -->
  10. <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
  11. <property name="maxActive" value="${redis.jedisPoolConfig.maxActive}" />
  12. <property name="maxIdle" value="${redis.jedisPoolConfig.maxIdle}" />
  13. <property name="maxWait" value="${redis.jedisPoolConfig.maxWait}" />
  14. <property name="testOnBorrow" value="${redis.jedisPoolConfig.testOnBorrow}" />
  15. </bean>
  16. <!-- jedis shard信息配置 -->
  17. <bean id="jedis.shardInfoCache1" class="redis.clients.jedis.JedisShardInfo">
  18. <constructor-arg index="0" value="${redis.jedis.shardInfoCache1.host}" />
  19. <constructor-arg index="1"  type="int" value="${redis.jedis.shardInfoCache1.port}" />
  20. </bean>
  21. <bean id="jedis.shardInfoCache2" class="redis.clients.jedis.JedisShardInfo">
  22. <constructor-arg index="0" value="${redis.jedis.shardInfoCache2.host}" />
  23. <constructor-arg index="1"  type="int" value="${redis.jedis.shardInfoCache2.port}" />
  24. </bean>
  25. <!-- jedis shard pool配置 -->
  26. <bean id="shardedJedisPoolCache" class="redis.clients.jedis.ShardedJedisPool">
  27. <constructor-arg index="0" ref="jedisPoolConfig" />
  28. <constructor-arg index="1">
  29. <list>
  30. <ref bean="jedis.shardInfoCache1" />
  31. <ref bean="jedis.shardInfoCache2" />
  32. </list>
  33. </constructor-arg>
  34. </bean>
  35. <bean id="redisCache" class="com.**.RedisUtil">
  36. <property name="shardedJedisPool" ref="shardedJedisPoolCache" />
  37. </bean>
  38. </beans>
时间: 2024-12-26 02:04:25

Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁的相关文章

Java Redis 连接池 Jedis 工具类

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import java.io.IOException; import java.io.InputStream; import java.util.Prop

Redis java客户端 jedis 源码分析系列二:单实例 jedis

在使用Jedis的过程中最简单的用法就是单实例单连接的jedis,如下代码所示: public void testJedis(){ Jedis jedis = new Jedis("127.0.0.1"); jedis.set("key", "value"); jedis.get("key"); jedis.close(); } 让我们深入到内部去看一看其结构,如下图所示: 此处请先忽略 JedisPool 类和 Pool&l

redis java客户端Jedis 实现 连接池 + 简单的负载均衡

1.下载 redis_win_2.6.13.zip 安装包 下载地址:大家去百度吧 2.redis_win_2.6.13.zip 安装包解压缩后,进入redis-server.exe所在目录 在此目录中,新建一个配置文件:redis01.conf[此处文件名字,并不固定],文件内容如下: #是否以后台进程运行 daemonize yes   #指定后台进程的pid文件写入位置 pidfile /var/run/redis.pid   #监听端口,默认为6379 port 6379   #只接受以

redis的java客户端Jedis简单封装

经过我们团队的一番讨论,最终决定使用redis来进行我们的业务缓存.redis会将数据缓存到内存中,运行效率会很快.同时异步将数据写入到磁盘中,进行持久化. 且redis支持主从同步,支持分布式部署,支持N多数据结构,这对于我们有着莫大的吸引力. 参见:http://blog.csdn.net/yichenlian/article/details/27207383 我们团队讨论的焦点是在于redis的灾备恢复问题.由于redis的持久化是异步的,总会有一点时间内存中数据和磁盘数据不同步的情况(当

Java客户端Jedis

使用Jedis的Java客户端 maven依赖 <!-- jedis --> <dependency> <groupid>redis.clients</groupid> jedis</artifactid> <version>2.9.0</version> </dependency> <!-- fastjson --> <dependency> <groupid>com.al

UrlUtils工具类,Java URL工具类,Java URL链接工具类

UrlUtils工具类,Java URL工具类,Java URL链接工具类 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ?Copyright 蕃薯耀 2017年7月15日 http://www.cnblogs.com/fanshuyao/ Java代码   import java.util.Ha

Java 通过Xml导出Excel文件,Java Excel 导出工具类,Java导出Excel工具类

Java 通过Xml导出Excel文件,Java Excel 导出工具类,Java导出Excel工具类 ============================== ?Copyright 蕃薯耀 2017年9月13日 http://www.cnblogs.com/fanshuyao/ 直接上代码: import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.ref

[精品] 收集的27个java开发常用工具类.基本满足开发需求

原文:[精品] 收集的27个java开发常用工具类.基本满足开发需求 源代码下载地址:http://www.zuidaima.com/share/1596028005993472.htm 最近从网上收集的java开发常用的工具类,分享给大家.基本满足开发需求.推荐给热爱最代码以及java的牛牛们.   每个类都有注释的,欢迎大家可以下载使用. 字符编码:CharTools, base64:Base64 *.java Md5加密:  MD5*.java 上传:*Uploader* 生成缩略图类:T

java MD5数据加密工具类

package com.wetuo.util; import java.security.MessageDigest; /**  * 数据加密工具类  * @author wzp  *  */ public class DataUtil { public static String md5(String str) { StringBuffer buffer = new StringBuffer(); char[] chars = { '0', '1', '2', '3', '4', '5', '