Netty : writeAndFlush的线程安全及并发问题

使用Netty编程时,我们经常会从用户线程,而不是Netty线程池发起write操作,因为我们不能在netty的事件回调中做大量耗时操作。那么问题来了 –

1, writeAndFlush是线程安全的吗?

2, 是否使用了锁,导致并发性能下降呢

我们来看代码 – 在DefaultChannelHandlerContext中

@Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        DefaultChannelHandlerContext next;
        next = findContextOutbound(MASK_WRITE);
        ReferenceCountUtil.touch(msg, next);
        next.invoker.invokeWrite(next, msg, promise);
        next = findContextOutbound(MASK_FLUSH);
        next.invoker.invokeFlush(next);
        return promise;
}

在DefaultChannelHandlerInvoker.java中

@Override
     public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
         if (msg == null) {
             throw new NullPointerException("msg");
         }
         if (!validatePromise(ctx, promise, true)) {
             // promise cancelled
             ReferenceCountUtil.release(msg);
             return;
         }

         if (executor.inEventLoop()) {
             invokeWriteNow(ctx, msg, promise);
         } else {
             AbstractChannel channel = (AbstractChannel) ctx.channel();
             int size = channel.estimatorHandle().size(msg);
             if (size > 0) {
                 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                 // Check for null as it may be set to null if the channel is closed already
                 if (buffer != null) {
                     buffer.incrementPendingOutboundBytes(size);
                 }
             }
             safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
         }
     }
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
         try {
             executor.execute(task);
         } catch (Throwable cause) {
             try {
                 promise.setFailure(cause);
             } finally {
                 ReferenceCountUtil.release(msg);
             }
         }
     }

可见,writeAndFlush如果在Netty线程池内执行,则是直接write;否则,将作为一个task插入到Netty线程池执行。

《Netty权威指南》写到
通过调用NioEventLoop的execute(Runnable task)方法实现,Netty有很多系统Task,创建他们的主要原因是:当I/O线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程负责执行,这样就实现了局部无锁化。

参考
http://www.cnblogs.com/zemliu/p/3667332.html
http://netty.io/5.0/xref/io/netty/channel/DefaultChannelHandlerInvoker.html
http://www.infoq.com/cn/articles/netty-version-upgrade-history-thread-part/

时间: 2024-10-06 11:36:33

Netty : writeAndFlush的线程安全及并发问题的相关文章

Netty Redis 亿级流量 高并发 实战 (长文 修正版)

目录 疯狂创客圈 Java 分布式聊天室[ 亿级流量]实战系列之 -30[ 博客园 总入口 ] 写在前面 1.1. 快速的能力提升,巨大的应用价值 1.1.1. 飞速提升能力,并且满足实际开发要求 1.1.2. 越来越多.大量的应用场景 1.2. 高并发架构中的6大集群 1.2.1. 支撑亿级流量的IM整体架构 1.2.2. IM通讯协议介绍 1.2.3. 长连接和短连接 1.2.4. 技术选型 1.3. 基于Redis 设计分布式Session 1.3.1. SessionLocal本地会话

Java线程测试高并发

package com.expai.utils; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Execu

Netty writeAndFlush() 流程与异步

Netty writeAndFlush()方法分为两步, 先 write 再 flush @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { DefaultChannelHandlerContext next; next = findContextOutbound(MASK_WRITE); ReferenceCountUtil.touch(msg, next); next.invok

Java线程同步和并发第1部分

通过优锐课核心java学习笔记中,我们可以看到,码了很多专业的相关知识, 分享给大家参考学习.我们将分两部分介绍Java中的线程同步,以更好地理解Java的内存模型. 介绍 Java线程同步和并发是复杂应用程序各个设计阶段中讨论最多的主题. 线程,同步技术有很多方面,它们可以在应用程序中实现高并发性. 多年来,CPU(多核处理器,寄存器,高速缓存存储器和主内存(RAM))的发展已导致通常是开发人员往往忽略的某些领域-例如线程上下文,上下文切换,变量可见性,JVM内存 型号与CPU内存型号. 在本

Netty(二) 从线程模型的角度看 Netty 为什么是高性能的?

前言 在之前的 SpringBoot 整合长连接心跳机制 一文中认识了 Netty. 但其实只是能用,为什么要用 Netty?它有哪些优势?这些其实都不清楚. 本文就来从历史源头说道说道. 传统 IO 在 Netty 以及 NIO 出现之前,我们写 IO 应用其实用的都是用 java.io.* 下所提供的包. 比如下面的伪代码: ServeSocket serverSocket = new ServeSocket(8080); Socket socket = serverSocket.accep

那些年读过的书《Java并发编程实战》一、构建线程安全类和并发应用程序的基础

1.线程安全的本质和线程安全的定义 (1)线程安全的本质 并发环境中,当多个线程同时操作对象状态时,如果没有统一的状态访问同步或者协同机制,不同的线程调度方式和不同的线程执行次序就会产生不同的不正确的结果.要确保获得最后正确的结果就需要对线程访问对象状态 的操作上进行同步或者协同,使多个线程无论在什么样的调度方式和线程执行顺序的情况中,都能产生正确的结果. 线程安全的本质就对(对象)状态的访问操作进行统一管理,使之在不同的执行环境下均能产生正确的结果.也就是在不同的并发环境下,保持对象状态的不变

第二章:线程安全性——java并发编程实战

一个对象是否需要是线程安全的取决于它是否被多个线程访问. 当多个线程访问同一个可变状态量时如果没有使用正确的同步规则,就有可能出错.解决办法: 不在线程之间共享该变量 将状态变量修改为不可变的 在访问状态变量时使用同步机制 完全由线程安全类构造的程序也不一定是线程安全的,线程安全类中也可以包含非线程安全的类 一.什么是线程安全性 线程安全是指多个线程在访问一个类时,如果不需要额外的同步,这个类的行为仍然是正确的.(因为线程安全类中封装了必要的同步代码) 一个无状态的类是线程安全的.无状态类是指不

内存、线程安全与并发

@内存机制引用自 一.java内存机制 java程序在内存中的分配有4种,分别是: 全局数据区:保存static修饰的属性: 全局代码区:保存static修饰的静态方法: 栈内存空间:保存所有的对象名称,这些对象名称指向对象所在的堆内存空间: 堆内存空间:保存对象: 二.java变量的作用域: java变量分为4种: 类变量:也称为全局变量或者静态变量,需要用修饰符static修饰,在类定义后就分配内存空间,对应内存中的全局数据区,无需实例化就可以使用: 对象变量:也称为成员变量,实例化之后才分

如何处理线程池的并发?

[前言]我们从事Android开发以来,都自始自终被灌输着处理耗时的任务时要在非UI线程做.于是我们有了各种处理并发的编程手段,无论是自己用new Thread(Runnable)新起工作线程(Worker thread),还是利用Android提供的API(AsnyTask,CursorLaoder等)都是处理耗时任务的解决方案.但是在一个大型的应用程序中,如果我们需要处理数量很多且频繁的耗时任务时,如果还是采用之前的手段,无疑会带来很多不便:一来频繁创建销毁线程会造成资源(内存和Cpu)的浪