Java使用多线程发送消息

在后台管理用户信息的时候,经常会用到批量发送提醒消息,首先想到的有:

(1)、循环发送列表,逐条发送。优点是:简单,如果发送列表很少,而且没有什么耗时的操作,是比较好的一种选择,缺点是:针对大批量的发送列表,不可取,耗时,程序会出现严重的阻塞问题。

(2)、使用队列(BlockingQueue),开启多个线程,分为三个部分。一部分负责处理将发送列表放入队列;一部分负责从队列中读取并发送消息;第三部分负责监视队列是否为空及后续的操作。

(3)、以下说到的这种模式,使用Future、Callable来返回发送结果,觉得是一种比较好的方式,很简单代码也很详细,就不介绍了。

代码如下:

public class PublishMsgTest {
    //创建固定大小为100 的线程池
    private static ExecutorService service = Executors.newFixedThreadPool(100);

    //发送消息的业务逻辑方法
    public int sendMsg(List<Integer> receivers,String content){
        long begin = System.nanoTime();
        AtomicInteger ai = new AtomicInteger(0);
        List<Future<Integer>> list = new ArrayList<>();
        //循环发送消息
        for(int i=0;i<receivers.size();i++){
            Integer receiver = receivers.get(i);
            //使用Future,Callable实现发送消息后返回发送结果
            Future<Integer> future = service.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    //调用相对比较耗时的发送消息接口
                    Thread.sleep(200);
                    //发送消息
                    int resultStatus = sendMsg(receiver,content);
                    System.out.println("接收者【"+receiver+"】,发送结果【"+resultStatus+"】");
                    return resultStatus;
                }
            });
            list.add(future);
        }
        System.out.println("-----------------------"+(System.nanoTime()-begin)/1000_000d+"-----------------------");
        //循环接收发送结果,相当于一个使线程同步的过程,这个过程是比较耗时的
        for(int i=0;i<list.size();i++){
            try {
                int resultStatus = list.get(i).get();
                if(resultStatus == 0){//发送成功
                    ai.incrementAndGet();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("发送消息结束,耗时:"+(System.nanoTime()-begin)/1000_000d);
        return ai.get();
    }

    public static void main(String[] args){
        PublishMsgTest pmt = new PublishMsgTest();
        //待接收人
        List<Integer> receivers = new ArrayList<Integer>();
        for(int i=0;i<1000;i++){
            receivers.add(i);
        }
        String content = "群发消息_测试代码";
        int successCount = pmt.sendMsg(receivers, content);
        System.out.println("共有【"+receivers.size()+"】接收者,发送成功【"+successCount+"】");
    }

    //完成发送消息
    private int sendMsg(Integer receiver, String content) {
        if(receiver%2 == 0){//模拟被2整除,即为发送成功
            return 0;
        }
        return 1;
    }

以上代码的执行结果:

-----------------------14.786889-----------------------
接收者【2】,发送结果【0】
接收者【3】,发送结果【1】
接收者【4】,发送结果【0】
接收者【0】,发送结果【0】
接收者【6】,发送结果【0】
接收者【7】,发送结果【1】
接收者【1】,发送结果【1】
接收者【5】,发送结果【1】
接收者【10】,发送结果【0】
.
.
.
.
接收者【994】,发送结果【0】
接收者【993】,发送结果【1】
接收者【992】,发送结果【0】
接收者【996】,发送结果【0】
接收者【999】,发送结果【1】
接收者【998】,发送结果【0】
接收者【997】,发送结果【1】
发送消息结束,耗时:2033.053433
共有【1000】接收者,发送成功【500】

摘自:http://www.cnblogs.com/quanenmin/p/4914620.html

时间: 2025-01-12 10:18:56

Java使用多线程发送消息的相关文章

java客户端向单机版kafka发送消息没有接收到

kafka版本:kafka_2.11-0.10.0.0 在kafka服务器命令发送消息,消费者可以接受到, 但是在java客户端向kafka发送消息时消费者接受不到, 在kafka/config/sever.properties把这个注解打开 advertised.listeners=PLAINTEXT://ip.137:9092        #本机服务器ip 意思就是说:hostname.port都会广播给producer.consumer.如果你没有配置了这个属性的话,则使用listene

java Socket多线程聊天程序(适合初学者)

一个非常简单的java聊天程序,有客户端和服务器端,目前只有群聊功能,其他的所有功能都可以在这个基础上添加,现在我分享出来主要是为了保持一个最简单的java聊天程序便于初学者学习,界面也非常的简洁,只有两个文件,主要是用了socket,java多线程,知识点不是很多,很适合初学者 下面是服务器端代码 package tk.socket; import java.awt.Dimension; import java.awt.FlowLayout; import java.awt.event.Act

第4节 Scala中的actor介绍:1、actor概念介绍;2、actor执行顺序和发送消息的方式

要看这一节... 10.    Scala Actor并发编程 10.1.   课程目标 10.1.1.    目标一:熟悉Scala Actor并发编程 10.1.2.    目标二:为学习Akka做准备 注:Scala Actor是scala 2.10.x版本及以前版本的Actor. Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,老版本的Actor已经废弃. 10.2.   什么是Scala  Actor 10.2.1.    概念 Scala中的Actor能够实

第四篇 :微信公众平台开发实战Java版之完成消息接受与相应以及消息的处理

温馨提示: 这篇文章是依赖前几篇的文章的. 第一篇:微信公众平台开发实战之了解微信公众平台基础知识以及资料准备 第二篇 :微信公众平台开发实战之开启开发者模式,接入微信公众平台开发 第三篇 :微信公众平台开发实战之请求消息,响应消息以及事件消息类的封装 首先,我们看看原来写的dopost方法: /** * 处理微信服务器发来的消息 */ public void doPost(HttpServletRequest request, HttpServletResponse response) thr

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag

Delphi中多线程用消息实现VCL数据同步显示

Delphi中多线程用消息实现VCL数据同步显示 Lanno Ckeeke 2006-5-12 概述: delphi中严格区分主线程和子主线程,主线程负责GUI的更新,子线程负责数据运算,当数据运行完毕后,子线程可以向主线程式发送消息,以便通知其将VCL中的数据更新. 实现: 关键在于消息的发送及接收.在消息结构Tmessage中wParam和lParam类型为Longint,而指针类型也定义为Longint,可以通过此指针来传递自己所感兴趣的数据.如传递字符数组: 数组定义: const MA

我是企业号体验账户 我发送消息:微信错误 errcode=60011,

http://qydev.weixin.qq.com/qa/index.php?qa=3197&qa_1=%E6%88%91%E6%98%AF%E4%BC%81%E4%B8%9A%E5%8F%B7%E4%BD%93%E9%AA%8C%E8%B4%A6%E6%88%B7-%E6%88%91%E5%8F%91%E9%80%81%E6%B6%88%E6%81%AF%EF%BC%9A%E5%BE%AE%E4%BF%A1%E9%94%99%E8%AF%AF&show=3197#q3197 我是企业号

微信企业号回调模式验证与发送消息

最近放假闲着无聊,研究了一下微信企业号, 打算通过企业号做一个运维报警信息发送的功能,记录自己的操作 第一步 注册企业号,网上一搜一大把的教程,这里略过  微信企业号登录地址  https://qy.weixin.qq.com/ 第二步  登录后 点左侧 应用中心 -新建应用 第三步  在第二步第一图中的自建应用下面找到刚刚新建的应用 拉到最下面有一个模式选择,点击回调模式 会看到下图界面 Token 和EncodingAESKey 点击随机获取即可,上面的url需要你有自己的服务地址  你的服

使用DWR长连接技术实现客户端一对一发送消息

关于DWR怎么使用我的上一篇博文里面记录了,这里写一个DWR一对一消息推送的WEB程序,也就是WEB一对一聊天.我的思路是这样的:首先每个用户在登陆后在各自的页面放置一个唯一标记(如用户的ID,也可以放在session里面),用户A向用户B发送的消息 -->服务器 -->JAVA方法-->JAVA方法调用前端所有正在访问聊天页面的JS函数-->JS判断消息发送至的客户端是否是用户B -->是则显示,否则不显示:用户B向A同样过程 首先是该项目的web.xml文件: <?