WebSocket接口中有一个直接发送对象给页面的方法:
voidjavax.websocket.RemoteEndpoint.Basic.sendObject(Object
arg0) throws IOException,EncodeException
如果直接使用
client.session.getBasicRemote().sendObject(obj);
就会出现以下错误:
javax.websocket.EncodeException: No encoder specified for object of class [class org.ywzn.po.Messagepojo]
解决的方法有一个,就是需要一个编码器,具体步骤如下:
1、自定义的一个普通Java实体类,这个没有什么特别,一些属性、set、get、toString方法而已
package org.ywzn.po; import java.io.Serializable; /** * 发送给ActiveMQ的实体类 * * @author 夏小雪 日期:2015年6月11日 时间:下午12:14:18 */ public class Messagepojo implements Serializable { private static final long serialVersionUID = -6451812593150428369L; private String sourse;// 信息来源 private String messageType;// 消息类型 private String msgContent;// 消息内容 private String target;// 发送目的地 private String infoSourceIP;// 信息来源ip private String createtime;// 消息保存时间 private String otherContent;// 其他信息 public Messagepojo() { super(); } public Messagepojo(String sourse, String messageType, String msgContent, String target, String infoSourceIP, String createtime, String otherContent) { super(); this.sourse = sourse; this.messageType = messageType; this.msgContent = msgContent; this.target = target; this.infoSourceIP = infoSourceIP; this.createtime = createtime; this.otherContent = otherContent; } public String getSourse() { return sourse; } public void setSourse(String sourse) { this.sourse = sourse; } public String getMessageType() { return messageType; } public void setMessageType(String messageType) { this.messageType = messageType; } public String getMsgContent() { return msgContent; } public void setMsgContent(String msgContent) { this.msgContent = msgContent; } public String getTarget() { return target; } public void setTarget(String target) { this.target = target; } public String getInfoSourceIP() { return infoSourceIP; } public void setInfoSourceIP(String infoSourceIP) { this.infoSourceIP = infoSourceIP; } public String getCreatetime() { return createtime; } public void setCreatetime(String createtime) { this.createtime = createtime; } public String getOtherContent() { return otherContent; } public void setOtherContent(String otherContent) { this.otherContent = otherContent; } @Override public String toString() { return "Messagepojo [sourse=" + sourse + ", messageType=" + messageType + ", msgContent=" + msgContent + ", target=" + target + ", infoSourceIP=" + infoSourceIP + ", createtime=" + createtime + ", otherContent=" + otherContent + "]"; } }
2、编写自己的解码器
在encode()的这个方法内,可以自定义要返回的值,这里我需要的是Java对象转成的Json格式字符串,用的是一个自己写的工具类,这个可以根据自己的需求自定义,这里只给各位做一个参考。
package org.ywzn.websocket; import javax.websocket.EncodeException; import javax.websocket.Encoder; import javax.websocket.EndpointConfig; import org.ywzn.po.Messagepojo; import org.ywzn.util.Java2Json; import com.sdicons.json.mapper.MapperException; /** * definition for our encoder * * @编写人: 夏小雪 日期:2015年6月14日 时间:上午11:58:23 */ public class ServerEncoder implements Encoder.Text<Messagepojo> { @Override public void destroy() { // TODO Auto-generated method stub } @Override public void init(EndpointConfig arg0) { // TODO Auto-generated method stub } @Override public String encode(Messagepojo messagepojo) throws EncodeException { try { return Java2Json.JavaToJson(messagepojo, false); } catch (MapperException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } }
3、WebSocket的服务终端
重要的地方只有一个:
@ServerEndpoint(value = "/websocket/news", encoders = { ServerEncoder.class })
encoders加入了刚刚自定义的解码类,这样就可以直接使用client.session.getBasicRemote().sendObject(obj);传入对象了。
package org.ywzn.websocket; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import javax.websocket.EncodeException; import javax.websocket.EndpointConfig; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.ywzn.activemq.Sender; import org.ywzn.po.Messagepojo; import org.ywzn.util.Java2Json; import com.sdicons.json.mapper.MapperException; /** * 消息协查中心 WebSocket 消息推送服务类 * * @author 夏小雪 日期:2015年6月10日 时间:上午9:48:59 */ @ServerEndpoint(value = "/websocket/news", encoders = { ServerEncoder.class }) public class NewsAnnotation { private static final Log log = LogFactory.getLog(NewsAnnotation.class); private static final String GUEST_PREFIX = "Guest"; private static final AtomicInteger connectionIds = new AtomicInteger(0); private static final Map<String, Object> connections = new HashMap<String, Object>(); private static final String[] GUESTNAME = { "高勇", "梁肇辉", "李燕", "梁晓晓", "蔡俊", "张新", "高帅", "夏小雪", "彭连英", "刘剑" }; private String nickname; private Session session; // 当前用户id private Integer id; // private HttpSession httpSession; public NewsAnnotation() { // nickname = GUEST_PREFIX + connectionIds.getAndIncrement(); // int index = connectionIds.getAndIncrement(); // if (index >= 8) { // connectionIds.set(0); // } // nickname = GUESTNAME[index]; } @OnOpen public void start(Session session, EndpointConfig config) { // Set<Entry<String, Object>> entrySet = // config.getUserProperties().entrySet(); // for (Iterator<Entry<String, Object>> iterator = entrySet.iterator(); // iterator.hasNext();) { // String key = iterator.next().getKey(); // System.out.println("[EndpointConfig-->key]:" + key); // System.out.println("value-->" + config.getUserProperties().get(key)); // PojoMethodMapping p = // (PojoMethodMapping)config.getUserProperties().get(key); // System.out.println("p.getWsPath()-->" + p.getWsPath()); // } // String negotiatedSubprotocol = session.getNegotiatedSubprotocol(); // System.out.println("[getAuthority]:" + // session.getRequestURI().getAuthority()); // System.out.println("[getFragment]:" + // session.getRequestURI().getFragment()); // System.out.println("[getPath]:" + session.getRequestURI().getPath()); // System.out.println("[getPort]:" + session.getRequestURI().getPort()); // System.out.println("[getQuery]:" + // session.getRequestURI().getQuery()); // System.out.println("[getRawUserInfo]:" + // session.getRequestURI().getRawAuthority()); // System.out.println("[getRawFragment]:" + // session.getRequestURI().getRawFragment()); // System.out.println("[getHost]:" + session.getRequestURI().getHost()); // System.out.println("[getRawUserInfo]:" + // session.getRequestURI().getRawUserInfo()); // System.out.println("[getScheme]:" + // session.getRequestURI().getScheme()); // System.out.println("[getSchemeSpecificPart]:" + // session.getRequestURI().getSchemeSpecificPart()); // System.out.println("[getUserInfo]:" + // session.getRequestURI().getUserInfo()); // System.out.println("[negotiatedSubprotocol]:" + // negotiatedSubprotocol); // Set<Entry<String, String>> entrySet = // session.getPathParameters().entrySet(); // for(Iterator<Entry<String, String>> iter = // entrySet.iterator();iter.hasNext();){ // System.out.println("[getKey]:" + iter.next().getKey()); // } // System.out.println("[session.getProtocolVersion()]:" + // session.getProtocolVersion()); this.session = session; id = Integer.parseInt(this.session.getId()); if (id > 8) { id = 0; } nickname = GUESTNAME[id]; System.out.println("当前用户的ID是:" + id + "\n用户姓名是:" + nickname); connections.put(nickname, this); String message = String.format("* %s %s", nickname, "已经登录消息协查中心."); // 群发 broadcast(message, "all"); // 发送登录的消息给activemq Sender.sendMsg("login:" + nickname); } @OnClose public void end() { System.out.println("消息中心连接关闭."); connections.remove(this); String message = String .format("* %s %s", nickname, "has disconnected."); // 群发 // broadcast(message, "all"); // 发送登录的消息给activemq // Sender.sendMsg("quit:" + nickname); } /** * 消息发送触发方法 * * @param message */ @OnMessage public void incoming(String message) { System.out.println("Java收到了网页[" + this.nickname + "]的消息[message]:" + message); // Never trust the client String filteredMessage = String.format("%s: %s", nickname, HTMLFilter.filter(message.toString())); // 发送登录的消息给activemq Messagepojo messagepojo = new Messagepojo(this.nickname, "用户对话", message, "all", this.id.toString(), new SimpleDateFormat( "yyyy-MM-dd hh:mm:ss").format(new Date()), ""); try { String javaToJson = Java2Json.JavaToJson(messagepojo, false); System.out.println("发送信息:" + javaToJson); Sender.sendMsg("Message-->" + javaToJson); } catch (MapperException e) { // TODO Auto-generated catch block e.printStackTrace(); } // Sender.sendMsg("sendMsg:" + message); // broadcast(filteredMessage, "all"); } @OnError public void onError(Throwable t) throws Throwable { log.error("Chat Error: " + t.toString(), t); } /** * 消息发送方法 author 夏小雪 日期:2015年6月10日 时间:上午9:49:45 * * @param msg * 发送的内容 * @param user * 发给指定的用户,如果要发给所有的则填""或"all" */ private static void broadcast(String msg, String user) { // 是否群发 if (user == null || "all".equals(user) || "".equals(user)) { sendAll(msg); } else { sendUser(msg, user); } } /** * 向所有用户发送 author 夏小雪 日期:2015年6月10日 时间:上午9:50:34 * * @param msg */ public static void sendAll(String msg) { for (String key : connections.keySet()) { NewsAnnotation client = null; try { client = (NewsAnnotation) connections.get(key); synchronized (client) { if (client.session.isOpen()) { // 如果这个session是打开的 client.session.getBasicRemote().sendText(msg); } } } catch (IOException e) { log.debug("Chat Error: Failed to send message to client", e); connections.remove(client); try { client.session.close(); } catch (IOException e1) { // Ignore } String message = String.format("* %s %s", client.nickname, "has been disconnected."); // 群发 broadcast(message, "all"); } } } /** * 向所有用户发送 * * @编写人: 夏小雪 日期:2015年6月14日 时间:上午11:14:46 * @param obj * 对象 */ public static void sendAll(Object obj) { for (String key : connections.keySet()) { NewsAnnotation client = null; try { client = (NewsAnnotation) connections.get(key); synchronized (client) { if (client.session.isOpen()) { // 如果这个session是打开的 client.session.getBasicRemote().sendObject(obj); } } } catch (IOException e) { log.debug("Chat Error: Failed to send message to client", e); connections.remove(client); try { client.session.close(); } catch (IOException e1) { // Ignore } String message = String.format("* %s %s", client.nickname, "has been disconnected."); // 群发 broadcast(message, "all"); } catch (EncodeException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * 向指定用户发送消息 author 夏小雪 日期:2015年6月10日 时间:上午9:50:45 * * @param msg * 内容 * @param user * 用户 */ public static void sendUser(String msg, String user) { // 获取要发送的用户 NewsAnnotation c = (NewsAnnotation) connections.get(user); try { if (c != null) { c.session.getBasicRemote().sendText(msg); } } catch (IOException e) { log.debug("Chat Error: Failed to send message to client", e); connections.remove(c); try { c.session.close(); } catch (IOException e1) { // Ignore } String message = String.format("* %s %s", c.nickname, "has been disconnected."); // 群发 broadcast(message, "all"); } } }
我这里已经搭建了WebSocket和ActiveMQ的框架,下面截图看下效果:
参考资料:
编码器的任务是将应用程序特定的数据转换成可传送到客户机端点格式。创建一个编码器,我们将要知道的一些接口,就像这样,我们不得不在创建解码器。
Encoder | The Encoder interface defines how developers can provide a way to convert their custom objects into web socket messages. The Encoder interface contains subinterfaces that allow encoding algorithms to encode custom objects to: text, binary data, character stream and write to an output stream. |
Encoder.Binary<T> | This interface defines how to provide a way to convert a custom object into a binary message. |
Encoder.BinaryStream<T> | This interface may be implemented by encoding algorithms that want to write the encoded object to a binary stream. |
Encoder.Text<T> | This interface defines how to provide a way to convert a custom object into a text message. |
Encoder.TextStream<T> | This interface may be implemented by encoding algorithms that want to write the encoded object to a character stream. |
- See more at: http://www.javabeat.net/encoder-websocket-in-javaee-7/#sthash.fmBFmc2Q.dpuf
参考:http://www.javabeat.net/encoder-websocket-in-javaee-7/