文章: http://blog.csdn.net/lili72
背景: 分布式文件通过rsync同步到生产机。 文件数 1440=24*60 也就是一分钟生成一个文件 文件命名 0000 0001 0002 ... 2358 2359 。由于文件传过来是JSON格式,需要对文件进行解析,导入HDFS中。
过程
1 rsync同步文件到当天的日期目录,每天实时把文件同步到生产机制定目录,每天生成新文件夹,由于是每分钟生成文件,每个文件夹都有1440个文件。
1.1 修改配置文件,增加一个目录的同步权限。
Vi /etc/rsyncd.conf
[orders]
path = /etldata/order
list=yes
ignore errors
auth users = hadoop
secrets file = /etc/rsyncd.secrets
comment = This is test data
1.2 一条命令即可同步。
rsync -az --port=8730 /data1/queue/ex_user_lastlogin/20141210/ [email protected]::userOrder/
2 对当天日期文件进行解析成以|分隔,检查文件数量是否达到1440个,对当天的日期文件夹中的文件进行解析转换。文件命名用一定的规则。 处理好该天的数据生成日期.ok文件
3 导入HDFS(hive)中,每天定时检查日期.ok文件是否生成,生成则load 前一天日期的数据。
2.1 Json解析工具选择,Jackson效率比较高
Json数据样例:{"data":{"serverid":"1001","appid":1005,"client_ip":"118.180.156.249","time":"2014-12-11 23:59:59","userid":361443577},"ordertype":1}
新建java model
public class UserLog { private String serverid=""; private String servertime=""; private String userid=""; private String appid=""; private String client_ip=""; //空的构造函数一定要 public UserLog(){ } public UserLog(String serverid, String servertime, String userid, String appid, String client_ip) { this.serverid = serverid; this.servertime = servertime; this.userid = userid; this.appid = appid; this.client_ip = client_ip; } public String getServerid() { return serverid; } public void setServerid(String serverid) { this.serverid = serverid; } public String getUserid() { return userid; } public void setUserid(String userid) { this.userid = userid; } public String getAppid() { return appid; } public void setAppid(String appid) { this.appid = appid; } public String getClient_ip() { return client_ip; } public void setClient_ip(String client_ip) { this.client_ip = client_ip; } public String getServertime() { return servertime; } public void setServertime(String servertime) { this.servertime = servertime; } @Override public String toString() { return serverid+"|" + userid+"|" +appid+"|"+servertime+"|"+client_ip; }
另外一个model
public class UserModel { private UserLog data; private String type="" ; public String getType() { return type; } public void setType(String type) { this.type = type; } public UserModel(){ } public UserLog getData() { return data; } public void setData(UserLog data) { this.data = data; } @Override public String toString() { return data.toString()+"|"+type; } }
解析程序:
import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.LineNumberReader; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.codehaus.jackson.map.ObjectMapper; public class UserLoginLog { private static final ObjectMapper mapper = new ObjectMapper(); public static final String descPath="/etldata/usertest/data"; public static long total_count=0; public static void JsonToBean(String strline) throws Exception { UserLog bean = mapper.readValue(strline, UserLog.class); System.out.println(bean.toString()); } public static void main(String[] args) throws Exception { if(args.length<1){ System.err.println("Usage Main : [java -jar sourcePathFileName [descPath] "); System.exit(0); } // 传入基础路径参数 // /etldata/userlogin/85 String basePath =args[0]; String desFileName =null ; String sourcFileName="F:/0000.log"; //获取 24 * 60 分钟的文件名称 List<String> paths=getFileName(); //记录开始时间 long start_time= System.currentTimeMillis(); SimpleDateFormat sf = new SimpleDateFormat("yyyyMMdd"); String currentDate= sf.format(new Date()); //输出路径 默认是 /etldata/userlogin/data/yyyymmdd/0000.txt desFileName =descPath+"/"+currentDate ; //如果传入的目标参数 不为空 则生成文件到指定的目录 if(args.length==2){ desFileName=args[1]; } //循环读取源文件夹下的文件 循环解析到目标目录 for(String fileNum: paths){ sourcFileName =basePath+"/"+fileNum+".log"; System.out.println("sourcFileName-----" +sourcFileName); System.out.println("desFileName------" + desFileName); readFile(sourcFileName,desFileName+"/" +fileNum+".txt"); } long end_time =System.currentTimeMillis(); System.out.println("do finsh -----! 花费时间 " +(end_time-start_time) +" 处理 "+ total_count +" 行数据"); } /** * @return * 构造 24 * 60 每分钟生成一个文件的 文件名称 0000 0001 0002 .... 2358 2359 */ public static List<String> getFileName() { String hour = ""; String minu = ""; List<String> fileNameList = new ArrayList<String>(1441); for (int i = 0; i < 24; i++) { hour = i + ""; if (hour.length() == 1) { hour = "0" + hour; } for (int j = 0; j < 60; j++) { minu = j + ""; if (minu.length() == 1) { minu = "0" + minu; } fileNameList.add(hour + minu); } } return fileNameList; } /** * @param sourcFileName * @param desFileName * @throws Exception * 读取数据出来 然后解析 同时记录条数 * {"data":{"serverid":"1001","appid":1001,"client_ip":"120.217.97.205","time":"2014-12-11 23:59:59","userid":19617632},"ordertype":1} */ public static void readFile(String sourcFileName,String desFileName) throws Exception{ FileReader fr=new FileReader(sourcFileName); LineNumberReader lr=new LineNumberReader(fr,512); while(lr.readLine()!=null){ String str=lr.readLine(); UserModel bean =null; if(str!=null){ bean = mapper.readValue(str, UserModel.class); writeFile(bean.toString()+"\n",desFileName); total_count = total_count +1 ; } } lr.close(); } /** * @param content * @param pathName * @return * @throws IOException * 解析之后的文件 写到另外一个目录 */ public static String writeFile(String content,String pathName) throws IOException { File file = new File(pathName); if(!file.exists()){ file.getParentFile().mkdirs(); file.createNewFile(); } appendFileStr(pathName, content ); return pathName ; } /** * @param fileName * @param content * 追加文件内容 */ public static void appendFileStr(String fileName, String content){ try { FileWriter writer = new FileWriter(fileName, true); writer.write(content); writer.close(); } catch (IOException e) { e.printStackTrace(); } } }
依赖的jar 下载地址:http://download.csdn.net/detail/lili72/8279053
3 用脚本定时起调。
#!/usr/bin/env bash # ************************************************************************ # yyyymmdd version author modified # -------- --------- ------------- ---------------------- # 20141208 V14.00.001 lisc # # ************************************************************************ if [ $# -gt 1 ];then # 参数个数,需视具体参数修改 echo "Params error." echo "Useage: load_user_login_log.sh [data_date]" exit 1 fi logfile=$BIPROG_ROOT/logs/`basename $0`.log #定义写日志文件名 ###############################引入公共函数库######################### vDay=${1:-"`lastday YYYY-MM-DD`"} #如果没有传日期的参数,默认取昨天 vDay2=${1:-"`lastday YYYYMMDD`"} #如果没有传日期的参数,默认取昨天 dtstr=`date -d "0 day ago " +%Y-%m-%d" "%H:%M:%S` echo $vDay2 ############################ 功能执行部分 ############################ writelog "Program($VERSION) start..." # 先解析文件 java -jar /etldata/userorder/pare_json.jar /etldata/userorder/85 /etldata/userorder/data/${vDay2} touch /etldata/userorder/data/${vDay2}/${vDay2}.ok # load 到hive SQL=" load data local inpath '/etldata/userorder/data/${vDay2}/*.txt' overwrite into table userorder.st_userorder_log partition(dt='${vDay}');" echo $SQL | $HIVE_HOME/bin/hive
时间: 2024-10-14 06:07:48