一个简单的java多线程例子

现在有这样一个任务,有一份手机号列表(20W),有一份话单的列表(10W),要统计哪些手机号没有出现在话单中,哪些手机号在话单中出现了不止一次。

想到的最直接的方式,就是两层循环去遍历,虽然此方法比较笨,但目前还没有想出更好的办法。

一开始使用单线程来处理,代码是随手写的并没有进行重构,只是做一个简单的说明:

package tool;

import java.util.List;

public class SingleThread
{
	public static void main(String[] args)
	{
		SingleThread st = new SingleThread();

		String userIdPath = "D:\\shell\\store_bak\\tool\\userid.txt";
		List<String> userIds = Util.readUserId(userIdPath);
		List<String> cdrItems = Util.readCdrItem();

		st.process(userIds, cdrItems);
	}

	/**
	 *
	 * @param userIds
	 * @param cdrItems
	 */
	private void process(List<String> userIds, List<String> cdrItems)
	{
		long startTime = System.currentTimeMillis();
		int count = 0;
		for (String key : userIds)
		{
			String[] uninKeys = key.split("\\s+");
			count = 0;
			for (String cdr : cdrItems)
			{
				if (cdr.contains("|" + uninKeys[0] + "|")
						&& cdr.contains("|" + uninKeys[1] + "|"))
				{
					count++;
				}
			}
		}
		System.out.println((System.currentTimeMillis() - startTime) / 1000);
	}

}

Util中的代码就不给出了,就是简单的文件读取操作,整个过程处理下来速度并不是太快,其中最耗时的操作在contains方法上,一开始使用的并不是contains方法,而是使用的正则表达式匹配,结果发现正则表达式的效率并不高,因此改用contains方法。但是效率还是不太理想。因此考虑使用多线程来处理。

和传统的生产者消费者不同,这里实际上只有消费者,因为产生原始数据几乎不耗时,最容易想到的办法就是定义个共享的index标志,依次互斥的进行+1操作,因此这里的index就是一个共享的变量,需要进行同步。直接使用jdk中提供的AtomicInteger,代码如下:

package tool;

import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class MutiThread
{
	private static AtomicInteger lock = new AtomicInteger(0);

	public static void main(String[] args)
	{
		MutiThread tool = new MutiThread();
		String userIdPath = "D:\\shell\\store_bak\\tool\\userid.txt";
		List<String> userIds = Util.readUserId(userIdPath);
		List<String> cdrItems = Util.readCdrItem();

		tool.work2(lock, userIds, cdrItems);
	}

	public void work2(AtomicInteger lock, List<String> userIds,
			List<String> cdrItems)
	{
		final long startTime = System.currentTimeMillis();
		CyclicBarrier cb = new CyclicBarrier(5, new Runnable()
		{

			@Override
			public void run()
			{
				System.out.println((System.currentTimeMillis() - startTime) / 1000);
			}
		});
		for (int i = 0; i < 5; i++)
		{
			new Thread(new Worker(userIds, cdrItems, lock, cb)).start();
		}
	}

	class Worker implements Runnable
	{
		private List<String> userIds;
		private List<String> cdrItems;
		private AtomicInteger lock;
		private CyclicBarrier cb;

		public Worker(List<String> userIds, List<String> cdrItems,
				AtomicInteger lock, CyclicBarrier cb)
		{
			this.userIds = userIds;
			this.cdrItems = cdrItems;
			this.lock = lock;
			this.cb = cb;
		}

		@Override
		public void run()
		{
			while (true)
			{
				int index = lock.getAndIncrement();
				if (index >= userIds.size())
					break;
				String id = userIds.get(index);
				process1(id, cdrItems);
			}

			try
			{
				cb.await();
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			} catch (BrokenBarrierException e)
			{
				e.printStackTrace();
			}
		}

	}

	private void process1(String id, List<String> cdrItems)
	{
		String[] uninKeys = id.split("\\s+");
		int count = 0;
		for (String cdr : cdrItems)
		{
			if (cdr.contains("|" + uninKeys[0] + "|")
					&& cdr.contains("|" + uninKeys[1] + "|"))
			{
				count++;
			}
		}
	}

}

使用多线程的方式确实能够提高不少效率,尤其是数据量大的时候,至少是两倍的速度,这里的线程数也不是越多越好,因为JVM对线程的调度也会消耗资源。

针对这个场景,考虑下concurrenthashmap的实现,可以将资源进行分段处理,可以巧妙的避开多线程的资源征用,因此可以将list分成不同的段,交给不同的线程去处理,代码如下:

package tool;

import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class MutiSegmentMutiThread
{
	private static AtomicInteger lock = new AtomicInteger(0);
	private static int ThreadNum = 10;

	public static void main(String[] args)
	{
		MutiSegmentMutiThread tool = new MutiSegmentMutiThread();
		String userIdPath = "D:\\shell\\store_bak\\tool\\userid.txt";
		List<String> userIds = Util.readUserId(userIdPath);
		List<String> cdrItems = Util.readCdrItem();

		tool.work2(lock, userIds, cdrItems);
	}

	public void work2(AtomicInteger lock, List<String> userIds,
			List<String> cdrItems)
	{
		final long startTime = System.currentTimeMillis();
		CyclicBarrier cb = new CyclicBarrier(ThreadNum, new Runnable()
		{
			@Override
			public void run()
			{
				System.out.println((System.currentTimeMillis() - startTime) / 1000);
			}
		});
		int segmentSize = userIds.size() / ThreadNum;
		int start = 0;
		int end = 0;
		for (int i = 0; i < ThreadNum; i++)
		{
			start = i * segmentSize;
			if (i == ThreadNum - 1)
			{
				end = userIds.size();
			} else
			{
				end = (i + 1) * segmentSize;
			}
			new Thread(new Worker(userIds, cdrItems, cb, start, end)).start();
		}
	}

	class Worker implements Runnable
	{
		private List<String> userIds;
		private List<String> cdrItems;
		private CyclicBarrier cb;
		private int start;
		private int end;

		public Worker(List<String> userIds, List<String> cdrItems,
				CyclicBarrier cb, int start, int end)
		{
			this.userIds = userIds;
			this.cdrItems = cdrItems;
			this.cb = cb;
			this.start = start;
			this.end = end;
		}

		@Override
		public void run()
		{
			for (int i = start; i < end; i++)
			{
				String id = userIds.get(i);
				process1(id, cdrItems);
			}
			try
			{
				cb.await();
			} catch (InterruptedException e)
			{
				e.printStackTrace();
			} catch (BrokenBarrierException e)
			{
				e.printStackTrace();
			}
		}

	}

	private void process1(String id, List<String> cdrItems)
	{
		String[] uninKeys = id.split("\\s+");
		int count = 0;
		for (String cdr : cdrItems)
		{
			if (cdr.contains("|" + uninKeys[0] + "|")
					&& cdr.contains("|" + uninKeys[1] + "|"))
			{
				count++;
			}
		}
	}

}

实际测试中第三种方式确实比第二种要快些,但是提升并不是很明显。以上的代码只是为解决问题提供一个思路,想必还能够继续优化,如果数据量非常大,可以考虑使用分布式计算了。

时间: 2024-10-17 00:05:24

一个简单的java多线程例子的相关文章

[转]一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程

一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程 希望此文能给初学多线程编程的朋友带来帮助,也希望牛人多多指出错误. 另外感谢以下链接的作者给予,给我的学习带来了很大帮助 http://blog.csdn.net/locape/article/details/6040383 http://www.cnblogs.com/liuweijian/archive/2009/12/30/1635888.html 一.什么是多线程? 当我自己提出这个问题的时候,我还是很老实的拿着操作系

一个简单的Java程序例子以及其几种注释

在说道主题前,先来啰嗦两句,o()︿︶)o 唉,不说两句心里就有个疙瘩,也许这就是所谓的强迫症吧,好了说说我想啰嗦的,其实也就是这样子的,关于Java开发工具箱的下载以及环境的配置.Java的下载进入到oracle官网下载即可(具体的步骤就不赘述了如今网络资源丰富你懂得),下载好之后然后开始安装,根据安装过程中的说明进行下一步直至完成即可,等等不要激动,还没完了,要开始配置Java的开发环境(说明下我这说的只是正对window操作系统的),其实也很简单,就是对着桌面上的”我的电脑”鼠标右键带点击

Java学习笔记 11/15:一个简单的JAVA例子

首先来看一个简单的 Java 程序. 来看下面这个程序,试试看是否看得出它是在做哪些事情! 范例:TestJava.java // TestJava.java,java 的简单范例 public class TestJava { public static void main(String args[]) { int num ; // 声明一个整型变量 num num = 3 ; // 将整型变量赋值为 3 // 输出字符串,这里用"+" 号连接变量 System.out.printl

java 多线程例子

java 多线程例子 编写具有多线程能力的程序经常会用到的方法有: run(), start(), wait(), notify(), notifyAll(), sleep(), yield(), join() 还有一个重要的关键字:synchronized 本文将对以上内容进行讲解. 一:run() 和start() 示例1: public class ThreadTest extends Thread {public void run() {for (int i = 0; i < 10; i

一个简单的java回调函数的实现

回调函数 回调函数涉及的3个函数 登记回调函数 回调函数 响应回调函数 简单的解释 你到一个商店买东西,刚好你要的东西没有货,于是你在店员那里留下了你的电话,过了几天店里有货了,店员就打了你的电话,然后你接到电话后就到店里去取了货.在这个例子里,你的电话号码就叫回调函数,你把电话留给店员就叫登记回调函数,店里后来有货了叫做触发了回调关联的事件,店员给你打电话叫做调用回调函数,你到店里去取货叫做响应回调事件.回答完毕.来自知乎点击打开链接 代码的实现 首先有一个接口 interface CallB

一个简单的回调(例子)

1.声明一个回调Interface: public interface CallBack { /** * 执行回调方法 * @param objects 将处理后的结果作为参数返回给回调方法 */ public void execute(Object... objects ); } 2.回调的地方继承回调,实现回调的方法: import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamRe

一个简单的Java程序

一个.NET技术还是很菜的水平的猿人现在要去学习Java不知道是坏是好,无从得知啊! 不过在网上看了好多Java方面的简单例子,感觉Java还是蛮不错的么!不管以后怎么样啦,先开始自己的Java菜鸟之旅吧! 建立一个Java项目,建立一个属于自己的包,然后就开始自己的Java之旅... 创建的时候勾上这个生成main方法的选项,这个好像类似与我们.Net程序里控制台程序有木有.... 创建完成后就是这么一个样子,可以看到我们的包,还有给我们创建好自己的类,并且带了一个静态的main方法咋看就像.

Java多线程例子讲解

一:知识点声明: 1.区别进程和线程:进程是静态概念,它的执行依赖线程进行. 2.进程的状态:就绪(等待cpu执行),运行,中止,阻塞(等待所需资源,进入阻塞态) 3.Java程序的main函数即是一个线程,被称做主线程.此时如果新建线程,则和主线程一起并行运行. 4.Java中的构造方法.main函数谁先执行? main函数先执行,因为main是静态方法,程序一开始就执行:而构造方法只有在类实例化时才去调用. 二:实例程序 public class GetCurrentThread imple

一个简单的Spring定时器例子 注解方式

首先在applicationContext.xml中增加 文件头中增加一条 xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation 中增加一条 http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd <beans xmlns:task=&quo