利用java多线程写的一个工具向MongoDb中存储大量数据

jdk:1.7mogodb:3.2mongodb_java_driver:3.2.2

  

  1 import java.io.BufferedReader;
  2 import java.io.File;
  3 import java.io.FileInputStream;
  4 import java.io.IOException;
  5 import java.io.InputStreamReader;
  6 import java.util.LinkedList;
  7 import java.util.List;
  8 import org.bson.Document;
  9 import com.mongodb.MongoClient;
 10 import com.mongodb.client.MongoCollection;
 11 import com.mongodb.client.MongoDatabase;
 12
 13 public class Test {
 14     public static void main(String[] args) {
 15         Pool p = new Pool();
 16         Produce pro = new Produce("p",p);//一个生产者
 17         //三个消费者,用来向mogodb中存储数据       Customer cus = new Customer("c", p);
 18         Customer cus2 = new Customer("c2", p);
 19         Customer cus3 = new Customer("c3", p);
 20          new Thread(pro).start();
 21          new Thread(cus).start();
 22          new Thread(cus2).start();
 23          new Thread(cus3).start();
 24
 25     }
 26 }
 27
 28 class Produce implements Runnable{//生产者
 29
 30     private static final String DIR = "E:\\targets";//扫描文件路径
 31     private static final String FILE_SUFFIX = "html";//扫描文件类型
 32     private Pool pool=null;
 33     private String name = null;
 34     public Produce(String name,Pool pool){
 35         this.pool= pool;
 36         this.name = name;
 37     }
 38     @Override
 39     public void run() {
 40         getFilesInDir(DIR, FILE_SUFFIX);
 41     }
 42
 43     public void getFilesInDir(String dir,String suffix){
 44         if(null!=dir && dir.trim().length()>0){
 45             File file = new File(dir.trim());
 46             if(file.isDirectory()){
 47                 File[] flist = file.listFiles();
 48                 if(null!=flist){
 49                     for(File f:flist){
 50                         if(f.isFile()){
 51                             if(null==suffix){
 52                                 pool.putFile(f);
 53                             }
 54                             if(null!=suffix &&suffix.trim().length()>0){
 55                                 if(f.getName().endsWith(suffix.trim())){
 56                                     pool.putFile(f);
 57                                     System.out.println(name+"pool存东西"+f.getName());
 58                                 }
 59                             }
 60                         }else{
 61                             getFilesInDir(f.getAbsolutePath(),suffix);
 62                         }
 63                     }
 64                 }
 65             }
 66         }
 67     }
 68
 69 }
 70
 71 class Customer implements Runnable{//消费者
 72     private static final String CHARSET = "UTF-8";//文件处理编码格式
 73     private static final String HOST = "127.0.0.1";//主机
 74     private static final int PORT = 27017;//端口
 75     private static final String DATABASE_NAME="mydb";//存储数据库名称
 76     private static final String COLLECTION_NAME="mycol";//存储数据库Collection
 77     MongoClient client =new MongoClient(HOST,PORT);;
 78     private String name=null;
 79     private Pool pool=null;
 80     public Customer(String name,Pool pool){
 81         this.name = name;
 82         this.pool = pool;
 83     }
 84     @Override
 85     public void run() {
 86         while(true){
 87             try {
 88                 File f = pool.fetchFile();
 89                 saveToMonGoDb(f);
 90                 System.out.println(name+"pool取东西"+f.getName());
 91             } catch (Exception e) {
 92                 e.printStackTrace();
 93             }
 94         }
 95     }
 96
 97     private void saveToMonGoDb(File file){//将文件保存到数据库
 98         MongoDatabase dataBase = client.getDatabase(DATABASE_NAME);
 99         MongoCollection<Document> collection = dataBase.getCollection(COLLECTION_NAME);
100         String _id = file.getName().substring(0,file.getName().lastIndexOf("."));
101         String content = readFileContext(file, CHARSET);
102         Document document = new Document("_id",_id).append("content", content);
103         collection.insertOne(document);
104     }
105
106     public static String readFileContext(File file,String charset)  {
107         StringBuilder sb;
108         BufferedReader reader=null;
109         try {
110             reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), charset));
111             String line = null;
112             sb = new StringBuilder();
113             while(null!=(line = reader.readLine())){
114                 sb.append(line+"\n");
115             }
116             return sb.toString();
117         }catch (Exception e) {
118             e.printStackTrace();
119         }finally{
120          try {
121             reader.close();
122         } catch (IOException e) {
123             e.printStackTrace();
124         }
125         }
126         return null;
127     }
128
129 }
130
131  class Pool{
132     volatile int size=0;
133     volatile private  List<File> files = new LinkedList<File>();
134     public  synchronized void putFile(File file){
135         while(size>1000){
136              try {
137                 this.wait();
138             } catch (InterruptedException e) {
139                 e.printStackTrace();
140             }
141          }
142         files.add(file);
143         ++size;
144         notify();
145     }
146
147     public synchronized File fetchFile(){
148         while(size<=0){
149              try {
150                 this.wait();
151             } catch (InterruptedException e) {e.printStackTrace();}
152          }
153         File file = null;
154         if(files.size()>0){
155             file = files.remove(0);
156             --size;
157         }
158         notify();
159         return file;
160     }
161
162     public int getSize(){
163         return this.size;
164     }
165 }
时间: 2024-10-12 03:28:31

利用java多线程写的一个工具向MongoDb中存储大量数据的相关文章

利用java的Socket实现一个简单hello/hi聊天程序

利用java的Socket实现一个简单hello/hi聊天程序 首先,我们来用java实现一个简单的hello/hi聊天程序.在这个程序里,我学习到了怎么用socket套接套接字来进行编程.简单理解了一些关于socket套接字和底层调用的关系.关于java的封装思想,我学会了一些东西,java里真的是万物皆对象.还学到了一点多线程的知识. TCP 在这里,不得不先介绍以下TCP.TCP是传输层面向连接的协议.提供了端到端的进程之间的通信方式.TCP在通信之前要先建立连接.这里我们称这个建立连接的

写了一个二叉树构造及中序遍历函数

本题就是测试读入数据的速度的. 如果有大量的数据读入,使用cin是很慢的. 那么使用scanf那么会快很多,但是如果数据量更大的话那么就还是不够快了. 所以这里使用fread. 首先开一个buffer,然后使用fread大块大块地读入数据就可以非常快地读入了. 题目如下: Input The input begins with two positive integers n k (n, k<=107). The next n lines of input contain one positive

纯 java 实现 Http 资源读取工具,支持发送和接收数据,不依赖任何第三方 jar 包

原文:纯 java 实现 Http 资源读取工具,支持发送和接收数据,不依赖任何第三方 jar 包 源代码下载地址:http://www.zuidaima.com/share/1550463379950592.htm 纯 java 实现 Http 资源读取工具,支持发送和接收数据,不依赖任何第三方 jar 包 1. 抓取指定 URL 的资源,可以作为流,也可以作为 String 2. 向指定 URL POST 数据,模拟表单提交. 例如:你想模拟 XXX 自动登陆,然后再发表心情.签名之类的 3

利用Java手写简单的httpserver

前言: 在看完尚学堂JAVA300中讲解如何实现一个最简单的httpserver部分的视频之后, 一.前置知识 1.HTTP协议 当前互联网网页访问主要采用了B/S的模式,既一个浏览器,一个服务器,浏览器向服务器请求资源,服务器回应请求,浏览器再将接收到的回应解析出来展现给用户.这一问一答的过程可以抽象成浏览器向服务器发送一个Request然后服务器返回一个Response的过程 其中Request和Reponse在HTTP中有有具体的格式要求 一个Request的例子 Method Path-

(转)Java多线程的监控分析工具(VisualVM)

原文链接:http://blog.csdn.net/chendc201/article/details/22905511 在Java多线程程序运行时,多数情况下我们不知道到底发生了什么,只有出了错误的日志的时候,我们才知道原来代码中有死锁.撇开代码检查工具,我们先讨论一下利用VisualVM监控,分析我们的多线程的运行情况. AD:51CTO学院:IT精品课程在线看! 在Java多线程程序运行时,多数情况下我们不知道到底发生了什么,只有出了错误的日志的时候,我们才知道原来代码中有死锁.撇开代码检

JAVA多线程提高十:同步工具CyclicBarrier与CountDownLatch

今天继续学习其它的同步工具:CyclicBarrier与CountDownLatch 一.CyclicBarrier CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier. CyclicBarrier类似于CountDownL

写了一个工具分析基金回撤,分享实现过程

最近有一个网友让我帮他写一个工具分析基金回撤情况,前几天项目比较忙就直没动手,今天晚上有点时间,研究了一下. 先把今天的研究成果分享: 要分析基金净值回撤,首先肯定要有基金的净值变化数据.要抓数据肯定是到天天基金网上抓,毕竟人家是专业机构. 我找了一只个人比较喜欢的基金,易方达中小盘混合 (110011),在天天基金网站上很容易找到它的历史净值页面: http://fundf10.eastmoney.com/jjjz_110011.html 因为之前抓过天天基金当日基金净值数据,知道它的页面数据

Java多线程及JDK5线程工具类总结

内容摘抄来自:传智播客 张孝祥 老师的<Java高新技术>视频,   并加入了个人总结和理解. 虽然我没有参加过任何培训班,但我很大一部分知识都来自于传智播客的网络分享视频. 十分真挚的感谢张老师的公开视频. 1.传统线程技术的回顾 1 import org.junit.Test; 2 /** 3 * 传统线程回顾 4 * 多线程不一定会提高运行效率,和CPU设计和架构有关. 5 * 多线程下载是抢了服务器的资源,并不是自身的机器加快. 6 * @author LiTaiQing 7 */ 8

JAVA多线程提高十一:同步工具Exchanger

Exchanger可以在对中对元素进行配对和交换的线程的同步点.每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象.Exchanger 可能被视为 SynchronousQueue 的双向形式.Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用. 构造方法摘要  Exchanger() 创建一个新的 Exchanger. 方法摘要 CountDownLatch CyclicBarrier V exchange(V x) 等