1 public class QNode implements IRecycle { 2 /** 3 * session会话,记录通信层属性 4 **/ 5 private QSession session; 6 /** 7 * message cb 维护消息回调 8 **/ 9 private QCallbackManager callbackManager; 10 /** 11 * netty channel 12 **/ 13 private Channel channel; 14 15 private InetSocketAddress address; 16 }
QNode
1 public class QSession implements IRecycle { 2 3 public static enum SESSION_KEY { 4 ALIAS, OP_TIME, ID, 5 } 6 7 private long id; 8 private Map<String, Object> values = new HashMap<>(); 9 }
QSession
1 /*** 2 * 响应异步消息 针对业务上的逻辑处理 {@link QNode#send(Object, IQCallback)} 3 * 4 * @author solq 5 */ 6 public class QCallbackManager implements IRecycle { 7 private final static Logger LOGGER = LoggerFactory.getLogger(QCallbackManager.class); 8 9 private final static ScheduledExecutorService pool = PoolUtil.createScheduledPool(QMConfig.getInstance().POOL_CLEAR_MESSAGE_CORE, "message clear"); 10 11 private Map<Long, IQCallback<?>> messageRecoreds = Collections.synchronizedMap(new HashMap<>()); 12 13 @SuppressWarnings({ "unchecked", "rawtypes" }) 14 private void buildTask(long sn, IQCallback cb) { 15 messageRecoreds.put(sn, cb); 16 Future<?> future = pool.schedule(new Runnable() { 17 private long _sn = sn; 18 19 @Override 20 public void run() { 21 IQCallback<?> _cb = messageRecoreds.remove(_sn); 22 if (_cb == null) { 23 return; 24 } 25 try { 26 _cb.onReceiveError(QCode.MESSAGE_ERROR_TIMEOUT); 27 } finally { 28 _cb.recycle(); 29 } 30 31 } 32 }, QMConfig.getInstance().NETTY_MESSAGE_CALLBACK_CLEAR_INTERVAL, TimeUnit.MILLISECONDS); 33 cb.setFuture(future); 34 } 35 36 public <T> IQCallback<T> doSend(QPacket sendPacket, IQCallback<T> cb) { 37 if (cb == null) { 38 return null; 39 } 40 sendPacket.setStatus(QPacket.MASK_RESPONSE); 41 cb.setSendPacket(sendPacket); 42 buildTask(sendPacket.getSn(), cb); 43 return cb; 44 } 45 46 public void doSendError(QPacket sendPacket, short code) { 47 final long key = sendPacket.getSn(); 48 IQCallback<?> cb = messageRecoreds.remove(key); 49 if (cb == null) { 50 if (LOGGER.isWarnEnabled()) { 51 LOGGER.warn("发送失败 未找到回调 :" + key); 52 } 53 return; 54 } 55 try { 56 cb.onSendError(code); 57 } finally { 58 cb.recycle(); 59 } 60 } 61 62 public void doReceiveSucceed(QPacket rePacket) { 63 final long key = rePacket.getSn(); 64 IQCallback<?> cb = messageRecoreds.remove(key); 65 if (cb == null) { 66 if (LOGGER.isWarnEnabled()) { 67 LOGGER.warn("响应成功 未找到回调 :" + key); 68 } 69 return; 70 } 71 try { 72 short code = rePacket.toCode(); 73 cb.setCode(code); 74 
cb.onSucceed(code); 75 } finally { 76 cb.recycle(); 77 } 78 } 79 80 public void doReceiveError(QPacket rePacket) { 81 final long key = rePacket.getSn(); 82 IQCallback<?> cb = messageRecoreds.remove(key); 83 if (cb == null) { 84 if (LOGGER.isWarnEnabled()) { 85 LOGGER.warn("响应失败 未找到回调 :" + key); 86 } 87 return; 88 } 89 try { 90 short code = rePacket.toCode(); 91 cb.setCode(code); 92 cb.onReceiveError(code); 93 } finally { 94 cb.recycle(); 95 } 96 } 97 98 public int getMessageRecoredSize() { 99 return messageRecoreds.size(); 100 } 101 102 @Override 103 public void recycle() { 104 // 释放所有消息 105 messageRecoreds.forEach((sn, cb) -> { 106 cb.recycle(); 107 }); 108 messageRecoreds.clear(); 109 }
QCallbackManager
1 public abstract class IQCallback<T> implements IRecycle, QResult<T> { 2 //响应成功回调 3 abstract public void onSucceed(short code); 4 5 // 默认什么也不用做 6 public void onSendError(short code) { 7 if (LOGGER.isWarnEnabled()) { 8 LOGGER.warn("onSendError : {)", code); 9 } 10 this.code = code; 11 } 12 13 // 默认什么也不用做 14 public void onReceiveError(short code) { 15 if (LOGGER.isWarnEnabled()) { 16 LOGGER.warn("onReceiveError : {)", code); 17 } 18 this.code = code; 19 } 20 }
IQCallback 有三种响应消息处理
1.onSucceed 响应返回成功
2.onSendError 发送时失败
3.onReceiveError 响应返回失败(等待响应超时也会触发,此时返回码为 QCode.MESSAGE_ERROR_TIMEOUT)
有的业务非常复杂,如果响应失败了可以根据返回码深度处理
时间: 2024-10-10 20:31:05