业务层的代码也应该是面向接口编程,先抽象一个接口或是抽象类,规范一些算法或者功能框架,再在其子类或是实现类中完成具体的方法,易于后期代码的维护。
1、业务层缓存技术
如果数据对实时性要求不高,可以把数据缓存在内存中,提高效率。一般都是利用集合来缓存数据。如下代码:
/** * 存放写线程的缓存器 * * @author way */ public class OutputThreadMap { private HashMap<Integer, OutputThread> map; <span style="color:#ff0000;">private static OutputThreadMap instance;</span> // 私有构造器,防止被外面实例化改对像 private OutputThreadMap() { map = new HashMap<Integer, OutputThread>(); } /<span style="color:#ff0000;">/ 单例模式像外面提供该对象</span> public synchronized static OutputThreadMap getInstance() { if (instance == null) { instance = new OutputThreadMap(); } return instance; } // 添加写线程的方法 public synchronized void add(Integer id, OutputThread out) { map.put(id, out); } // 移除写线程的方法 public synchronized void remove(Integer id) { map.remove(id); } // 取出写线程的方法,群聊的话,可以遍历取出对应写线程 public synchronized OutputThread getById(Integer id) { return map.get(id); } // 得到所有写线程方法,用于向所有在线用户发送广播 public synchronized List<OutputThread> getAll() { List<OutputThread> list = new ArrayList<OutputThread>(); for (Map.Entry<Integer, OutputThread> entry : map.entrySet()) { list.add(entry.getValue()); } return list; } }
2、读写线程代码
/** * 读消息线程和处理方法 * * @author way * */ public class InputThread extends Thread { private Socket socket;// socket对象 private OutputThread out;// 传递进来的写消息线程,因为我们要给用户回复消息啊 private OutputThreadMap map;// 写消息线程缓存器 private ObjectInputStream ois;// 对象输入流 private boolean isStart = true;// 是否循环读消息 public InputThread(Socket socket, OutputThread out, OutputThreadMap map) { this.socket = socket; this.out = out; this.map = map; try { ois = new ObjectInputStream(socket.getInputStream());// 实例化对象输入流 } catch (IOException e) { e.printStackTrace(); } } public void setStart(boolean isStart) {// 提供接口给外部关闭读消息线程 this.isStart = isStart; } @Override public void run() { try { while (isStart) { // 读取消息 readMessage(); } if (ois != null) ois.close(); if (socket != null) socket.close(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** * 读消息以及处理消息,抛出异常 * * @throws IOException * @throws ClassNotFoundException */ public void readMessage() throws IOException, ClassNotFoundException { Object readObject = ois.readObject();// 从流中读取对象 UserDao dao = UserDaoFactory.getInstance();// 通过dao模式管理后台 if (readObject != null && readObject instanceof TranObject) { TranObject read_tranObject = (TranObject) readObject;// 转换成传输对象 switch (read_tranObject.getType()) { case REGISTER:// 如果用户是注册 User registerUser = (User) read_tranObject.getObject(); int registerResult = dao.register(registerUser); System.out.println(MyDate.getDateCN() + " 新用户注册:" + registerResult); // 给用户回复消息 TranObject<User> register2TranObject = new TranObject<User>( TranObjectType.REGISTER); User register2user = new User(); register2user.setId(registerResult); register2TranObject.setObject(register2user); out.setMessage(register2TranObject); break; case LOGIN: User loginUser = (User) read_tranObject.getObject(); ArrayList<User> list = dao.login(loginUser); TranObject<ArrayList<User>> login2Object = new TranObject<ArrayList<User>>( TranObjectType.LOGIN); if (list != null) {// 如果登录成功 TranObject<User> onObject = new TranObject<User>( TranObjectType.LOGIN); User login2User = new User(); login2User.setId(loginUser.getId()); onObject.setObject(login2User); for (OutputThread onOut : map.getAll()) { onOut.setMessage(onObject);// 广播一下用户上线 } map.add(loginUser.getId(), out);// 先广播,再把对应用户id的写线程存入map中,以便转发消息时调用 login2Object.setObject(list);// 把好友列表加入回复的对象中 } else { login2Object.setObject(null); } out.setMessage(login2Object);// 同时把登录信息回复给用户 System.out.println(MyDate.getDateCN() + " 用户:" + loginUser.getId() + " 上线了"); break; case LOGOUT:// 如果是退出,更新数据库在线状态,同时群发告诉所有在线用户 User logoutUser = (User) read_tranObject.getObject(); int offId = logoutUser.getId(); System.out .println(MyDate.getDateCN() + " 用户:" + offId + " 下线了"); dao.logout(offId); isStart = false;// 结束自己的读循环 map.remove(offId);// 从缓存的线程中移除 out.setMessage(null);// 先要设置一个空消息去唤醒写线程 out.setStart(false);// 再结束写线程循环 TranObject<User> offObject = new TranObject<User>( TranObjectType.LOGOUT); User logout2User = new User(); logout2User.setId(logoutUser.getId()); offObject.setObject(logout2User); for (OutputThread offOut : map.getAll()) {// 广播用户下线消息 offOut.setMessage(offObject); } break; case MESSAGE:// 如果是转发消息(可添加群发) // 获取消息中要转发的对象id,然后获取缓存的该对象的写线程 int id2 = read_tranObject.getToUser(); OutputThread toOut = map.getById(id2); if (toOut != null) {// 如果用户在线 toOut.setMessage(read_tranObject); } else {// 如果为空,说明用户已经下线,回复用户 TextMessage text = new TextMessage(); text.setMessage("亲!对方不在线哦,您的消息将暂时保存在服务器"); TranObject<TextMessage> offText = new TranObject<TextMessage>( TranObjectType.MESSAGE); offText.setObject(text); offText.setFromUser(0); out.setMessage(offText); } break; case REFRESH: List<User> refreshList = dao.refresh(read_tranObject .getFromUser()); TranObject<List<User>> refreshO = new TranObject<List<User>>( TranObjectType.REFRESH); refreshO.setObject(refreshList); out.setMessage(refreshO); break; default: break; } } } }
/** * 写消息线程 * * @author way * */ public class OutputThread extends Thread { private OutputThreadMap map; private ObjectOutputStream oos; private TranObject object; private boolean isStart = true;// 循环标志位 private Socket socket; public OutputThread(Socket socket, OutputThreadMap map) { try { this.socket = socket; this.map = map; oos = new ObjectOutputStream(socket.getOutputStream());// 在构造器里面实例化对象输出流 } catch (IOException e) { e.printStackTrace(); } } public void setStart(boolean isStart) { this.isStart = isStart; } // 调用写消息线程,设置了消息之后,唤醒run方法,可以节约资源 public void setMessage(TranObject object) { this.object = object; synchronized (this) { notify(); } } @Override public void run() { try { while (isStart) { // 没有消息写出的时候,线程等待 synchronized (this) { wait(); } if (object != null) { oos.writeObject(object); oos.flush(); } } if (oos != null)// 循环结束后,关闭流,释放资源 oos.close(); if (socket != null) socket.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
服务端代码:
/** * 服务器,接受用户登录、离线、转发消息 * * @author way * */ public class Server { private ExecutorService executorService;// 线程池 private ServerSocket serverSocket = null; private Socket socket = null; private boolean isStarted = true; public Server() { try { // 创建线程池,池中具有(cpu个数*50)条线程 executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors() * 50); serverSocket = new ServerSocket(Constants.SERVER_PORT); } catch (IOException e) { e.printStackTrace(); quit(); } } public void start() { System.out.println(MyDate.getDateCN() + " 服务器已启动..."); try { while (isStarted) { socket = serverSocket.accept(); String ip = socket.getInetAddress().toString(); System.out.println(MyDate.getDateCN() + " 用户:" + ip + " 已建立连接"); // <span style="color:#ff0000;">为支持多用户并发访问,采用线程池管理每一个用户的连接请求</span> if (socket.isConnected()) executorService.execute(new SocketTask(socket));// 添加到线程池 } if (socket != null) socket.close(); if (serverSocket != null) serverSocket.close(); } catch (IOException e) { e.printStackTrace(); // isStarted = false; } } private final class SocketTask implements Runnable { private Socket socket = null; private InputThread in; private OutputThread out; private OutputThreadMap map; public SocketTask(Socket socket) { this.socket = socket; map = OutputThreadMap.getInstance(); } @Override public void run() { out = new OutputThread(socket, map);// // 先实例化写消息线程,(把对应用户的写线程存入map缓存器中) in = new InputThread(socket, out, map);// 再实例化读消息线程 out.setStart(true); in.setStart(true); in.start(); out.start(); } } /** * 退出 */ public void quit() { try { this.isStarted = false; serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new Server().start(); } }
时间: 2024-10-19 22:35:33