@Test public void testDiscoverNodesAutomatically(){ Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1",7379)); JedisCluster jc=new JedisCluster(jedisClusterNode); assertEquals(3,jc.getClusterNodes().size()); }
@Test public void testCalculateConnectionPerSlot(){ Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1",7379)); JedisCluster jc=new JedisCluster(jedisClusterNode); jc.set("foo","bar"); jc.set("test","test"); assertEquals("bar",node3.get("foo")); assertEquals("test",node2.get("test")); }
@Test public void testRecalculateSlotsWhenMoved() throws InterruptedException { Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1",7379)); JedisCluster jc=new JedisCluster(jedisClusterNode); int slot51=JedisClusterCRC16.getSlot("51"); node2.clusterDelSlots(slot51); node3.clusterDelSlots(slot51); node3.clusterAddSlots(slot51); JedisClusterTestUtil.waitForClusterReady(node1,node2,node3); jc.set("51","foo"); assertEquals("foo",jc.get("51")); }
@Test public void testAskResponse() throws InterruptedException { Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1",7379)); JedisCluster jc=new JedisCluster(jedisClusterNode); int slot51=JedisClusterCRC16.getSlot("51"); node3.clusterSetSlotImporting(slot51,JedisClusterTestUtil.getNodeId(node2.clusterNodes())); node2.clusterSetSlotMigrating(slot51,JedisClusterTestUtil.getNodeId(node3.clusterNodes())); jc.set("51","foo"); assertEquals("foo",jc.get("51")); }
@Test(expected=JedisClusterException.class) public void testThrowExceptionWithoutKey(){ Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1",7379)); JedisCluster jc=new JedisCluster(jedisClusterNode); jc.ping(); }
@Test(expected=JedisClusterMaxRedirectionsException.class) public void testRedisClusterMaxRedirections(){ Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort("127.0.0.1",7379)); JedisCluster jc=new JedisCluster(jedisClusterNode); int slot51=JedisClusterCRC16.getSlot("51"); node2.clusterSetSlotMigrating(slot51,JedisClusterTestUtil.getNodeId(node3.clusterNodes())); jc.set("51","foo"); }
@Test public void testClusterCountKeysInSlot(){ Set<HostAndPort> jedisClusterNode=new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort(nodeInfo1.getHost(),nodeInfo1.getPort())); JedisCluster jc=new JedisCluster(jedisClusterNode); for (int index=0; index < 5; index++) { jc.set("foo{bar}" + index,"hello"); } int slot=JedisClusterCRC16.getSlot("foo{bar}"); assertEquals(5,node1.clusterCountKeysInSlot(slot).intValue()); }
private void setSuperConnectionHandler(JedisClusterConnectionHandler handler){ try { Field connectionHandlerField=JedisCluster.class.getDeclaredField("connectionHandler"); connectionHandlerField.setAccessible(true); connectionHandlerField.set(this,handler); } catch ( Exception e) { e.printStackTrace(); } }
/** * Store a job in Redis * @param jobDetail the {@link JobDetail} object to be stored * @param replaceExisting if true, any existing job with the same group and name as the given job will be overwritten * @param jedis a thread-safe Redis connection * @throws ObjectAlreadyExistsException */ @Override @SuppressWarnings("unchecked") public void storeJob(JobDetail jobDetail,boolean replaceExisting,JedisCluster jedis) throws ObjectAlreadyExistsException { final String jobHashKey=redisSchema.jobHashKey(jobDetail.getKey()); final String jobDataMapHashKey=redisSchema.jobDataMapHashKey(jobDetail.getKey()); final String jobGroupSetKey=redisSchema.jobGroupSetKey(jobDetail.getKey()); if (!replaceExisting && jedis.exists(jobHashKey)) { throw new ObjectAlreadyExistsException(jobDetail); } jedis.hmset(jobHashKey,(Map<String,String>)mapper.convertValue(jobDetail,new TypeReference<HashMap<String,String>>(){ } )); if (jobDetail.getJobDataMap() != null && !jobDetail.getJobDataMap().isEmpty()) { jedis.hmset(jobDataMapHashKey,getStringDataMap(jobDetail.getJobDataMap())); } jedis.sadd(redisSchema.jobsSet(),jobHashKey); jedis.sadd(redisSchema.jobGroupsSet(),jobGroupSetKey); jedis.sadd(jobGroupSetKey,jobHashKey); }
/** * Remove the given job from Redis * @param jobKey the job to be removed * @param jedis a thread-safe Redis connection * @return true if the job was removed; false if it did not exist */ @Override public boolean removeJob(JobKey jobKey,JedisCluster jedis) throws JobPersistenceException { final String jobHashKey=redisSchema.jobHashKey(jobKey); final String jobDataMapHashKey=redisSchema.jobDataMapHashKey(jobKey); final String jobGroupSetKey=redisSchema.jobGroupSetKey(jobKey); final String jobTriggerSetKey=redisSchema.jobTriggersSetKey(jobKey); Long delJobHashKeyResponse=jedis.del(jobHashKey); jedis.del(jobDataMapHashKey); jedis.srem(redisSchema.jobsSet(),jobHashKey); jedis.srem(jobGroupSetKey,jobHashKey); Set<String> jobTriggerSetResponse=jedis.smembers(jobTriggerSetKey); jedis.del(jobTriggerSetKey); Long jobGroupSetSizeResponse=jedis.scard(jobGroupSetKey); if (jobGroupSetSizeResponse == 0) { jedis.srem(redisSchema.jobGroupsSet(),jobGroupSetKey); } for ( String triggerHashKey : jobTriggerSetResponse) { final TriggerKey triggerKey=redisSchema.triggerKey(triggerHashKey); final String triggerGroupSetKey=redisSchema.triggerGroupSetKey(triggerKey); unsetTriggerState(triggerHashKey,jedis); jedis.srem(redisSchema.triggersSet(),triggerHashKey); jedis.srem(redisSchema.triggerGroupsSet(),triggerGroupSetKey); jedis.srem(triggerGroupSetKey,triggerHashKey); jedis.del(triggerHashKey); } return delJobHashKeyResponse == 1; }
/** * Remove (delete) the <code> {@link Trigger}</code> with the given key. * @param triggerKey the key of the trigger to be removed * @param removeNonDurableJob if true, the job associated with the given trigger will be removed if it is non-durableand has no other triggers * @param jedis a thread-safe Redis connection * @return true if the trigger was found and removed */ @Override protected boolean removeTrigger(TriggerKey triggerKey,boolean removeNonDurableJob,JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException { final String triggerHashKey=redisSchema.triggerHashKey(triggerKey); final String triggerGroupSetKey=redisSchema.triggerGroupSetKey(triggerKey); if (!jedis.exists(triggerHashKey)) { return false; } OperableTrigger trigger=retrieveTrigger(triggerKey,jedis); final String jobHashKey=redisSchema.jobHashKey(trigger.getJobKey()); final String jobTriggerSetKey=redisSchema.jobTriggersSetKey(trigger.getJobKey()); jedis.srem(redisSchema.triggersSet(),triggerHashKey); jedis.srem(triggerGroupSetKey,triggerHashKey); jedis.srem(jobTriggerSetKey,triggerHashKey); if (jedis.scard(triggerGroupSetKey) == 0) { jedis.srem(redisSchema.triggerGroupsSet(),triggerGroupSetKey); } if (removeNonDurableJob) { Long jobTriggerSetKeySizeResponse=jedis.scard(jobTriggerSetKey); Boolean jobExistsResponse=jedis.exists(jobHashKey); if (jobTriggerSetKeySizeResponse == 0 && jobExistsResponse) { JobDetail job=retrieveJob(trigger.getJobKey(),jedis); if (!job.isDurable()) { removeJob(job.getKey(),jedis); signaler.notifySchedulerListenersJobDeleted(job.getKey()); } } } if (isNullOrEmpty(trigger.getCalendarName())) { jedis.srem(redisSchema.calendarTriggersSetKey(trigger.getCalendarName()),triggerHashKey); } unsetTriggerState(triggerHashKey,jedis); jedis.del(triggerHashKey); return true; }
/** * Unsets the state of the given trigger key by removing the trigger from all trigger state sets. * @param triggerHashKey the redis key of the desired trigger hash * @param jedis a thread-safe Redis connection * @return true if the trigger was removed, false if the trigger was stateless * @throws JobPersistenceException if the unset operation failed */ @Override public boolean unsetTriggerState(String triggerHashKey,JedisCluster jedis) throws JobPersistenceException { boolean removed=false; List<Long> responses=new ArrayList<>(RedisTriggerState.values().length); for ( RedisTriggerState state : RedisTriggerState.values()) { responses.add(jedis.zrem(redisSchema.triggerStateKey(state),triggerHashKey)); } for ( Long response : responses) { removed=response == 1; if (removed) { jedis.del(redisSchema.triggerLockKey(redisSchema.triggerKey(triggerHashKey))); break; } } return removed; }
/** * Store a {@link Calendar} * @param name the name of the calendar * @param calendar the calendar object to be stored * @param replaceExisting if true, any existing calendar with the same name will be overwritten * @param updateTriggers if true, any existing triggers associated with the calendar will be updated * @param jedis a thread-safe Redis connection * @throws JobPersistenceException */ @Override public void storeCalendar(String name,Calendar calendar,boolean replaceExisting,boolean updateTriggers,JedisCluster jedis) throws JobPersistenceException { final String calendarHashKey=redisSchema.calendarHashKey(name); if (!replaceExisting && jedis.exists(calendarHashKey)) { throw new ObjectAlreadyExistsException(String.format("Calendar with key %s already exists.",calendarHashKey)); } Map<String,String> calendarMap=new HashMap<>(); calendarMap.put(CALENDAR_CLASS,calendar.getClass().getName()); try { calendarMap.put(CALENDAR_JSON,mapper.writeValueAsString(calendar)); } catch ( JsonProcessingException e) { throw new JobPersistenceException("Unable to serialize calendar.",e); } jedis.hmset(calendarHashKey,calendarMap); jedis.sadd(redisSchema.calendarsSet(),calendarHashKey); if (updateTriggers) { final String calendarTriggersSetKey=redisSchema.calendarTriggersSetKey(name); Set<String> triggerHashKeys=jedis.smembers(calendarTriggersSetKey); for ( String triggerHashKey : triggerHashKeys) { OperableTrigger trigger=retrieveTrigger(redisSchema.triggerKey(triggerHashKey),jedis); long removed=jedis.zrem(redisSchema.triggerStateKey(RedisTriggerState.WAITING),triggerHashKey); trigger.updateWithNewCalendar(calendar,misfireThreshold); if (removed == 1) { setTriggerState(RedisTriggerState.WAITING,(double)trigger.getNextFireTime().getTime(),triggerHashKey,jedis); } } } }
/** * Remove (delete) the <code> {@link Calendar}</code> with the given name. * @param calendarName the name of the calendar to be removed * @param jedis a thread-safe Redis connection * @return true if a calendar with the given name was found and removed */ @Override public boolean removeCalendar(String calendarName,JedisCluster jedis) throws JobPersistenceException { final String calendarTriggersSetKey=redisSchema.calendarTriggersSetKey(calendarName); if (jedis.scard(calendarTriggersSetKey) > 0) { throw new JobPersistenceException(String.format("There are triggers pointing to calendar %s, so it cannot be removed.",calendarName)); } final String calendarHashKey=redisSchema.calendarHashKey(calendarName); Long deleteResponse=jedis.del(calendarHashKey); jedis.srem(redisSchema.calendarsSet(),calendarHashKey); return deleteResponse == 1; }
/** * Get the keys of all of the <code> {@link Job}</code> s that have the given group name. * @param matcher the matcher with which to compare group names * @param jedis a thread-safe Redis connection * @return the set of all JobKeys which have the given group name */ @Override public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher,JedisCluster jedis){ Set<JobKey> jobKeys=new HashSet<>(); if (matcher.getCompareWithOperator() == StringMatcher.StringOperatorName.EQUALS) { final String jobGroupSetKey=redisSchema.jobGroupSetKey(new JobKey("",matcher.getCompareToValue())); final Set<String> jobs=jedis.smembers(jobGroupSetKey); if (jobs != null) { for ( final String job : jobs) { jobKeys.add(redisSchema.jobKey(job)); } } } else { List<Set<String>> jobGroups=new ArrayList<>(); for ( final String jobGroupSetKey : jedis.smembers(redisSchema.jobGroupsSet())) { if (matcher.getCompareWithOperator().evaluate(redisSchema.jobGroup(jobGroupSetKey),matcher.getCompareToValue())) { jobGroups.add(jedis.smembers(jobGroupSetKey)); } } for ( Set<String> jobGroup : jobGroups) { if (jobGroup != null) { for ( final String job : jobGroup) { jobKeys.add(redisSchema.jobKey(job)); } } } } return jobKeys; }
时间: 2024-10-06 18:45:24