原文:https://www.helplib.com/Java_API_Classes/article_64580
以下是展示如何使用io.netty.util.ReferenceCountUtil的最佳示例。 我们使用了代码质量辨别算法从开源项目中提取出了最佳的优秀示例。
实例 1
复制代码
private static void testPerformOpeningHandshake0(boolean subProtocol) { EmbeddedChannel ch = new EmbeddedChannel( new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder()); FullHttpRequest req = ReferenceCountUtil.releaseLater( new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat")); req.headers().set(Names.HOST, "server.example.com"); req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase()); req.headers().set(Names.CONNECTION, "Upgrade"); req.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ=="); req.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com"); req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat"); req.headers().set(Names.SEC_WEBSOCKET_VERSION, "13"); if (subProtocol) { new WebSocketServerHandshaker13( "ws://example.com/chat", "chat", false, Integer.MAX_VALUE).handshake(ch, req); } else { new WebSocketServerHandshaker13( "ws://example.com/chat", null, false, Integer.MAX_VALUE).handshake(ch, req); } ByteBuf resBuf = (ByteBuf) ch.readOutbound(); EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder()); ch2.writeInbound(resBuf); HttpResponse res = (HttpResponse) ch2.readInbound(); Assert.assertEquals( "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(Names.SEC_WEBSOCKET_ACCEPT)); if (subProtocol) { Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL)); } else { Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL)); } ReferenceCountUtil.release(res); }
实例 2
复制代码
@Test public void testHttpUpgradeRequest() throws Exception { EmbeddedChannel ch = createChannel(new MockOutboundHandler()); ChannelHandlerContext handshakerCtx = ch.pipeline().context(WebSocketServerProtocolHandshakeHandler.class); writeUpgradeRequest(ch); assertEquals(SWITCHING_PROTOCOLS, ReferenceCountUtil.releaseLater(responses.remove()).getStatus()); assertNotNull(WebSocketServerProtocolHandler.getHandshaker(handshakerCtx)); }
实例 3
复制代码
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Class<?> messageClass = msg.getClass(); if (!handshaker.isHandshakeComplete()) { ctx.pipeline().remove(HttpObjectAggregator.class); handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg); httpChannel = new NettyHttpChannel(tcpStream, new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) { @Override protected void doSubscribeHeaders(Subscriber<? super Void> s) { Publishers.<Void>empty().subscribe(s); } }; NettyHttpWSClientHandler.super.channelActive(ctx); super.channelRead(ctx, msg); return; } if (TextWebSocketFrame.class.isAssignableFrom(messageClass)) { try { //don‘t inflate the String bytes now channelSubscriber.onNext(new StringBuffer(((TextWebSocketFrame) msg).content().nioBuffer())); } finally { ReferenceCountUtil.release(msg); } } else if (CloseWebSocketFrame.class.isAssignableFrom(messageClass)) { ctx.close(); } else { doRead(ctx, ((WebSocketFrame)msg).content()); } }
实例 4
复制代码
@SuppressWarnings("unchecked") protected final void doRead(ChannelHandlerContext ctx, Object msg) { try { if (null == channelSubscriber || msg == Unpooled.EMPTY_BUFFER) { ReferenceCountUtil.release(msg); return; } NettyBuffer buffer = NettyBuffer.create(msg); try { channelSubscriber.onNext(buffer); } finally { if (buffer.getByteBuf() != null) { if (buffer.getByteBuf() .refCnt() != 0) { ReferenceCountUtil.release(buffer.getByteBuf()); } } } } catch (Throwable err) { Exceptions.throwIfFatal(err); if (channelSubscriber != null) { channelSubscriber.onError(err); } else { throw err; } } }
实例 5
复制代码
/** * Test try to reproduce issue #1335 */ @Test public void testBindMultiple() throws Exception { DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); NioEventLoopGroup group = new NioEventLoopGroup(); try { for (int i = 0; i < 100; i++) { Bootstrap udpBootstrap = new Bootstrap(); udpBootstrap.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // Discard ReferenceCountUtil.release(msg); } }); DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap .bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); channelGroup.add(datagramChannel); } Assert.assertEquals(100, channelGroup.size()); } finally { channelGroup.close().sync(); group.shutdownGracefully().sync(); } }
实例 6
复制代码
@BeforeClass public static void init() { // Configure a test server group = new LocalEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // Discard ReferenceCountUtil.release(msg); } }); } }); localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); }
实例 7
复制代码
@BeforeClass public static void init() { // Configure a test server group = new LocalEventLoopGroup(); ServerBootstrap sb = new ServerBootstrap(); sb.group(group) .channel(LocalServerChannel.class) .childHandler(new ChannelInitializer<LocalChannel>() { @Override public void initChannel(LocalChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // Discard ReferenceCountUtil.release(msg); } }); } }); localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress(); }
实例 8
复制代码
@Override public ChannelGroupFuture write(Object message, ChannelMatcher matcher) { if (message == null) { throw new NullPointerException("message"); } if (matcher == null) { throw new NullPointerException("matcher"); } Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size()); for (Channel c: nonServerChannels) { if (matcher.matches(c)) { futures.put(c, c.write(safeDuplicate(message))); } } ReferenceCountUtil.release(message); return new DefaultChannelGroupFuture(this, futures, executor); }
实例 9
复制代码
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (isRemote(ctx)) { ByteBuf payload = (ByteBuf) msg; byte[] data = getPayloadFromByteBuf(payload); writeBuffer(data); return; } ReferenceCountUtil.retain(msg); // propagate the data to rest of handlers in pipeline ctx.fireChannelRead(msg); }
实例 10
复制代码
@Override public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { // The first message must be authentication response if (this.authenticationUrl != null && (this.cookies == null || this.cookies.isEmpty())) { HttpResponse response = (HttpResponse) msg; CharSequence cookieData = response.headers().get(new AsciiString("set-cookie")); if (cookieData != null) { this.cookies = ServerCookieDecoder.decode(cookieData.toString()); if (this.cookies == null || this.cookies.isEmpty()) { throw new WebSocketAuthenticationFailureException("Could not authenticate"); } if (log.isDebugEnabled()) { for (Cookie cookie : this.cookies) { log.debug("Server says must set cookie with name {} and value {}", cookie.name(), cookie.value()); } } } else { throw new ITException("Could not authenticate"); } if (log.isDebugEnabled()) { log.debug("Authentication succeeded for user {}", this.user); } handShaker.handshake(ctx.channel()); return; } // The second one must be the response for web socket handshake if (!handShaker.isHandshakeComplete()) { handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg); if (log.isDebugEnabled()) { log.debug("Web socket client connected for user {}", this.user); } handshakeFuture.setSuccess(); return; } // Take the byte buff and send it up to Stomp decoder if (msg instanceof WebSocketFrame) { if (log.isDebugEnabled()) { if (msg instanceof TextWebSocketFrame) { log.debug("Received text frame {}", ((TextWebSocketFrame) msg).text()); } } ReferenceCountUtil.retain(msg); ctx.fireChannelRead(((WebSocketFrame) msg).content()); } }
实例 11
复制代码
@Override protected void encode(ChannelHandlerContext ctx, DefaultHttpMessage defaultHttpMessage, List out) throws Exception { if (defaultHttpMessage.headers().contains(HttpHeaders.CONTENT_LENGTH, "", true)) { defaultHttpMessage.headers().remove(HttpHeaders.CONTENT_LENGTH); } ReferenceCountUtil.retain(defaultHttpMessage); out.add(defaultHttpMessage); }
实例 12
复制代码
private static Object safeDuplicate(Object message) { if (message instanceof ByteBuf) { return ((ByteBuf) message).duplicate().retain(); } else if (message instanceof ByteBufHolder) { return ((ByteBufHolder) message).duplicate().retain(); } else { return ReferenceCountUtil.retain(message); } }
实例 13
复制代码
@Override public void onNext(T t) { // Retain so that post-buffer, the ByteBuf does not get released. // Release will be done after reading from the subject. ReferenceCountUtil.retain(t); state.bufferedObserver.onNext(t); // Schedule timeout once and when not subscribed yet. if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) { timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives. @Override public void call(Long aLong) { disposeIfNotSubscribed(); } }); } }
实例 14
复制代码
@Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { Channel channel = ctx.channel(); if (msg instanceof HttpRequest) { HttpRequest request = (HttpRequest) msg; if (handleRequest(request, channel, ctx)) { if (httpMethodInfoBuilder.getHttpResourceModel() .isStreamingReqSupported() && channel.pipeline().get("aggregator") != null) { channel.pipeline().remove("aggregator"); } else if (!httpMethodInfoBuilder.getHttpResourceModel() .isStreamingReqSupported() && channel.pipeline().get("aggregator") == null) { channel.pipeline().addAfter("router", "aggregator", new HttpObjectAggregator(Integer.MAX_VALUE)); } } ReferenceCountUtil.retain(msg); ctx.fireChannelRead(msg); } else if (msg instanceof HttpContent) { ReferenceCountUtil.retain(msg); ctx.fireChannelRead(msg); } }
实例 15
复制代码
@Override public void onData(final ByteBuf input) { // We need to retain until the serializer gets around to processing it. ReferenceCountUtil.retain(input); serializer.execute(new Runnable() { @Override public void run() { if (isTraceBytes()) { TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input)); } ByteBuffer source = input.nioBuffer(); do { ByteBuffer buffer = protonTransport.getInputBuffer(); int limit = Math.min(buffer.remaining(), source.remaining()); ByteBuffer duplicate = source.duplicate(); duplicate.limit(source.position() + limit); buffer.put(duplicate); protonTransport.processInput(); source.position(source.position() + limit); } while (source.hasRemaining()); ReferenceCountUtil.release(input); // Process the state changes from the latest data and then answer back // any pending updates to the Broker. processUpdates(); pumpToProtonTransport(); } }); }
原文地址:https://www.cnblogs.com/shihaiming/p/9561838.html
时间: 2024-11-09 03:50:47