Java io.netty.util.ReferenceCountUtil Code Examples

Original: https://www.helplib.com/Java_API_Classes/article_64580

The examples below show how io.netty.util.ReferenceCountUtil is used in practice. They were selected from open-source projects by a code-quality ranking algorithm.
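
Before the examples, it may help to recall the counting rule that they all rely on: a reference-counted object starts with a count of 1, retain() adds one, release() subtracts one, and the object is freed when the count reaches 0. ReferenceCountUtil.release(Object) and ReferenceCountUtil.retain(Object) apply this only when the argument is reference counted and otherwise do nothing. The sketch below models that rule in plain Java so it runs without Netty on the classpath. The names RefCountSketch and Counted are hypothetical stand-ins, not Netty classes, and the real implementation differs in detail.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountSketch {

    /** Hypothetical stand-in for a Netty ReferenceCounted object (not the real interface). */
    static final class Counted {
        // A freshly created object holds exactly one reference.
        private final AtomicInteger refCnt = new AtomicInteger(1);

        int refCnt() {
            return refCnt.get();
        }

        /** Adds one reference; fails if the object was already freed. */
        Counted retain() {
            int old;
            do {
                old = refCnt.get();
                if (old == 0) {
                    throw new IllegalStateException("already released");
                }
            } while (!refCnt.compareAndSet(old, old + 1));
            return this;
        }

        /** Drops one reference; returns true only when this call brought the count to 0. */
        boolean release() {
            int old;
            do {
                old = refCnt.get();
                if (old == 0) {
                    throw new IllegalStateException("already released");
                }
            } while (!refCnt.compareAndSet(old, old - 1));
            return old == 1;
        }
    }

    /** Models the type-tolerant style of ReferenceCountUtil.release(Object). */
    static boolean release(Object msg) {
        if (msg instanceof Counted) {
            return ((Counted) msg).release();
        }
        return false; // not reference counted: nothing to do
    }

    /** Models the type-tolerant style of ReferenceCountUtil.retain(Object). */
    static <T> T retain(T msg) {
        if (msg instanceof Counted) {
            ((Counted) msg).retain();
        }
        return msg;
    }

    public static void main(String[] args) {
        Counted buf = new Counted();
        retain(buf);
        System.out.println(buf.refCnt());            // prints 2
        System.out.println(release(buf));            // prints false (count is now 1)
        System.out.println(release(buf));            // prints true (count reached 0)
        System.out.println(release("plain string")); // prints false (not reference counted)
    }
}
```

Every retain() needs a matching release() somewhere later, which is why several examples below call retain before handing a message to another handler or thread.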

Example 1

private static void testPerformOpeningHandshake0(boolean subProtocol) {
    EmbeddedChannel ch = new EmbeddedChannel(
            new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
    FullHttpRequest req = ReferenceCountUtil.releaseLater(
            new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"));
    req.headers().set(Names.HOST, "server.example.com");
    req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
    req.headers().set(Names.CONNECTION, "Upgrade");
    req.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
    req.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
    req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
    req.headers().set(Names.SEC_WEBSOCKET_VERSION, "13");
    if (subProtocol) {
        new WebSocketServerHandshaker13(
                "ws://example.com/chat", "chat", false, Integer.MAX_VALUE).handshake(ch, req);
    } else {
        new WebSocketServerHandshaker13(
                "ws://example.com/chat", null, false, Integer.MAX_VALUE).handshake(ch, req);
    }
    ByteBuf resBuf = (ByteBuf) ch.readOutbound();
    EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
    ch2.writeInbound(resBuf);
    HttpResponse res = (HttpResponse) ch2.readInbound();
    Assert.assertEquals(
            "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(Names.SEC_WEBSOCKET_ACCEPT));
    if (subProtocol) {
        Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
    } else {
        Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
    }
    ReferenceCountUtil.release(res);
}

Example 2

@Test
public void testHttpUpgradeRequest() throws Exception {
    EmbeddedChannel ch = createChannel(new MockOutboundHandler());
    ChannelHandlerContext handshakerCtx = ch.pipeline().context(WebSocketServerProtocolHandshakeHandler.class);
    writeUpgradeRequest(ch);
    assertEquals(SWITCHING_PROTOCOLS, ReferenceCountUtil.releaseLater(responses.remove()).getStatus());
    assertNotNull(WebSocketServerProtocolHandler.getHandshaker(handshakerCtx));
}

Example 3

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	Class<?> messageClass = msg.getClass();
	if (!handshaker.isHandshakeComplete()) {
		ctx.pipeline().remove(HttpObjectAggregator.class);
		handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
		httpChannel = new NettyHttpChannel(tcpStream, new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) {
			@Override
			protected void doSubscribeHeaders(Subscriber<? super Void> s) {
				Publishers.<Void>empty().subscribe(s);
			}
		};
		NettyHttpWSClientHandler.super.channelActive(ctx);
		super.channelRead(ctx, msg);
		return;
	}
	if (TextWebSocketFrame.class.isAssignableFrom(messageClass)) {
		try {
			// don't inflate the String bytes now
			channelSubscriber.onNext(new StringBuffer(((TextWebSocketFrame) msg).content().nioBuffer()));
		} finally {
			ReferenceCountUtil.release(msg);
		}
	} else if (CloseWebSocketFrame.class.isAssignableFrom(messageClass)) {
		ctx.close();
	} else {
		doRead(ctx, ((WebSocketFrame)msg).content());
	}
}

Example 4

@SuppressWarnings("unchecked")
protected final void doRead(ChannelHandlerContext ctx, Object msg) {
	try {
		if (null == channelSubscriber || msg == Unpooled.EMPTY_BUFFER) {
			ReferenceCountUtil.release(msg);
			return;
		}
		NettyBuffer buffer = NettyBuffer.create(msg);
		try {
			channelSubscriber.onNext(buffer);
		}
		finally {
			if (buffer.getByteBuf() != null) {
				if (buffer.getByteBuf()
				          .refCnt() != 0) {
					ReferenceCountUtil.release(buffer.getByteBuf());
				}
			}
		}
	}
	catch (Throwable err) {
		Exceptions.throwIfFatal(err);
		if (channelSubscriber != null) {
			channelSubscriber.onError(err);
		}
		else {
			throw err;
		}
	}
}

Example 5

/**
 * Test try to reproduce issue #1335
 */
@Test
public void testBindMultiple() throws Exception {
    DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
        for (int i = 0; i < 100; i++) {
            Bootstrap udpBootstrap = new Bootstrap();
            udpBootstrap.group(group).channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            // Discard
                            ReferenceCountUtil.release(msg);
                        }
                    });
            DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
                    .bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
            channelGroup.add(datagramChannel);
        }
        Assert.assertEquals(100, channelGroup.size());
    } finally {
        channelGroup.close().sync();
        group.shutdownGracefully().sync();
    }
}

Example 6

@BeforeClass
public static void init() {
    // Configure a test server
    group = new LocalEventLoopGroup();
    ServerBootstrap sb = new ServerBootstrap();
    sb.group(group)
      .channel(LocalServerChannel.class)
      .childHandler(new ChannelInitializer<LocalChannel>() {
          @Override
          public void initChannel(LocalChannel ch) throws Exception {
              ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                  @Override
                  public void channelRead(ChannelHandlerContext ctx, Object msg) {
                      // Discard
                      ReferenceCountUtil.release(msg);
                  }
              });
          }
      });
    localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}

Example 7

@BeforeClass
public static void init() {
    // Configure a test server
    group = new LocalEventLoopGroup();
    ServerBootstrap sb = new ServerBootstrap();
    sb.group(group)
            .channel(LocalServerChannel.class)
            .childHandler(new ChannelInitializer<LocalChannel>() {
                @Override
                public void initChannel(LocalChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            // Discard
                            ReferenceCountUtil.release(msg);
                        }
                    });
                }
            });
    localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}

Example 8

@Override
public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
    if (message == null) {
        throw new NullPointerException("message");
    }
    if (matcher == null) {
        throw new NullPointerException("matcher");
    }
    Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
    for (Channel c: nonServerChannels) {
        if (matcher.matches(c)) {
            futures.put(c, c.write(safeDuplicate(message)));
        }
    }
    ReferenceCountUtil.release(message);
    return new DefaultChannelGroupFuture(this, futures, executor);
}

Example 9

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (isRemote(ctx)) {
        ByteBuf payload = (ByteBuf) msg;
        byte[] data = getPayloadFromByteBuf(payload);
        writeBuffer(data);
        return;
    }
    ReferenceCountUtil.retain(msg);
    // propagate the data to rest of handlers in pipeline
    ctx.fireChannelRead(msg);
}

Example 10

@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    // The first message must be authentication response
    if (this.authenticationUrl != null && (this.cookies == null || this.cookies.isEmpty())) {
        HttpResponse response = (HttpResponse) msg;
        CharSequence cookieData = response.headers().get(new AsciiString("set-cookie"));
        if (cookieData != null) {
            this.cookies = ServerCookieDecoder.decode(cookieData.toString());
            if (this.cookies == null || this.cookies.isEmpty()) {
                throw new WebSocketAuthenticationFailureException("Could not authenticate");
            }
            if (log.isDebugEnabled()) {
                for (Cookie cookie : this.cookies) {
                    log.debug("Server says must set cookie with name {} and value {}", cookie.name(), cookie.value());
                }
            }
        } else {
            throw new ITException("Could not authenticate");
        }
        if (log.isDebugEnabled()) {
            log.debug("Authentication succeeded for user {}", this.user);
        }
        handShaker.handshake(ctx.channel());
        return;
    }
    // The second one must be the response for web socket handshake
    if (!handShaker.isHandshakeComplete()) {
        handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
        if (log.isDebugEnabled()) {
            log.debug("Web socket client connected for user {}", this.user);
        }
        handshakeFuture.setSuccess();
        return;
    }
    // Take the byte buff and send it up to Stomp decoder
    if (msg instanceof WebSocketFrame) {
        if (log.isDebugEnabled()) {
            if (msg instanceof TextWebSocketFrame) {
                log.debug("Received text frame {}", ((TextWebSocketFrame) msg).text());
            }
        }
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(((WebSocketFrame) msg).content());
    }
}

Example 11

@Override
protected void encode(ChannelHandlerContext ctx, DefaultHttpMessage defaultHttpMessage, List<Object> out) throws Exception {
    if (defaultHttpMessage.headers().contains(HttpHeaders.CONTENT_LENGTH, "", true)) {
        defaultHttpMessage.headers().remove(HttpHeaders.CONTENT_LENGTH);
    }
    ReferenceCountUtil.retain(defaultHttpMessage);
    out.add(defaultHttpMessage);
}

Example 12

private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        return ((ByteBuf) message).duplicate().retain();
    } else if (message instanceof ByteBufHolder) {
        return ((ByteBufHolder) message).duplicate().retain();
    } else {
        return ReferenceCountUtil.retain(message);
    }
}

Example 13

@Override
public void onNext(T t) {
    // Retain so that post-buffer, the ByteBuf does not get released.
    // Release will be done after reading from the subject.
    ReferenceCountUtil.retain(t);
    state.bufferedObserver.onNext(t);
    // Schedule timeout once and when not subscribed yet.
    if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
        timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
            @Override
            public void call(Long aLong) {
                disposeIfNotSubscribed();
            }
        });
    }
}

Example 14

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    Channel channel = ctx.channel();
    if (msg instanceof HttpRequest) {
        HttpRequest request = (HttpRequest) msg;
        if (handleRequest(request, channel, ctx)) {
            if (httpMethodInfoBuilder.getHttpResourceModel()
                    .isStreamingReqSupported() &&
                    channel.pipeline().get("aggregator") != null) {
                channel.pipeline().remove("aggregator");
            } else if (!httpMethodInfoBuilder.getHttpResourceModel()
                    .isStreamingReqSupported() &&
                    channel.pipeline().get("aggregator") == null) {
                channel.pipeline().addAfter("router", "aggregator",
                        new HttpObjectAggregator(Integer.MAX_VALUE));
            }
        }
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(msg);
    } else if (msg instanceof HttpContent) {
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(msg);
    }
}
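
The method name channelRead0 suggests that this handler extends SimpleChannelInboundHandler, which by default releases the message once channelRead0 returns. That would explain the retain call before each fireChannelRead: without the extra reference, the next handler would receive a message whose count has already dropped to 0. The plain-Java sketch below models that interaction under this assumption. AutoReleaseSketch, Msg, and the downstream list are hypothetical names for illustration and are not part of Netty.

```java
import java.util.ArrayList;
import java.util.List;

public class AutoReleaseSketch {

    /** Hypothetical reference-counted message; stands in for a Netty HttpObject. */
    static final class Msg {
        int refCnt = 1; // a new message holds one reference

        void retain() {
            refCnt++;
        }

        void release() {
            refCnt--;
        }
    }

    /** Hypothetical next stage of the pipeline; it keeps whatever it is handed. */
    static final List<Msg> downstream = new ArrayList<>();

    /** Models a handler that releases the message automatically after the callback body. */
    static void channelRead(Msg msg, boolean retainBeforeForwarding) {
        try {
            if (retainBeforeForwarding) {
                msg.retain();    // extra reference owned by the next stage
            }
            downstream.add(msg); // stands in for ctx.fireChannelRead(msg)
        } finally {
            msg.release();       // automatic release once the callback returns
        }
    }

    public static void main(String[] args) {
        Msg kept = new Msg();
        channelRead(kept, true);
        System.out.println(kept.refCnt);   // prints 1: still usable downstream

        Msg dropped = new Msg();
        channelRead(dropped, false);
        System.out.println(dropped.refCnt); // prints 0: downstream holds a freed message
    }
}
```

The next stage then owns the retained reference and is responsible for releasing it.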

Example 15

@Override
public void onData(final ByteBuf input) {
    // We need to retain until the serializer gets around to processing it.
    ReferenceCountUtil.retain(input);
    serializer.execute(new Runnable() {
        @Override
        public void run() {
            if (isTraceBytes()) {
                TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
            }
            ByteBuffer source = input.nioBuffer();
            do {
                ByteBuffer buffer = protonTransport.getInputBuffer();
                int limit = Math.min(buffer.remaining(), source.remaining());
                ByteBuffer duplicate = source.duplicate();
                duplicate.limit(source.position() + limit);
                buffer.put(duplicate);
                protonTransport.processInput();
                source.position(source.position() + limit);
            } while (source.hasRemaining());
            ReferenceCountUtil.release(input);
            // Process the state changes from the latest data and then answer back
            // any pending updates to the Broker.
            processUpdates();
            pumpToProtonTransport();
        }
    });
}

Source: https://www.cnblogs.com/shihaiming/p/9561838.html

Time: 2024-11-09 03:50:47
