java实现服务端守护进程来监听客户端通过上传json文件写数据到hbase中

1、项目介绍:

  由于大数据部门涉及到其他部门将数据传到数据中心,大部分公司采用的方式是用json文件的方式传输,因此就需要编写服务端和客户端的小程序了。而我主要实现服务端的代码,也有相应的客户端的测试代码。这里须有一个需要提到的是,我在实现接收json文件的同时,而且还需将数据写到hbase中。写入到hbase当中采用的是批量插入的方式,即一次插入多条记录。

  好了,有了前面的说明,下面来简单的说一下我实现的服务端的小程序把。

2、为了实现服务端能够监听客户端的行为,因此我在服务端采用多线程的技术来实现,并用socket的方式来实现网络通信。具体实现如下:

服务端的主程序:

package com.yiban.datacenter.finalversion;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class HbaseServer {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		backprocess();
	}

	public static void backprocess(){
		try {
			ServerSocket ss=new ServerSocket(11111);
			while(true){
				Socket s=ss.accept();

				Thread deal=new Thread(new DealUserThread(s));
				deal.setDaemon(true);  //这里设置对应线程是后台线程
				deal.start();
			}

		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

处理数据的线程类:

在这里我实现了接收数据,并将数据写入hbase中。

在实现这些大的目标的同时,也将客户端的请求通过日志文件的形式存到服务端的本地磁盘上,供后续查看。

package com.yiban.datacenter.finalversion;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class DealUserThread implements Runnable {

	private String testconnect = "username=chenpiao,password=123;username=liujiyu,password=123"; // 这个可以用来验证用户名和密码

	private static Configuration conf = HBaseConfiguration.create();

	private static Connection connection = null;

	private String logFile=null;
	// 配置hbase的信息
	static {
		try {
			conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
			conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
			connection = ConnectionFactory.createConnection(conf);
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}

	private Socket s;

	public DealUserThread(Socket s) {
		this.s = s;
	}

	private String userTableName = "nihao";
	private String columnFamilyName = null;
	private String rowKey = null;

	private BufferedReader serverread = null;
	private BufferedWriter serverwrite = null;

	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			// 将通道内的字节流转换成字符流,并用bufferedreader进行封装,InputStreamReader是将字节流转换成字符流
			serverread = new BufferedReader(new InputStreamReader(
					s.getInputStream()));

			// 询问客户端连接是否准备好,接受客户端的连接请求
			String line = serverread.readLine(); // 阻塞
			//System.out.println(line);// 输出客户端的连接请求

			//为日志文件命名,并创建文件
			SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
			logFile="/var/log/datacenter/"+"user.log";
			//System.out.println(logFile);
			File destFile = new File(logFile);
			if (!destFile.exists()) {
				destFile.createNewFile();
			}
			writeByFileWrite(logFile, line+"\n"+sdf.format(System.currentTimeMillis()));
			// 将通道内的字符写入到对应的文件中,利用bufferedwrite进行封装,FileWriter是将字符流写入到文件中
			serverwrite = new BufferedWriter(new OutputStreamWriter(
					s.getOutputStream()));
			String[] strArray = testconnect.split("\\;");
			boolean flag = false;
			for (String str : strArray) {
				if (str.equals(line)) {
					/*
					 * serverwrite.write("连接成功,你可以发送数据了,发送数据前,请先发送你要用的数据库表名!");
					 * serverwrite.newLine(); serverwrite.flush();
					 */
					printInfomationForClient("connection successful ,now you can send data,befor send data ,you must send tablename!");
					flag = true;
					break;
				}
			}

			if (!flag) {
				/*
				 * serverwrite.write("密码或者用户名错误,连接失败!"); serverwrite.newLine();
				 * serverwrite.flush();
				 */
				printInfomationForClient("username or password is error! connection failed!");
				s.close();
			}

			// 准备接收表名
			userTableName = serverread.readLine();
			//System.out.println("tablename:" + userTableName);// 输出客户端的连接请求的表名

			writeByFileWrite(logFile, "tablename="+userTableName);//将内容写到日志文件中

			// 告诉客户端,我接受成功
			if (TableIsExist(userTableName)) {
				printInfomationForClient("received tablename successful!");
			} else {
				printInfomationForClient("tablename Is not exist ");
				s.close();
			}

			line = "[";
			StringBuffer temp = new StringBuffer(line);
			while ((line = serverread.readLine()) != null) {
				temp.append(line);
			}
			temp.append("]");
			//System.out.println(temp.toString());

			// 对接收到的数据进行异常处理,如上传的数据格式不正确等等。
			try {
				// 对json文件进行解析
				JSONArray jsonArray = JSONArray.fromObject(temp.toString());
				// JSONObject jsonobject=JSONObject.fromObject(temp.toString());

				// 解析之后进行输出
				//PrintJsonArray(jsonArray);

				//获取所有的表名
				//getAllTables(conf);

				// 将接收到的数据写入hbase中的表中
				insertData(jsonArray, userTableName);

				//System.out.println("存入数据成功!");

			} catch (Exception e) {
				printErrorForClient(e);
			}
			// 给出一个反馈,提示数据上传成功
			// 封装通道内的输出流,方便对他进行写字符数据
			// BufferedWriter bwserver = new BufferedWriter(new
			// OutputStreamWriter(s.getOutputStream()));

			/*
			 * serverwrite.write("文件上传成功!"); // bwserver.newLine();
			 * serverwrite.flush(); serverwrite.close();
			 */
			printInfomationForClient("upload data successful!");

			// 释放资源
			s.close();

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			try {
				printErrorForClient(e);
				writeByFileWrite(logFile,e.getMessage()+e.toString());
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
		}
	}

	private void printInfomationForClient(String s) throws IOException {
		try {
			serverwrite.write(s);
			writeByFileWrite(logFile, s);//将内容写到日志文件中
			serverwrite.newLine();
			serverwrite.flush();
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
			writeByFileWrite(logFile, e.getMessage()+e.toString());//将内容写到日志文件中
		}
	}

	private void printErrorForClient(Exception e) throws IOException {
		try {
			serverwrite.write("found a error:" + e.getMessage() + e.toString());
			writeByFileWrite(logFile,"found a error:" + e.getMessage() + e.toString());
			serverwrite.newLine();
			serverwrite.flush();
			s.close();
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
			writeByFileWrite(logFile,e1.getMessage()+e1.toString());
		}

	}

	Map<String, String> colvalue = new TreeMap<String, String>();

	private void insertData(JSONArray jsonArray, String userTableName)
			throws IOException {

		// connect the table
		Table table = null;
		try {
			table = connection.getTable(TableName.valueOf(userTableName));
		} catch (IOException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
			printErrorForClient(e1);
		}

		colvalue.clear();

		for (int i = 0; i < jsonArray.size(); i++) {
			JSONObject obj = jsonArray.getJSONObject(i);
			Set<String> keysets = obj.keySet();
			for (String key : keysets) {
				switch (key) {
				case "category":
					columnFamilyName = obj.getString(key);
					break;
				case "version":
					colvalue.put("version", obj.getString("version"));
					break;
				case "DocumentType":
					colvalue.put("DocumentType", obj.getString("DocumentType"));
					break;
				case "articles":
					JSONArray articlesjars = obj.getJSONArray("articles");
					dealjsonArray(table,articlesjars);
					break;
				default:
					printErrorForClient(new Exception("send datatype is error!"));
				}
			}

		}
	}

	private void insertColDataToHbase(Table table) throws IOException {
		// 判断是否包含对应的列族,若不包含则添加
		HTableDescriptor desc = new HTableDescriptor(table.getName());
		Collection<HColumnDescriptor> familys=desc.getFamilies();
		if (  familys.contains(new HColumnDescriptor(columnFamilyName))
				&& columnFamilyName != null) {
			addColFamily(table, desc, columnFamilyName);
		}

		// insert data
		List<Put> putlist = new ArrayList<Put>();

		if (!colvalue.isEmpty() && rowKey != null && columnFamilyName != null) {
			Put put = new Put(Bytes.toBytes(rowKey));// 指定行,也就是键值
			// 下面就是循环存储列
			for (Entry<String, String> col : colvalue.entrySet()) {
				put.add(Bytes.toBytes(columnFamilyName),
						Bytes.toBytes(col.getKey()),
						Bytes.toBytes(col.getValue()));
				putlist.add(put);
			}

		} else {
			printErrorForClient(new Exception("send datatype is error!"));
		}

		try {
			table.put(putlist);
			table.close();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			printErrorForClient(e);
		}
	}

	private void dealjsonArray(Table table,JSONArray articlesjars) throws IOException {
		// TODO Auto-generated method stub
		if(articlesjars.isEmpty()){
			System.out.println("articlesjars is empty");
			printErrorForClient(new Exception("send datatype is error!"));
			return;
		}
		for(int i=0;i<articlesjars.size();i++){
			JSONObject obj=articlesjars.getJSONObject(i);
			Set<String>keysets=obj.keySet();
			for(String key:keysets){
				switch(key){
				case "content":
					colvalue.put("content", obj.getString("content"));
					break;
				case "picture_url":
					colvalue.put("picture_url", obj.getString("picture_url"));
					break;
				case "time":
					colvalue.put("time", obj.getString("time"));
					break;
				case "author":
					colvalue.put("author", obj.getString("author"));
					break;
				case "url":
					rowKey=obj.getString("url");
					break;
				case "title":
					colvalue.put("title", obj.getString("title"));
					break;
				default:
					printErrorForClient(new Exception("send datatype is error!"));
				}
			}
			try {
				insertColDataToHbase(table);
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				printErrorForClient(e);
			}
		}

	}

	private void addColFamily(Table table, HTableDescriptor desc,
			String colFamily) throws IOException {

		Admin ad = connection.getAdmin();

		HColumnDescriptor family = new HColumnDescriptor(
				Bytes.toBytes(colFamily));// 列簇
		desc.addFamily(family);
		ad.addColumn(table.getName(), family);
		ad.close();

	}

	private boolean TableIsExist(String userTableName2) throws IOException {
		boolean flag = false;
		try {
			// Connection connection = ConnectionFactory.createConnection(conf);
			Admin ad = connection.getAdmin();
			if (ad.tableExists(TableName.valueOf(userTableName2))) {
				flag = true;
				//System.out.println("表存在");
			} else {
				//System.out.println("表不存在");
				//printErrorForClient(new Exception("表不存在"));
			}
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
			printErrorForClient(e);
		}

		return flag;
		// TODO Auto-generated method stub

	}

	private void PrintJsonArray(JSONArray jsonArray) {
		int size = jsonArray.size();
		System.out.println("Size: " + size);
		for (int i = 0; i < size; i++) {
			JSONObject jsonObject = jsonArray.getJSONObject(i);
			Set<String> keysets = jsonObject.keySet();
			for (String keyset : keysets) {
				System.out.println(keyset);
			}
		}
	}

	private void PrintJsonArray(JSONObject jsonobject, String... keys) {
		int size = jsonobject.size();
		System.out.println("Size: " + size);
		for (int i = 0; i < size; i++) {
			for (String key : keys) {
				System.out.println(key + ":" + jsonobject.get(key));
			}

			// System.out.println("[" + i + "]id=" + jsonObject.get("id"));
			// System.out.println("[" + i + "]name=" + jsonObject.get("name"));
			// System.out.println("[" + i + "]role=" + jsonObject.get("role"));
		}
	}

	//写日志文件
	public static void writeByFileWrite(String _sDestFile, String _sContent)
			throws IOException {
		FileWriter fw = null;
		try {
			fw = new FileWriter(_sDestFile,true);
			fw.write(_sContent);
			fw.write(‘\n‘);
		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			if (fw != null) {
				fw.close();
				fw = null;
			}
		}
	}

	// create table
	private void createTable(Configuration conf) {
		// HBaseAdmin ha=new HBaseAdmin(conf);
		try {
			// Connection connection = ConnectionFactory.createConnection(conf);
			Table table = connection.getTable(TableName.valueOf(userTableName));
			Admin ad = connection.getAdmin();

			// TableName name= TableName.valueOf(Bytes.toBytes(tablename));//表名
			HTableDescriptor desc = new HTableDescriptor(table.getName());

			HColumnDescriptor family = new HColumnDescriptor(
					Bytes.toBytes(columnFamilyName));// 列簇
			desc.addFamily(family);

			ad.createTable(desc);
			ad.close();

		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}

	}

	// Hbase获取所有的表信息
	public static List getAllTables(Configuration conf)
			throws MasterNotRunningException, ZooKeeperConnectionException,
			IOException {

		HBaseAdmin ad = new HBaseAdmin(conf);
		List<String> tables = null;
		if (ad != null) {
			try {
				HTableDescriptor[] allTable = ad.listTables();
				if (allTable.length > 0)
					tables = new ArrayList<String>();
				for (HTableDescriptor hTableDescriptor : allTable) {
					tables.add(hTableDescriptor.getNameAsString());
					System.out.println(hTableDescriptor.getNameAsString());
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return tables;
	}

}

实现了这些之后,通过eclipse将其导出程可运行的jar包,并将jar包放到服务器上进行部署,部署的方式很简单,但是也要注意一下:

java -jar myserver.jar com.yiban.datacenter.finalversion.HbaseServer &

最后一个&表示后台守护启动该进程。

时间: 2024-10-09 22:45:03

java实现服务端守护进程来监听客户端通过上传json文件写数据到hbase中的相关文章

通过FileWatcher,监听通过web上传的图片,并进行压缩

需求是这样的,通过web传输过来的图片,无论是JS上传,还是其他的上传方式,都需要生成2张缩略图,分别是用于商品列表的小图small,和用于分享的小图share.基于不同上传方式的不同需求,使用exe程序可以简单适配所有情况,因此有以下的解决方案. 首先是简单的FileWatcher的使用,我们只需要监听Create事件即可.因为small和share均在同一个文件夹中,因此无需监控子文件夹的变化. this.fileWatcher.Path = tbDir.Text; fileWatcher.

flume实例一、监听目录日志上传到其他服务器

一.flume-ng简介 请参考官方文档:http://flume.apache.org/FlumeUserGuide.html 二.实例 需求说明:需要监控一个目录,并自动上传到服务器,且需要在传输过程中进行加密. 整体方案:n个client-agent -->server-agent client-agent: a1.sources = r1 a1.channels = c1 a1.sinks = k1 #source a1.sources.r1.type = spooldir a1.sou

flume实例二、监听目录日志上传到HDFS文件系统

一.概述 接实例一,实例一中server-aget是把日志上传保存到服务器上面,随着日志越来越大,公司启动了hadoop项目,需要把日志直接上传hdfs中保存,配置文件target_hdfs.conf如下: a2.sources = r2 a2.channels = c2 a2.sinks = k2 #source a2.sources.r2.type = avro a2.sources.r2.channels = c2 a2.sources.r2.compression-type = defl

网络:多个进程能否监听同一个端口号?

我们都知道socket是网络上两个进程之间的双向通信链路, 即 socket = <A进程的IP地址:端口号,B进程的IP地址:端口号> 那么有个问题就很有意思了,不同的进程可以监听在同一个IP地址:端口号么? 根据Unix网络编程中的知识可知,服务端监听一个端口会经历: 1.根据套接字类型(Ipv4,Ipv6等)创建套接字socket 2.将套接字bind绑定到具体的网络地址和端口号 3.调用listen开始在这个套接字上进行监听. Unix提供了一个接口setsockopt()可以在bin

windows下通过bat脚本和计划任务实现设置某一服务的守护进程

通常服务器上跑的服务或者应用程序比较重要,如果无意间被关闭将造成不定程度的影响.通过为比较重要的服务设置守护进程,守护服务的进程.当服务关闭时可以自动开启,方法如下: 第一步:写守护进程的bat脚本  内容如下: 脚本内容中  set_task=RDO.exe意思为检查是否有RDO.exe进程. 要用的话就改成自己的进程名,如果进程宕了就过会自动重启(会在RDO.exe进程安装目录下生成一个start.bat) 其中 start.bat脚本内容中的start 后面的参数与set_svr后面的参数

搭建一个java web服务端

最近也是做了一个简单的java web 项目,由于以前也是没接触过,在这里记录下搭建一个web服务端的过程. 一般我们做一个服务端要么在本地自己的电脑上先安装环境,一般是windows系统,主要安装jdk + tomcat + mysql,这些安装教程网上都有,也很简单,我这里就不多说了,我要讲的是在一个远程linux服务器上搭建web服务端环境. 对于一个liunx服务器,我们可以使用xshell等终端工具登录来操作远程服务器,使用xshell的好处是,我们可以使用rz ,sz命令上传上载文件

Unity手游之路&lt;二&gt;Java版服务端使用protostuff简化protobuf开发

http://blog.csdn.net/janeky/article/details/17151465 开发一款网络游戏,首先要考虑的是客户端服务端之间用何种编码格式进行通信.之前我们介绍了Unity游戏前端使用protobuf的方法.今天我们来谈谈服务端如何使protobuf.游戏服务端语言百花齐放,除了比较传统的c/c++外,Java,Erlang,Python都有很多团队在使用. 今天推荐一下Java作为服务端开发语言.国内很多出色的页游和手游都是采用Java作为服务端语言的.比如<神曲

Android服务端开发1-使用Eclipse搭建Java Web服务端

本篇博客介绍如何使用Eclipse来创建一个Java Web程序,为后面讲通过Android客户端跟服务端进行交互打下基础,关于服务端可以选用的程序很多,主流的搭配是Android客户端+PHP服务端,我们也可以使用Android客户端+Java EE服务端这样的搭配,就看公司是以哪种方式提供了. 创建一个Java Web程序,没有特别复杂的流程,我们先准备一下原材料: 1. Eclipse(注:这个不是ADT Bundle,最好到官网下载针对开发Java EE的IDE,如果可以的话,选中MyE

Supervisor 为服务创建守护进程

今天需要再服务上部署一个.net 方面的项目:当时开启服务的命令只能在前台执行:使用nohub CMD &等放在后台开启服务都会宕机:所以搜寻了Supervisor 这个解决办法,为服务创建守护进程.具体操作如下 1.什么是守护进程 在linux或者unix操作系统中,守护进程(Daemon)是一种运行在后台的特殊进程,它独立于控制终端并且周期性的执行某种任务或等待处理某些发生的事件.由于在linux中,每个系统与用户进行交流的界面称为终端,每一个从此终端开始运行的进程都会依附于这个终端,这个终