long interval = 900000;
long startTime = new Date("2016/03/01 01:00:00").getTime();
long endTime = new Date("2016/03/31 23:59:59").getTime();
String tagUid=(String) arr.get(i);
Query query = Query.query(Criteria.where("TagGuid").in(tagUid));//查询过滤条件 query.addCriteria(Criteria.where("mDay").is(1));//添加过滤条件 MapReduceOptions options = MapReduceOptions.options();//设置reduce配置项 Map scope = new HashMap(); scope.put("interval", interval); scope.put("startTime", startTime); options.scopeVariables(scope).verbose(true).outputTypeInline();//mapreduce外部变量插入 String mapFunction1 = "function () {" + "var sSecond=new Date(this.Atime).getTime()," + "tag=parseInt((sSecond-startTime)/interval);" + "emit({guid:this.TagGuid,time:tag},{tagUid:this.TagGuid,atime:this.Atime,value:this.TagValue,year:this.mYear,month:this.mMonth,day:this.mDay,seeId:this.SeeID})}";//map方法编写 String reduceFunction = "function(key,values){ return values[0];} ";//reduce方法编写 MapReduceResults<ReturnMessage> result = mongoTemplate.mapReduce(query, "Hdata03", mapFunction1, reduceFunction,options, ReturnMessage.class);//方法调用 Iterator<ReturnMessage> it = result.iterator();//得到结果 long p=0; while (it.hasNext()) { ReturnMessage message = it.next();//自定义实体类容器 String key=(String) JSONObject.fromObject(message.getId()).get("guid"); String insertSql="INSERT INTO t_base_data (data) VALUES (‘"+message.getValue()+"‘)";//插入postgresql baseDao.updateBySql(insertSql); }
最后插入postgesql做了循环插入 很不科学,浪费大量时间;
主要演示了一个mapreduce全局变量的使用;
其他循环只是测试用的,有点累赘;可以不看;
时间: 2024-10-03 19:12:41