java中使用FIFO队列:java.util.Queue实现多台服务器发邮件的代码

代码下载地址:http://www.zuidaima.com/share/1838230785625088.htm

原文:java中使用FIFO队列:java.util.Queue实现多台服务器发邮件的代码

最近由于zuidaima.com注册用户的增多,qq企业邮箱发送邮件会被封禁账号导致注册后面的用户收不到周总结,所以紧急开发了一套多账号,多服务器发送邮件的程序。

大概的设计思路如下:

1.服务器可以无限扩展,但由于qq企业邮箱是限定域名,所以要想多服务器还得有多域名,多账号也不行。

2.最核心的代码是FIFO分配指定长度的邮件列表这一步,通过java线程同步Queue实现。

3.发送失败的要加到重试队列继续发送。

4.经测试每分钟发一封邮件,qq企业邮箱最大可以发送7000封邮件,这样最代码2个账号[email protected]和[email protected]就可以发送到15000个邮件,该方案绝对是性价比超高的,目前市面上每封邮件1分钱,那15000也得150元人民币,如果你每周呢?

5.可以指定某个用户id来只发送某个范围的邮件列表

6.可以指定clear参数来清空所有邮件列表,不过该参数慎用,避免多发送邮件。

下面分享下核心代码,有思路和代码做参考,我想大家肯定可以自己搭建起来了:

分配邮件端核心java代码:

package com.zuidaima.mail;

private final static Map<Long, Queue<User>> map = new HashMap<Long, Queue<User>>();

// 申请发邮件的邮箱地址,每次根据mailTaskCount配置来取个数
	public @ResponseBody
	JSONObject apply_mails(
			@PathVariable("id") Long id,
			@RequestParam(value = "start_date", required = false) String startDate,
			@RequestParam(value = "end_date", required = false) String endDate,
			@RequestParam(value = "clear", required = false) String clear,
			@RequestParam(value = "user_id", required = false) Long user_id)
			throws IOException, TemplateException {
		JSONObject json = new JSONObject();
		String error = "";
		if (StringUtils.isBlank(startDate) || StringUtils.isBlank(endDate)) {
			error = "参数非法,startDate和endDate";
			json.put("error", error);
			return json;
		}
		final Task task = taskService.findOneById(id);
		if (task.getType() != ModuleConstants.TASK_TYPE_WEEK_SUMMARY) {
			error = "任务" + id + "不是周总结";
			json.put("error", error);
			return json;
		}
		Queue<User> allUsers = null;
		List<User> users = new ArrayList<User>();
		synchronized (map) {//注意线程安全
			if (StringUtils.isNotBlank(clear)) {// clear
				map.put(id, null);
				error = "任务清空成功";
				json.put("error", error);
				return json;
			}
			allUsers = map.get(id);
			if (allUsers == null) {
				allUsers = new LinkedList<User>();
				if (user_id == null) {
					user_id = 0l;
				}
				Page<User> page = userService
						.findAllReceiveMailUsersGreaterThanId(user_id, 1,
								Integer.MAX_VALUE);
				allUsers.addAll(page.getContent());
				map.put(id, allUsers);
			}
			int size = allUsers.size();
			if (size == 0) {
				error = "任务" + id + "执行完毕!";
				json.put("error", error);
				return json;
			}

			int i = 0;
			while (i < mailTaskCount) {
				User _user = allUsers.poll();
				if (_user == null) {
					break;
				}
				users.add(_user);
				i++;
			}
		}

		File templateFile = new File(deployPath + "/apply_week_summary_" + id
				+ ".html");
		String templdate = "";
		if (templateFile.exists()) {
			templdate = FileUtils.readFileToString(templateFile, "utf-8");
		} else {
			List<Long> weekSummaryShareIds = findIds(startDate, endDate);
			ModelMap model = new ModelMap();
			Collection<Project> weekSummaryShares = projectService
					.findAllByIds(weekSummaryShareIds);
			Configuration configuration = freemarkerConfig.getConfiguration();
			configuration.setDirectoryForTemplateLoading(new File(deployPath
					+ "/WEB-INF/template/"));
			Template template = configuration
					.getTemplate("apply_week_summary.htm");
			Page<Rank> weekRankUsers = rankService.findAllByType(
					ModuleConstants.RANK_TYPE_USER_CONTRIBUTE_WEEK,
					ParamConstants.DEFAULT_PAGE,
					ParamConstants.DEFAULT_TOP_LESS_COUNT,
					ModuleConstants.RANKSORT);
			model.put("weekRankUsers", weekRankUsers);

			Page<Rank> monthRankUsers = rankService.findAllByType(
					ModuleConstants.RANK_TYPE_USER_CONTRIBUTE_MONTH,
					ParamConstants.DEFAULT_PAGE,
					ParamConstants.DEFAULT_TOP_LESS_COUNT,
					ModuleConstants.RANKSORT);
			model.put("monthRankUsers", monthRankUsers);

			model.put("task", task);
			model.put("weekSummaryShares", weekSummaryShares);

			Sort sort = new Sort(Direction.DESC, "secondSort");
			Page<Project> downloadProjects = projectService.findAllByType(
					ModuleConstants.PROJECT_TYPE_SHARE_CODE, 1,
					ParamConstants.DEFAULT_TOP_COUNT, sort);
			model.put("downloadProjects", downloadProjects);

			sort = new Sort(Direction.DESC, "thirdSort");
			Page<Project> commentProjects = projectService.findAllByType(
					ModuleConstants.PROJECT_TYPE_SHARE_CODE, 1,
					ParamConstants.DEFAULT_TOP_COUNT, sort);
			model.put("commentProjects", commentProjects);

			StringWriter stringWriter = new StringWriter();
			stringWriter.flush();
			stringWriter.close();
			template.process(model, stringWriter);
			FileUtils.writeStringToFile(templateFile, stringWriter.toString(),
					"utf-8");
			templdate = stringWriter.toString();
		}

		json.put("template", templdate);
		JSONArray usersJson = new JSONArray();
		for (User user : users) {
			JSONObject userJson = new JSONObject();
			userJson.put("id", user.getId());
			userJson.put("uuid", user.getUuid());
			String code = Security.encryptUnsubscribeWeeklyEmail(user.getId()
					+ "_" + new Date().getTime());
			userJson.put("unsubscribe_weekly_email_code", code);
			userJson.put("email", user.getEmail());
			usersJson.add(userJson);
		}
		json.put("users", usersJson);
		json.put("error", error);
		json.put("title", task.getTitle());
		return json;
	}

发送邮件端核心java代码:

package com.zuidaima.mail;

public static void main(String[] args) {
		if (args.length < 3) {
			System.out
					.println("需要3个参数:\n第一个是task id\n第二个是开始时间,如:2014-05-05\n第三个是结束时间,如:2014-05-09\n第四个参数user_id\n第五个参数是clear,如true");
			return;
		}

		String task_id = args[0];
		String start_date = args[1];
		String end_date = args[2];
		String user_id = "0";
		if (args.length >= 4) {
			user_id = args[3];
		}
		String clear = "";
		if (args.length >= 5) {
			clear = args[4];
		}
		String url = String.format(APPLY_EMAIL_URL, task_id, start_date,
				end_date, user_id, clear);
		while (true) {
			String response = request(url);
			JSONObject json = JSONObject.fromObject(response);
			String error = json.getString("error");
			if (StringUtils.isNotBlank(error)) {
				System.out.println("发送邮件结束:error:" + error);
				break;
			}

			JSONArray usersJson = json.getJSONArray("users");
			if (usersJson.size() == 0) {
				System.out.println("发送邮件结束");
				break;
			}
			String template = json.getString("template");
			String title = json.getString("title");
			int i = 1;
			try {
				List<JSONObject> failed1 = new ArrayList<JSONObject>();
				for (Object _userJson : usersJson) {
					send(_userJson, title, template, i, failed1,
							usersJson.size());
					i++;
				}
				if (failed1.size() == 0) {
					System.out.println("发送邮件结束一批");
					continue;
				}

				System.out.println("第一次发送失败的邮箱列表" + failed1);
				Thread.sleep(1 * 60 * 60 * 1000);
				List<JSONObject> failed2 = new ArrayList<JSONObject>();
				i = 1;
				for (Object userJson : failed1) {
					send(userJson, title, template, i, failed2, failed1.size());
					i++;
				}
				if (failed2.size() == 0) {
					System.out.println("failed1发送邮件结束一批");
					continue;
				}

				System.out.println("第二次发送失败的邮箱列表" + failed2);
				Thread.sleep(1 * 60 * 60 * 1000);
				List<JSONObject> failed3 = new ArrayList<JSONObject>();
				i = 1;
				for (Object userJson : failed2) {
					send(userJson, title, template, i, failed3, failed2.size());
					i++;
				}

				if (failed3.size() == 0) {
					System.out.println("failed2发送邮件结束一批");
					continue;
				}

				System.out.println("第三次发送失败的邮箱列表" + failed3);
				Thread.sleep(1 * 60 * 60 * 1000);
				List<JSONObject> failed4 = new ArrayList<JSONObject>();
				i = 1;
				for (Object userJson : failed3) {
					send(userJson, title, template, i, failed4, failed3.size());
					i++;
				}
				System.out.println("最终发送失败的邮箱列表:" + failed4);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

线上正式环境运行截图:

java中使用FIFO队列:java.util.Queue实现多台服务器发邮件的代码

时间: 2024-08-02 19:09:26

java中使用FIFO队列:java.util.Queue实现多台服务器发邮件的代码的相关文章

多线程编程学习六(Java 中的阻塞队列).

介绍 阻塞队列(BlockingQueue)是指当队列满时,队列会阻塞插入元素的线程,直到队列不满:当队列空时,队列会阻塞获得元素的线程,直到队列变非空.阻塞队列就是生产者用来存放元素.消费者用来获取元素的容器. 当线程 插入/获取 动作由于队列 满/空 阻塞后,队列也提供了一些机制去处理,或抛出异常,或返回特殊值,或者线程一直等待... 方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put(e) offer(e, timeout, unit

Java分布式:消息队列(Message Queue)

Java分布式:消息队列(Message Queue) 引入消息队列 消息,是服务间通信的一种数据单位,消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.队列,是一种常见的数据结构,它是保存消息的容器.那么消息队列就是以消息为基本单位的优先队列. 借助消息队列,系统的不同部分可相互通信并异步执行处理操作.消息队列提供一个临时存储消息的轻量级缓冲区,以及允许软件组件连接到队列以发送和接收消息的终端节点.这些消息通常较小,可以是请求.恢复.错误消息或明文信息等. 为什么使用消息

Java中的阻塞队列

1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put

聊聊并发(七)——Java中的阻塞队列

1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put

聊聊并发(七)Java中的阻塞队列

什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue

【转】java中PriorityQueue优先级队列使用方法

优先级队列是不同于先进先出队列的另一种队列.每次从队列中取出的是具有最高优先权的元素. PriorityQueue是从JDK1.5开始提供的新的数据结构接口. 如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列. 由于网上的资料大多将优先级队列各个方法属性,很少有实例讲解的,为方便大家以后使用,我就写了个demo~ 如果想实现按照自己的意愿进行优先级排列的队列的话,需要实现Comparator接口.下面的方法,实现了根据某个变

《转》JAVA中PriorityQueue优先级队列使用方法

该文章转自:http://blog.csdn.net/hiphopmattshi/article/details/7334487 优先级队列是不同于先进先出队列的另一种队列.每次从队列中取出的是具有最高优先权的元素. PriorityQueue是从JDK1.5开始提供的新的数据结构接口. 如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列. 由于网上的资料大多将优先级队列各个方法属性,很少有实例讲解的,为方便大家以后使用,我就

java中的阻塞队列BlockingQueue

一.概述 位于java.util.concurrent下,声明:public interface BlockingQueue<E> extends Queue<E> 支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用. BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作

java中PriorityQueue优先级队列使用方法

优先级队列是不同于先进先出队列的另一种队列.每次从队列中取出的是具有最高优先权的元素. PriorityQueue是从JDK1.5开始提供的新的数据结构接口. 如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列. 由于网上的资料大多将优先级队列各个方法属性,很少有实例讲解的,为方便大家以后使用,我就写了个demo~ 如果想实现按照自己的意愿进行优先级排列的队列的话,需要实现Comparator接口.下面的方法,实现了根据某个变