创建DatagramChannel的模式和创建其他socket通道是一样的:调用静态的open( )方法来创建一个新实例。新DatagramChannel会有一个可以通过调用socket( )方法获取的对等DatagramSocket对象。DatagramChannel对象既可以充当服务器(监听者)也可以充当客户端(发送者)。如果您希望新创建的通道负责监听,那么通道必须首先被绑定到一个端口或地址/端口组合上。绑定DatagramChannel同绑定一个常规的DatagramSocket没什么区别,都是委托对等socket对象上的API实现的:
DatagramChannel channel = );DatagramSocket socket = channel.socket( ); socket.bind (new InetSocketAddress (portNumber));
一个未绑定的DatagramChannel仍能接收数据包。当一个底层socket被创建时,一个动态生成的端口号就会分配给它。绑定行为要求通道关联的端口被设置为一个特定的值(此过程可能涉及安全检查或其他验证)。不论通道是否绑定,所有发送的包都含有DatagramChannel的源地址(带端口号)。未绑定的DatagramChannel可以接收发送给它的端口的包,通常是来回应该通道之前发出的一个包。已绑定的通道接收发送给它们所绑定的熟知端口(wellknown port)的包。数据的实际发送或接收是通过send( )和receive( )方法来实现的:
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { // This is a partial API listing public abstract SocketAddress receive(ByteBuffer dst) throws IOException; public abstract int send(ByteBuffer src, SocketAddress target); }
receive( )方法将下次将传入的数据报的数据净荷复制到预备好的ByteBuffer中并返回一个SocketAddress对象以指出数据来源。如果通道处于阻塞模式,receive( )可能无限期地休眠直到有包到达。如果是非阻塞模式,当没有可接收的包时则会返回null。如果包内的数据超出缓冲区能承受的范围,多出的数据都会被悄悄地丢弃。
调用send( )会发送给定ByteBuffer对象的内容到给定SocketAddress对象所描述的目的地址和端口,内容范围为从当前position开始到末尾处结束。如果DatagramChannel对象处于阻塞模式,调用线程可能会休眠直到数据报被加入传输队列。如果通道是非阻塞的,返回值要么是字节缓冲区的字节数,要么是“0”。发送数据报是一个全有或全无(all-or-nothing)的行为。如果传输队列没有足够空间来承载整个数据报,那么什么内容都不会被发送。
如果安装了安全管理器,那么每次调用send( )或receive( ) 时安全管理器的checkConnect( )方法都会被调用以验证目的地址,除非通道处于已连接的状态 。
请注意,数据报协议的不可靠性是固有的,它们不对数据传输做保证。send( )方法返回的非零值并不表示数据报到达了目的地,仅代表数据报被成功加到本地网络层的传输队列。此外,传输过程中的协议可能将数据报分解成碎片。例如,以太网不能传输超过1,500个字节左右的包。如果您的数据报比较大,那么就会存在被分解成碎片的风险,成倍地增加了传输过程中包丢失的几率。被分解的数据报在目的地会被重新组合起来,接收者将看不到碎片。但是,如果有一个碎片不能按时到达,那么整个数据报将被丢弃。
DatagramChannel有一个connect( )方法:
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { // This is a partial API listing public abstract DatagramChannel connect(SocketAddress remote) throws IOException; public abstract boolean isConnected(); public abstract DatagramChannel disconnect() throws IOException; }
当DatagramChannel已连接时,使用同样的令牌,您不可以发送包到除了指定给connect( )方法的目的地址以外的任何其他地址。试图一定要这样做的话会导致一个SecurityException异常
我们可以通过调用带SocketAddress对象的connect( )方法来连接一个DatagramChannel,该SocketAddress对象描述了DatagramChannel远程对等体的地址。如果已经安装了一个安全管理器,那么它会进行权限检查。之后,每次send/receive时就不会再有安全检查了,因为来自或去到任何其他地址的包都是不允许的。
不同于流socket,数据报socket的无状态性质不需要同远程系统进行对话来建立连接状态。没有实际的连接,只有用来指定允许的远程地址的本地状态信息。由于此原因,DatagramChannel上也就没有单独的finishConnect( )方法。我们可以使用isConnected( )方法来测试一个数据报通道的连接状态。
不同于SocketChannel(必须连接了才有用并且只能连接一次),DatagramChannel对象可以任意次数地进行连接或断开连接。每次连接都可以到一个不同的远程地址。调用disconnect( )方法可以配置通道,以便它能再次接收来自安全管理器(如果已安装)所允许的任意远程地址的数据或发送数据到这些地址上。
当一个DatagramChannel处于已连接状态时,发送数据将不用提供目的地址而且接收时的源地址也是已知的。这意味着DatagramChannel已连接时可以使用常规的read( )和write( )方法,包括scatter/gather形式的读写来组合或分拆包的数据:
public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { // This is a partial API listing public abstract int read(ByteBuffer dst) throws IOException; public abstract long read(ByteBuffer[] dsts) throws IOException; public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException; public abstract int write(ByteBuffer src) throws IOException; public abstract long write(ByteBuffer[] srcs) throws IOException; public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException; }
read( )方法返回读取字节的数量,如果通道处于非阻塞模式的话这个返回值可能是“0”。write( )方法的返回值同send( )方法一致:要么返回您的缓冲区中的字节数量,要么返回“0”(如果由于通道处于非阻塞模式而导致数据报不能被发送)。当通道不是已连接状态时调用read( )或write( )方法,都将产生NotYetConnectedException异常。
? 您的程序可以承受数据丢失或无序的数据。
? 您希望“发射后不管”(fire and forget)而不需要知道您发送的包是否已接收。
? 数据吞吐量比可靠性更重要。
? 您需要同时发送数据给多个接受者(多播或者广播)。
? 包隐喻比流隐喻更适合手边的任务。
/** * */ package test.noi.datagramChannel.timeServer; import; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.DatagramChannel; import java.util.Date; import java.util.Iterator; import java.util.LinkedList; import java.util.List; /** * * Request time service, per RFC 868. RFC 868 * * ( is a very simple time protocol * * whereby one system can request the current time from another system. * Most * Linux, BSD and Solaris systems provide RFC 868 time service * on port 37. This * simple program will inter-operate with those. * The National Institute of * Standards and Technology (NIST) operates * a public time server at * ** The RFC 868 protocol specifies a 32 bit unsigned value be * sent, * representing the number of seconds since Jan 1, 1900. The Java * epoch * begins on Jan 1, 1970 (same as unix) so an adjustment is * made by adding or * subtracting 2,208,988,800 as appropriate. To * avoid shifting and masking, a * four-byte slice of an * eight-byte buffer is used to send/recieve. But * getLong( ) * is done on the full eight bytes to get a long value. ** When run, * this program will issue time requests to each hostname * given on the command * line, then enter a loop to receive packets. * Note that some requests or * replies may be lost, which means * this code could block forever. ** @author * Ron Hitchens ([email protected]) */ public class TimeClient { private static final int DEFAULT_TIME_PORT = 37; private static final long DIFF_1900 = 2208988800L; protected int port = DEFAULT_TIME_PORT; protected List remoteHosts; protected DatagramChannel channel; public TimeClient(String[] argv) throws Exception { if (argv.length == 0) { throw new Exception("Usage: [ -p port ] host ..."); } parseArgs(argv); =; } protected InetSocketAddress receivePacket(DatagramChannel channel, ByteBuffer buffer) throws Exception { buffer.clear(); // Receive an unsigned 32-bit, big-endian value return ((InetSocketAddress) channel.receive(buffer)); } // Send time requests to all the supplied hosts protected void sendRequests() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(1); Iterator it = remoteHosts.iterator(); while (it.hasNext()) { InetSocketAddress sa = (InetSocketAddress); System.out.println("Requesting time from " + sa.getHostName() + ":" + sa.getPort()); // Make it empty (see RFC868) buffer.clear().flip(); // Fire and forget channel.send(buffer, sa); } } // Receive any replies that arrive. public void getReplies() throws Exception { // Allocate a buffer to hold a long value ByteBuffer longBuffer = ByteBuffer.allocate(8); // Assure big-endian (network) byte order longBuffer.order(ByteOrder.BIG_ENDIAN); // Zero the whole buffer to be sure longBuffer.putLong(0, 0); // Position to first byte of the low-order 32 bits longBuffer.position(4); // Slice the buffer; gives view of the low-order 32 bits ByteBuffer buffer = longBuffer.slice(); int expect = remoteHosts.size(); int replies = 0; System.out.println(""); System.out.println("Waiting for replies..."); while (true) { InetSocketAddress sa; sa = receivePacket(channel, buffer); buffer.flip(); replies++; printTime(longBuffer.getLong(0), sa); if (replies == expect) { System.out.println("All packets answered"); break; } // Some replies haven‘t shown up yet System.out.println("Received " + replies + " of " + expect + " replies"); } } // Print info about a received time reply protected void printTime(long remote1900, InetSocketAddress sa) { // local time as seconds since Jan 1, 1970 long local = System.currentTimeMillis() / 1000; // remote time as seconds since Jan 1, 1970 long remote = remote1900 - DIFF_1900; Date remoteDate = new Date(remote * 1000); Date localDate = new Date(local * 1000); long skew = remote - local; System.out.println("Reply from " + sa.getHostName() + ":" + sa.getPort()); System.out.println(" there: " + remoteDate); System.out.println(" here: " + localDate); System.out.print(" skew: "); if (skew == 0) { System.out.println("none"); } else if (skew > 0) { System.out.println(skew + " seconds ahead"); } else { System.out.println((-skew) + " seconds behind"); } } protected void parseArgs(String[] argv) { remoteHosts = new LinkedList(); for (int i = 0; i < argv.length; i++) { String arg = argv[i]; // Send client requests to the given port if (arg.equals("-p")) { i++; this.port = Integer.parseInt(argv[i]); continue; } // Create an address object for the hostname InetSocketAddress sa = new InetSocketAddress(arg, port); // Validate that it has an address if (sa.getAddress() == null) { System.out.println("Cannot resolve address: " + arg); continue; } remoteHosts.add(sa); } } // -------------------------------------------------------------- public static void main(String[] argv) throws Exception { TimeClient client = new TimeClient(argv); client.sendRequests(); client.getReplies(); } }
例3-9 使用DatagramChannel 的时间服务客户端
/** * */ package; import; import; import java.lang.reflect.InvocationTargetException; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import javax.annotation.Resource; import javax.xml.bind.PropertyException; import org.apache.commons.fileupload.FileItem; import org.apache.commons.httpclient.HttpException; import; import; import org.hibernate.Query; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Controller; import cn.jorcen.commons.lang.Assert; import cn.jorcen.commons.dao.utils.SearchMap; import; import; import cn.jorcen.commons.lang.CollectionUtil; import cn.jorcen.commons.lang.DateUtil; import cn.jorcen.commons.lang.StringUtil; import cn.jorcen.dropins.lbs.web.geocoding.LbsGeocodingUtil; import cn.jorcen.dropins.lbs.web.geocoding.bean.ReGeocodingResponse; import cn.jorcen.dropins.lbs.web.poi.LbsPlacePOISearchUtil; import cn.jorcen.dropins.lbs.web.poi.bean.Location; import cn.jorcen.exception.core.BusinessException; import; import com.sai.activities.po.TSdAuction; import com.sai.activities.service.impl.AbstractAuctionGatheredServiceImpl; import com.sai.activities.service.impl.DolistAuctionServiceImpl; import com.sai.commons.po.TSdFeedback; import com.sai.commons.server.BaseServer; import com.sai.context.ServiceAware; import com.sai.user.po.TSdUser; import com.umeng.push.UmengPush; /** * * * @author mjorcen * @email [email protected] * @dateTime May 9, 2014 4:47:16 PM * @version 1 */ @Controller @Scope("prototype") public class AppServer extends BaseServer { protected static String wt_log_uri; protected static String wt_log_path; protected static String wt_file_uri; protected static String wt_file_path; private final static String properties = "properties/"; static { try { wt_log_uri = getFileUploadProperty(WT_LOG); wt_log_path = getFileUploadRealPath(WT_LOG); wt_file_uri = getFileUploadProperty(WT_FILE); wt_file_path = getFileUploadRealPath(WT_FILE); } catch (Exception e) { e.printStackTrace(); } } private TSdFeedback feedback; @Resource private ServiceAware serviceAware; private Location location; public void version() throws PropertyException, IOException, IllegalAccessException, InvocationTargetException, SQLException { // app版本 Map<String, String> versionMap = getVersion(); super.generalResponse.setResultData(versionMap); } private String getProperty(String key) throws PropertyException, IOException { return PropertiesFactory.getClassPathProperty(properties, key); } public void push() throws PropertyException, IOException { cn.jorcen.commons.lang.Assert.notNull(chownId, "chownId 不能为空"); Map<String, Object> map = new HashMap<String, Object>(4); map.put("sysMsgCount", null); map.put("version", null); map.put("auctionMsg", null); map.put("custom", null); TSdUser user = this.serviceAware.getUserService().assertUser( super.chownId); // 系统消息数 long count = this.serviceAware.getMsgService().getSystemMsgCount( super.chownId, user.getLastActivateSystemMsgTime()); map.put("sysMsgCount", count); // app版本 Map<String, String> versionMap = getVersion(); map.put("version", versionMap); SearchMap searchMap = new SearchMap(); Timestamp date = new Timestamp(System.currentTimeMillis()); searchMap.addAndSearchCondition() .between("auctionBeginTime", date, DateUtil.addDays(date, 1)) .le("auctionStatus", "3"); searchMap.addOrders("auctionStatus", "desc"); searchMap.addOrders("auctionBeginTime"); searchMap.addOrders("productName"); List<TSdAuction> auctions = this.serviceAware.getAuctionService().find( searchMap); if (CollectionUtil.isNotEmpty(auctions)) { new DolistAuctionServiceImpl(serviceAware).sortAuction( super.chownId, auctions); List<Map> maps = new ArrayList<Map>(auctions.size()); for (TSdAuction tSdAuction : auctions) { if (tSdAuction.getRegistration()) { maps.add(getAuctionMap(tSdAuction)); } } if (CollectionUtil.isEmpty(maps)) { maps.add(getAuctionMap(auctions.get(0))); } map.put("auctionMsg", maps); } super.generalResponse.setResultData(map); } public void umengPush() throws Exception { UmengPush.sendGroupcast("213123", "fasfas", "1231231"); } private Map<String, String> getVersion() throws PropertyException, IOException { String version = null; String url = null; String msg = null; try { if (super.msg.equalsIgnoreCase("android")) { version = getProperty("AndroidVersion"); msg = getProperty("AndroidMsg"); url = getProperty("AndroidUrl"); } else { version = getProperty("IOSVersion"); url = getProperty("IOSUrl"); msg = getProperty("IOSMsg"); } } catch (Exception e) { } Map<String, String> versionMap = new HashMap<String, String>(2); versionMap.put("version", version); versionMap.put("msg", msg); versionMap.put("url", url); return versionMap; } public void doFake() throws IllegalAccessException, InvocationTargetException, SQLException { DoFalseComment.doFalse(serviceAware); } private Map<String, Object> getAuctionMap(TSdAuction tSdAuction) { Map<String, Object> auctionMap = new HashMap<String, Object>(); auctionMap.put("auctionBeginTime", tSdAuction.getAuctionBeginTime()); auctionMap.put("productName", tSdAuction.getProductName()); auctionMap.put("registration", tSdAuction.getRegistration()); return auctionMap; } public void log() throws Exception { List<UploadFile> files = filePropertyUpload(upload, uploadFileName, uploadContentType, wt_log_path, wt_log_uri); for (UploadFile uploadFile : files) {; } write(getJson(BASE_JSON)); } public void isOpenVoucher() throws HttpException, BusinessException, PropertyException, IOException { String city = getCity(); StringBuilder hql = new StringBuilder("select count(*) from "); hql.append(TSdFeedback.class.getName()).append( " where feedbackInfo = ‘openVoucher‘ "); if (this.location != null) { if (StringUtil.hasText(city)) { hql.append(" and feedbackTitle = ‘").append(city).append("‘ "); } } Query query = this.serviceAware.getUserService().getSession() .createQuery(hql.toString()); Long count = (Long) query.list().get(0); super.generalResponse.setResultData(count); query = this.serviceAware .getUserService() .getSession() .createQuery( hql.append(" and userId = ‘").append(super.chownId) .append("‘ ").toString()); count = (Long) query.list().get(0); if (count == 0) { super.generalResponse.setResultAddedData(false); } else { super.generalResponse.setResultAddedData(true); } super.generalResponse.setResultInfo(StringUtil.isBlank(city) ? "全国" : city); } public void openVoucher() throws BusinessException, HttpException, PropertyException, IOException { Assert.notNull(this.chownId, "chownId 不能为空"); SearchMap searchMap = new SearchMap(); TSdUser user = this.serviceAware.getUserService().assertUser( this.chownId); this.serviceAware.getUserService().isLock(user, true); if ( == null) { = new TSdFeedback(); } if (StringUtil.isNotBlank(feedback.getFeedbackTitle())) { } else if (location != null) { String city = getCity(); feedback.setFeedbackTitle(city); } cn.jorcen.commons.dao.utils.SearchCondition sc = searchMap .addAndSearchCondition().eq("userId", this.chownId) .eq("feedbackInfo", "openVoucher"); if (StringUtil.isNotBlank(feedback.getFeedbackTitle())) { sc.eq("feedbackTitle", feedback.getFeedbackTitle()); } List list = this.serviceAware.getUserService().find(searchMap, TSdFeedback.class); if (CollectionUtil.isNotEmpty(list)) { throw new BusinessException("你已经申请开通了"); } feedback.setUserId(user.getUserId()); feedback.setFeedbackInfo("openVoucher"); this.serviceAware.getUserService().save(feedback); } private String getCity() throws HttpException, IOException, BusinessException, PropertyException { Assert.notNull(this.location, ",location.lng 不能为空"); Assert.notNull(this.location.getLat(), "lat 不能为空"); Assert.notNull(this.location.getLng(), "lng 不能为空"); ReGeocodingResponse address = LbsGeocodingUtil.reGeocoder( location.getLng() + "", location.getLat() + "", LbsPlacePOISearchUtil.getKey()); String city = address.getResult().getAddressComponent().getCity(); return city; } public void uploadFile() throws Exception, IOException { cn.jorcen.commons.lang.Assert.notEmpty(upload, "文件不能为空"); for (int i = 0; i < upload.length; i++) { File file = upload[i]; File destFile = new File(wt_file_path, uploadFileName[i]); FileUtils.copyFile(file, destFile); file.delete(); } } public TSdFeedback getFeedback() { return feedback; } public void setFeedback(TSdFeedback feedback) { = feedback; } public Location getLocation() { return location; } public void setLocation(Location location) { this.location = location; } }
例3-10中的程序是一个RFC 868时间服务器。这段代码回答来自例3-9中的客户端的请求并显示出DatagramChannel是怎样绑定到一个熟知端口然后开始监听来自客户端的请求的。该时间服务器仅监听数据报(UDP)请求。大多数Unix和Linux系统提供的rdate命令使用TCP协议连接到一个RFC 868时间服务。
以上内容出自 nio 一书.