多线程实现数据库的并发操作

  在Java中,程序需要操作数据库,操作数据首要事就是要获得数据库的Connection对象,利用多线程对数据导入数据库中将会加快操作进度,但是多个线程共享Connection对象,是不安全的,因为可以利用Java中的ThreadLocal为每个线程保存一个Connection对象,代码如下:

package com.quar.innovation.db;

import java.sql.Connection;
import java.sql.DriverManager;

public class ConnnectionManager {

	private static final ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>();

	private static final String BETADBURL = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&user=root&password=root";

	public static Connection getConnectionFromThreadLocal() {
		Connection conn = connectionHolder.get();
		try {
			if (conn == null || conn.isClosed()) {
				Connection con = ConnnectionManager.getConnection();
				connectionHolder.set(con);
				System.out.println("[Thread]" + Thread.currentThread().getName());
				return con;
			}
			return conn;
		} catch (Exception e) {
			System.out.println("[ThreadLocal Get Connection Error]" + e.getMessage());
		}
		return null;

	}

	public static Connection getConnection() {
		Connection conn = null;
		try {
			Class.forName("com.mysql.jdbc.Driver");
			conn = (Connection) DriverManager.getConnection(BETADBURL);
		} catch (Exception e) {
			System.out.println("[Get Connection Error]" + e.getMessage());
		}
		return conn;
	}
}

  通过ThreadLocal就可以为每个线程保留一份Connection对象,利用Java的ThreadPoolExecutor启动线程池,完成数据库操作,完整代码如下:

public class QunarThreadPoolExecutor extends ThreadPoolExecutor {

    // 记录每个线程执行任务开始时间
    private ThreadLocal<Long> start = new ThreadLocal<Long>();

    // 记录所有任务完成使用的时间
    private AtomicLong totals = new AtomicLong();

    // 记录线程池完成的任务数
    private AtomicInteger tasks = new AtomicInteger();

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
	}

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
	}

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
	}

	public QunarThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	 /**
     * 每个线程在调用run方法之前调用该方法
     * */
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        start.set(System.currentTimeMillis());
    }

    /**
     * 每个线程在执行完run方法后调用该方法
     * */
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        tasks.incrementAndGet();
        totals.addAndGet(System.currentTimeMillis() - start.get());
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("完成"+ tasks.get() +"个任务,平均耗时: [" + totals.get() / tasks.get() + "] ms");
    }

public class DataUpdater implements Runnable {

	private PreparedStatement pst;

	private List<UserProfileItem> userProfiles;

	private final String SQL = "insert into userprofile (`uid` ,`profile` , `logday`) VALUES (?, ? ,?) ON DUPLICATE KEY UPDATE `profile`= ? ";

	public DataUpdater(List<UserProfileItem> userProfiles) {
		this.userProfiles = userProfiles;
	}

	public void run() {
		try {
			pst = ConnnectionManager.getConnectionFromThreadLocal().prepareStatement(SQL);
			for (UserProfileItem userProfile : userProfiles) {
				if(userProfile.getUid() != null && !userProfile.getUid().isEmpty() &&
						userProfile.getProfile() != null && !userProfile.getProfile().isEmpty()) {
					pst.setString(1, userProfile.getUid());
					pst.setString(2, userProfile.getProfile());
					pst.setInt(3, userProfile.getLogday());
					pst.setString(4, userProfile.getProfile());
					pst.addBatch();
				}
			}
			pst.executeBatch();
		} catch (Exception e) {
			System.err.println("[SQL ERROR MESSAGE]" + e.getMessage());
		} finally {
			 close(pst);
		}

	}

	public void close(PreparedStatement pst) {
		if (pst != null) {
			try {
				pst.close();
			} catch (SQLException e) {
				System.err.println("[Close Statement Error]" + e.getMessage());
			}
		}
	}
}

public class UserProfileItem {

	private String uid;

	private String profile;

	private int logday;

	public UserProfileItem(String uid, String profile , int logday) {
		this.logday = logday;
		this.profile = profile;
		this.uid = uid;
	}

	public String getUid() {
		return uid;
	}

	public String getProfile() {
		return profile;
	}

	public int getLogday() {
		return logday;
	}

}

public class DataUpdaterMain {

	private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();

	private QunarThreadPoolExecutor qunarThreadPoolExecutor = new QunarThreadPoolExecutor(5, 8, 5, TimeUnit.MINUTES, queue);

	public void shutThreadPool(ThreadPoolExecutor executor) {
		if (executor != null) {
			executor.shutdown();
			try {
				if (!executor.awaitTermination(20 , TimeUnit.MINUTES)) {
					executor.shutdownNow();
				}
			} catch (Exception e) {
				System.err.println("[ThreadPool Close Error]" + e.getMessage());
			}

		}
	}

	public void close(Reader reader) {
		if (reader != null) {
			try {
				reader.close();
			} catch (IOException e) {
				System.err.println("[Close Io Error]" + e.getMessage());
			}
		}
	}

	public void closeConnection(Connection conn , Statement st) {
		try {
			if (conn != null) {
				conn.close();
			}
			if (st != null) {
				conn.close();
			}
		} catch (Exception e) {
			System.err.println("[Close MySQL Error]" + e.getMessage());
		}
	}

	public boolean update(String file ,int logday) {
		long start = System.currentTimeMillis();
		BufferedReader br = null;
		int num = 0;
		try {
			br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
			String line = null;
			List<UserProfileItem> userProfiles = new LinkedList<UserProfileItem>();
			while ((line = br.readLine()) != null) {
				++num;
				String []items = line.split("\t");
				if (items.length == 2) {
					String uid = items[0];
					String profile = items[1];
					userProfiles.add(new UserProfileItem(uid, profile, logday));
					if (userProfiles.size() >= 100) {
						qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));
						userProfiles = new LinkedList<UserProfileItem>();
					}
				} else {
					System.err.println("[Data Error]" + line);
				}
			}
			qunarThreadPoolExecutor.execute(new DataUpdater(userProfiles));;
		} catch (Exception e) {
			e.printStackTrace();
			System.err.println("[Read File Error]" + e.getMessage());
			return false;
		}  finally {
			System.err.println("[Update] take time " + (System.currentTimeMillis() - start) + ".ms");
			System.err.println("[Update] update item " + num);
			shutThreadPool(qunarThreadPoolExecutor);;
			close(br);
		}
		return true;
	}

	public static void main(String []args) {
		String file = "D:\\workspaces\\promotionwordData.log";
		int logday = Integer.parseInt("20150606");
		DataUpdaterMain dataUpdaterMain = new DataUpdaterMain();
		dataUpdaterMain.update(file, logday);
	}
}

  

时间: 2024-07-29 23:30:40

多线程实现数据库的并发操作的相关文章

【转载】数据库大并发操作要考虑死锁和锁的性能问题

本文转载自:http://blog.csdn.net/yuanyuanispeak/article/details/52756167 1 前言 数据库大并发操作要考虑死锁和锁的性能问题.看到网上大多语焉不详(尤其更新锁),所以这里做个简明解释,为下面描述方便,这里用T1代表一个数据库执行请求,T2代表另一个请求,也可以理解为T1为一个线程,T2 为另一个线程.T3,T4以此类推.下面以SQL Server(2005)为例. 2 锁的种类 共享锁(Shared lock). 例1: -------

数据库的并发操作

数据库的并发操作 事务 事务(Transaction)是用户定义的一个数据库操作序列,这些操作要么全做,要么全不做,是一个不可分割的工作单位. 事务是恢复和并发控制的基本单位 事务的ACID特性: 原子性(Atomicity):事务是数据库的逻辑工作单位 一致性(Consistency):事务执行的结果必须是使数据库从一个一致性状态变 到另一个一致性状态 隔离性(Isolation):一个事务的执行不能被其他事务干扰 持续性(Durability ):一个事务一旦提交,它对数据库中数据的改变就应

如何处理大量数据并发操作

文件缓存,数据库缓存,优化sql,数据分流,数据库表的横向和纵向划分,优化代码结构! 锁述的概 一. 为什么要引入锁 多个用户同时对数据库的并发操作时会带来以下数据不一致的问题: 丢失更新 A,B两个用户读同一数据并进行修改,其中一个用户的修改结果破坏了另一个修改的结果,比如订票系统 脏读 A用户修改了数据,随后B用户又读出该数据,但A用户因为某些原因取消了对数据的修改,数据恢复原值,此时B得到的数据就与数据库内的数据产生了不一致 不可重复读 A用户读取数据,随后B用户读出该数据并修改,此时A用

Qt 多线程与数据库操作需要注意的几点问题(QSqlDatabase对象只能在当前线程里使用)

彻底抛弃MFC, 全面应用Qt 已经不少时间了.除了自己看书按步就班做了十几个验证性的应用,还正式做了3个比较大的行业应用,总体感觉很好.Native C++ 下, Qt 基本是我用过的最简便的界面库了.遇到了一些问题,大都解决的很顺利,回头想想,还是有几个问题很有意思,尤其是数据库应用.这里把我的经历分享一下. 1.线程内注册与连接数据库的竞争问题 文档上对多线程下数据库应用的注意事项写的很简明,一个线程创建的 QSqlDatabase 对象和 查出来的 QSqlQuery 对象只能给本线程用

Qt 多线程与数据库操作需要注意的几点问题

源地址:http://blog.csdn.net/goldenhawking/article/details/10811409 彻底抛弃MFC, 全面应用Qt 已经不少时间了.除了自己看书按步就班做了十几个验证性的应用,还正式做了3个比较大的行业应用,总体感觉很好.Native C++ 下, Qt 基本是我用过的最简便的界面库了.遇到了一些问题,大都解决的很顺利,回头想想,还是有几个问题很有意思,尤其是数据库应用.这里把我的经历分享一下. 1.线程内注册与连接数据库的竞争问题 文档上对多线程下数

数据库中的并发操作带来的一系列问题及解决方法

数据库中常见的并发操作所带来的一致性问题包括:丢失的修改.不可重复读.读脏数据.幻影读(幻影读在一些资料中往往与不可重复读归为一类). 丢失修改 下面我们先来看一个例子,说明并发操作带来的数据的不一致性问题. 考虑飞机订票系统中的一个活动序列: 甲售票点(甲事务)读出某航班的机票余额A,设A=16. 乙售票点(乙事务)读出同一航班的机票余额A,也为16. 甲售票点卖出一张机票,修改余额A←A-1.所以A为15,把A写回数据库. 乙售票点也卖出一张机票,修改余额A←A-1.所以A为15,把A写回数

PHP使用数据库的并发问题

原载于我的博客 http://starlight36.com/post/php-db-concurrency 在并行系统中并发问题永远不可忽视.尽管PHP语言原生没有提供多线程机制,那并不意味着所有的操作都是线程安全的.尤其是在操作诸如订单.支付等业务系统中,更需要注意操作数据库的并发问题. 接下来我通过一个案例分析一下PHP操作数据库时并发问题的处理问题. 首先,我们有这样一张数据表: mysql> select * from counter; +----+-----+ | id | num

简述数据库事务并发机制

摘要: 事务是最小的逻辑执行单元,也是数据库并发控制的基本单位,其执行的结果必须使数据库从一种一致性状态变到另一种一致性状态.事务具有四个重要特性,即原子性(Atomicity).一致性(Consistency).隔离性 (Isolation)和持久性 (Durability).本文首先叙述了数据库中事务的本质及其四大特性(ACID)的内涵,然后重点介绍了事务隔离性的动机和内涵,并介绍了数据库为此所提供的事务隔离级别以及这些事务隔离级别能解决的事务并发问题.介于并发安全与并发效率的平衡,我们一般

多线程、进程、并发、并行、同步、异步、伪并发、真并发

进程.线程 1.进程 一个程序,可以独立运行的一段程序.系统对它进行资源分配和调度. 2.线程 进程的基本单位,对它进行cpu分配和调度.只拥有一点在运行中必不可少的资源(寄存器,栈,程序计数器) 3.线程与进程的联系与区别 联系: (1)线程是指进程内的一个执行单元,一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程(通常说的主线程). 但是存在 DOS 这样的单进程(而且无线程概念)系统. (2)资源分配给进程,同一进程的所有线程共享该进程的所有资源,线程自己基本上不拥有系