[编织消息框架][分层模型设计]会话与节点

 1 public class QNode implements IRecycle {
 2     /**
 3      * session会话,记录通信层属性
 4      **/
 5     private QSession session;
 6     /**
 7      * message cb 维护消息回调
 8      **/
 9     private QCallbackManager callbackManager;
10     /**
11      * netty channel
12      **/
13     private Channel channel;
14
15     private InetSocketAddress address;
16 }

QNode

1 public class QSession implements IRecycle {
2
3     public static enum SESSION_KEY {
4     ALIAS, OP_TIME, ID,
5     }
6
7     private long id;
8     private Map<String, Object> values = new HashMap<>();
9 }

QSession

  1 /***
  2  * 响应异步消息 针对业务上的逻辑处理 {@link QNode#send(Object, IQCallback)}
  3  *
  4  * @author solq
  5  */
  6 public class QCallbackManager implements IRecycle {
  7     private final static Logger LOGGER = LoggerFactory.getLogger(QCallbackManager.class);
  8
  9     private final static ScheduledExecutorService pool = PoolUtil.createScheduledPool(QMConfig.getInstance().POOL_CLEAR_MESSAGE_CORE, "message clear");
 10
 11     private Map<Long, IQCallback<?>> messageRecoreds = Collections.synchronizedMap(new HashMap<>());
 12
 13     @SuppressWarnings({ "unchecked", "rawtypes" })
 14     private void buildTask(long sn, IQCallback cb) {
 15     messageRecoreds.put(sn, cb);
 16     Future<?> future = pool.schedule(new Runnable() {
 17         private long _sn = sn;
 18
 19         @Override
 20         public void run() {
 21         IQCallback<?> _cb = messageRecoreds.remove(_sn);
 22         if (_cb == null) {
 23             return;
 24         }
 25         try {
 26             _cb.onReceiveError(QCode.MESSAGE_ERROR_TIMEOUT);
 27         } finally {
 28             _cb.recycle();
 29         }
 30
 31         }
 32     }, QMConfig.getInstance().NETTY_MESSAGE_CALLBACK_CLEAR_INTERVAL, TimeUnit.MILLISECONDS);
 33     cb.setFuture(future);
 34     }
 35
 36     public <T> IQCallback<T> doSend(QPacket sendPacket, IQCallback<T> cb) {
 37     if (cb == null) {
 38         return null;
 39     }
 40     sendPacket.setStatus(QPacket.MASK_RESPONSE);
 41     cb.setSendPacket(sendPacket);
 42     buildTask(sendPacket.getSn(), cb);
 43     return cb;
 44     }
 45
 46     public void doSendError(QPacket sendPacket, short code) {
 47     final long key = sendPacket.getSn();
 48     IQCallback<?> cb = messageRecoreds.remove(key);
 49     if (cb == null) {
 50         if (LOGGER.isWarnEnabled()) {
 51         LOGGER.warn("发送失败 未找到回调 :" + key);
 52         }
 53         return;
 54     }
 55     try {
 56         cb.onSendError(code);
 57     } finally {
 58         cb.recycle();
 59     }
 60     }
 61
 62     public void doReceiveSucceed(QPacket rePacket) {
 63     final long key = rePacket.getSn();
 64     IQCallback<?> cb = messageRecoreds.remove(key);
 65     if (cb == null) {
 66         if (LOGGER.isWarnEnabled()) {
 67         LOGGER.warn("响应成功 未找到回调 :" + key);
 68         }
 69         return;
 70     }
 71     try {
 72         short code = rePacket.toCode();
 73         cb.setCode(code);
 74         cb.onSucceed(code);
 75     } finally {
 76         cb.recycle();
 77     }
 78     }
 79
 80     public void doReceiveError(QPacket rePacket) {
 81     final long key = rePacket.getSn();
 82     IQCallback<?> cb = messageRecoreds.remove(key);
 83     if (cb == null) {
 84         if (LOGGER.isWarnEnabled()) {
 85         LOGGER.warn("响应失败 未找到回调 :" + key);
 86         }
 87         return;
 88     }
 89     try {
 90         short code = rePacket.toCode();
 91         cb.setCode(code);
 92         cb.onReceiveError(code);
 93     } finally {
 94         cb.recycle();
 95     }
 96     }
 97
 98     public int getMessageRecoredSize() {
 99     return messageRecoreds.size();
100     }
101
102     @Override
103     public void recycle() {
104     // 释放所有消息
105     messageRecoreds.forEach((sn, cb) -> {
106         cb.recycle();
107     });
108     messageRecoreds.clear();
109     }

QCallbackManager

 1 public abstract class IQCallback<T> implements IRecycle, QResult<T> {
 2     //响应成功回调
 3     abstract public void onSucceed(short code);
 4
 5     // 默认什么也不用做
 6     public void onSendError(short code) {
 7     if (LOGGER.isWarnEnabled()) {
 8         LOGGER.warn("onSendError : {)", code);
 9     }
10     this.code = code;
11     }
12
13     // 默认什么也不用做
14     public void onReceiveError(short code) {
15     if (LOGGER.isWarnEnabled()) {
16         LOGGER.warn("onReceiveError : {)", code);
17     }
18     this.code = code;
19     }
20 }

IQCallback 有三种响应消息处理

1.onSucceed 响应返回成功

2.onSendError 发送时失败

3.onReceiveError 响应返回失败

有的业务非常复杂,如果响应失败了可以根据返回码深度处理

时间: 2024-10-10 20:31:05

[编织消息框架][分层模型设计]会话与节点的相关文章

[编织消息框架]前言

出书缘由 本项目名叫onequeue意为一流消息队列,参考对象为kafka 虽然最终结果可能达不到一流水准,但那不是主要的,主要是做的心态保持一流的态度 为什么作为kafka参考,又为什么自己重新做? 我在预研kafka发现在发送消息时网络断开会造成消息丢失,而底层没有提供失败回调给开发者使用,在某些场景来讲不允许丢消息的 进一步深入看下源码,虽然某些领域kafka开者人员很熟悉但综合水平觉得不如我,所以产生写消息框架的想法 面向读者 如果你喜欢网络传输,数据存储方向,那么本书会非常适合你,但你

[编织消息框架]数值与逻辑分离

为什么要分离? 业务需求是不停地变,如果把条件写进代码里,当用户需求变时要改代码发版本更新才能生效,这过程无疑是漫长的 就算是在开发期,不停的变开发者精力耗光在沟通,小修改上,无法专注逻辑部分 分离的根本目的是让开发者专注写引擎部分,无需关注太多业务上的边界,条件等 需要分离什么类型数值? 如活动开启时间,购买满足条件,购买上限等 这些不确定用户具体需求,全都可以弄成动态获取 分离技术实现有很多 如使用数据库mysql等 linux 常用的配置文本config 表格csv,json文件等 本项目

[编织消息框架][netty源码分析]4 eventLoop 实现类NioEventLoop职责与实现

NioEventLoop 是jdk nio多路处理实现同修复jdk nio的bug 1.NioEventLoop继承SingleThreadEventLoop 重用单线程处理 2.NioEventLoop是组成 pool EventLoopGroup 基本单元 总之好多边界判断跟业务经验之类的代码,非常烦碎 重要属性 public final class NioEventLoop extends SingleThreadEventLoop { //绑定 selector Selector sel

[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

Unsafe 是channel的内部接口, 负责跟socket底层打交道.从书写跟命名上看是不公开给开发者使用的,直到最后实现NioSocketChannelUnsafe也没有公开出去 public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHand

[编织消息框架][JAVA核心技术]动态代理应用7-实现设计

根据设计生成两个接口,IRpcSend send方法返回数据要求包装成QResult对象 public interface IRpcSend { public <T> QResult<T> send(byte command, Object... args); } public interface IRpcReceive { public <T> T receive(byte command, Object... args); } public interface IR

[编织消息框架][设计协议]bit基础

理论部分 1字节等于8比特,也就是8个二进数,如下面公式 1Byte = 8bits = 0111 1111 1Short = 2Btye 1Int = 4Byte 那学这些有什么用呢? 可以用来做数据存储,如状态,操作类型 如:拿1Byte演示 操作数据 状态数据 小结:1个Byte可以存储 -127~127 个数,也就是有256个标识可以使用 我们可以用4bits做状态,4bits做操作,1Byte容量组合成两种数据 小提示:可以用计算器来计算转换,window系统在附件里能找到 有的状态是

[编织消息框架][优化系统]突破连接上限(中)

接下来突破65000连接,因为要模拟大规模情况,测试机子有限,所以最好每台机子分配65000*2+以上 这里突破指的是单台机子client超过65000端口限制,当然最终也要server支撑得起 解决思路是添加虚拟IP,添加好后 ping 成功证明生效了,然后执行client测试,结果突破65000+ ifconfig eth0:0 ip netmask 255.255.255.0 up 或用脚本生成: for i in `seq 1 9`; do ifconfig eth0:$i 192.16

[编织消息框架][netty源码分析]14 PoolChunk 的 PoolSubpage

final class PoolSubpage<T> implements PoolSubpageMetric { //该page分配的chunk final PoolChunk<T> chunk; //内存使用记录 private final long[] bitmap; //该page是否已释放 boolean doNotDestroy; //该page在chunk中的id,通过区段计算偏移 private final int memoryMapIdx; //该page在chu

[编织消息框架][JAVA核心技术]动态代理应用2

接下来如何实现 第一步:先把服务类,调用方法转换成数字,方便传输 第二步:提取元信息,提取又有三种方式,三种各有优点,最优方式是第一种 1.编译java时处理 2.程序启动时处理,预处理 3.调用时处理,懒处理 第三步:编码解释 第四步:请求方式 第五步:分布式支持 第一步: @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface QModel { short value(); } @Targ