@lemonguge
2015-07-03T04:29:05.000000Z
字数 10008
阅读 370
NIO
nio开始支持scatter/gather,专业术语称之为“分散/聚集”,用于描述从通道中读取或者写入到通道的操作。
Channel中读取是指在读操作时将读取的数据写入多个Buffer中。因此,Channel将从Channel中读取的数据“分散(scatter)”到多个Buffer中。Channel是指在写操作时将多个Buffer的数据写入同一个Channel,因此,Channel将多个Buffer中的数据“聚集(gather)”后发送到Channel。简单的说,一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中。同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据。
分散和聚集经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。
通道可以有选择地实现两个新的接口:ScatteringByteChannel和GatheringByteChannel。FileChannel文件通道就实现了这两个接口,下面将对这两个接口进行介绍。
ScatteringByteChannel可将字节读入缓冲区序列的通道(分散) long read(ByteBuffer[] dsts)将字节序列从此通道读入给定的缓冲区。如果该通道已到达流的末尾,则返回-1;long read(ByteBuffer[] dsts, int offset, int length)将字节序列从此通道读入给定缓冲区的子序列中。方法中offset是要读入字节的第一个缓冲区在缓冲区数组中的偏移量,length为要访问的最大缓冲区数。分散读取的read方法很类似标准的raad方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。通道依次填充每个缓冲区,填满一个缓冲区后,它就开始填充下一个。在某种意义上,缓冲区数组就像一个大缓冲区。这也意味着它不适用于动态消息(消息大小不固定)。换句话说,如果存在消息头和消息体,每次消息头大小都应该固定(例如 128byte),分散读取才能按照我们期望的工作。

GatheringByteChannel可从缓冲区序列写入字节的通道(聚集) long write(ByteBuffer[] srcs)将字节序列从给定的缓冲区写入此通道,返回写入的字节数;long write(ByteBuffer[] srcs, int offset, int length)将字节序列从给定缓冲区的子序列写入此通道。聚集写入类似于分散读取,只不过是用来写入,把一组单独的缓冲区中组成单个数据流。可以使用聚集写入来自动将网络消息的各个部分组装为单个数据流,以便跨越网络传输消息。聚集写入能较好的处理动态消息。

正如上一篇所介绍的“DatagramChannel通过UDP读写网络中的数据”,因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入,它发送和接收的是数据包。由于该类也实现了分散和聚集的接口,因此也会具有上面所讲诉的方法。
首先我们来明确一下接收端的设计思路:
DatagramChannel类的静态open()打开一个数据报通道;socket()方法获取与此通道关联的数据报套接字;bind()绑定到特定的地址和端口;receive()方法从DatagramChannel接收数据。注意receive()方法,该方法将数据报复制到给定的字节缓冲区中并返回数据报的源地址(如同read操作一样)。如果缓冲区中的剩余字节空间小于保存数据报所需的空间,则丢弃余下的数据报。
import java.io.IOException;import java.net.DatagramSocket;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;public class UDPRece {public static void main(String[] args) throws IOException {DatagramChannel receChnl = DatagramChannel.open();DatagramSocket ds = receChnl.socket();ds.bind(new InetSocketAddress(8001));ByteBuffer buf = ByteBuffer.allocate(1024);receChnl.receive(buf);buf.flip();while (buf.hasRemaining()) { // 判断当前位置和限制之间是否有元素System.out.print((char) buf.get());}}} /* Output: // 启动接收端后,再启动下面的发送端,可以看到下面输出abc123*///:~
receive()方法默认为阻塞式方法,我们可以调用通道的configureBlocking(false)来调整此通道的阻塞模式为非阻塞式。
同样来明确一下发送端的设计思路:
DatagramChannel类的静态open()打开一个数据报通道;send()方法从DatagramChannel发送数据。注意send()方法,该方法接收两个参数,分别为:要发送的数据报的缓冲区和将数据报发送到的地址,将返回发送的字节数。因为UDP在数据传送方面没有任何保证,所以不会通知你发出的数据包是否已收到。
import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;public class UDPSend {public static void main(String[] args) throws IOException {// 打开数据报通道DatagramChannel sendChnl = DatagramChannel.open();// 要发送数据报的缓冲区ByteBuffer buf = ByteBuffer.wrap("abc123".getBytes());// 将数据报发送到的目的地址SocketAddress target = new InetSocketAddress(InetAddress.getLocalHost(), 8001);int byteSend = sendChnl.send(buf, target);System.out.println(byteSend);}} /* Output:6*///:~
这个方法比较特殊,查看API可以发现该方法的意思是“连接此通道的套接字”,将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel,让其只能从特定地址收发数据。
注意,一旦连接后,就无法和任何其他地址进行数据报的接收或发送。在显式地断开数据报套接字的连接或将其关闭之前,该套接字始终保持连接状态。当连接后,也可以使用read()和write()方法,就像在用FileChannel文件通道一样,只是在数据传送方面没有任何保证。
import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;// 通过connect方法实现发送端并获取接收端发回的数据public class UDPConn {public static void main(String[] args) throws IOException {DatagramChannel chnl = DatagramChannel.open();chnl.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8899));ByteBuffer buf = ByteBuffer.wrap("Hello World!".getBytes());while(buf.hasRemaining())chnl.write(buf);buf = ByteBuffer.allocate(1024);// 从此通道读取数据报传输给缓冲区chnl.read(buf);buf.flip();while (buf.hasRemaining()) {System.out.print((char) buf.get());}}} /* Output: // 先启动服务端才可以看到下面输出,否则会出现java.net.PortUnreachableExceptionOK!*///:~
import java.io.IOException;import java.net.DatagramSocket;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.ByteBuffer;import java.nio.channels.DatagramChannel;// 接收端public class UDPRece {public static void main(String[] args) throws IOException {DatagramChannel receChnl = DatagramChannel.open();DatagramSocket ds = receChnl.socket();ds.bind(new InetSocketAddress(8899));ByteBuffer buf = ByteBuffer.allocate(1024);// 返回数据报的源地址SocketAddress socket = receChnl.receive(buf);buf.flip();while (buf.hasRemaining()) {System.out.print((char) buf.get());}buf = ByteBuffer.wrap("OK!".getBytes());// 向源地址回送数据receChnl.send(buf, socket);}} /* Output:Hello World!*///:~
nio中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel:
SocketChannel的静态open()方法打开一个客户端套接字通道SocketChannel;ServerSocketChannel时,通过accept()会返回连接到此服务端套接字通道的SocketChannel。当使用完SocketChannel之后,应该调用SocketChannel.close();来关闭SocketChannel。
调用一个read()的方法,从SocketChannel读取到的数据将会放到指定的缓冲区中,可以调用write()方法将缓冲区写入到通道。read()和write()方法的使用和之前讲过的一样,所以我不会进行讲解。查看API可以发现SocketChannel类也实现了分散和聚集的接口。
之前在对DatagramChannel进行讲诉时,曾经提到过可以调用通道的configureBlocking(false)来调整此通道的阻塞模式为非阻塞式。其中这个方法在抽象类AbstractSelectableChannel中,而继承并实现这个类还有SocketChannel、ServerSocketChannel和Pipe的读取写入管道(下一篇会进行介绍),因此也可以给TCP传输协议的通道调整阻塞模式。当设置SocketChannel为非阻塞模式时,原先的阻塞式方法connect()、read()和write()将变成非阻塞式的,也就是说:
connect(),该方法可能在连接建立之前就返回了。为了确定连接是否建立,可以调用finishConnect()的方法循环判断,该方法当且仅当已连接此通道的套接字时才返回true。write()方法在尚未写出任何内容时可能就返回了,所以需要在缓冲区的hasRemaining()循环中调用write()。read()方法在尚未读取到任何数据时可能就返回了,所以需要关注它的返回值,当该通道已到达流的末尾,则返回-1。
FileChannel不能切换到非阻塞模式,而套接字通道都可以切换。
非阻塞模式与Selector选择器搭配会工作的更好,通过将一或多个SocketChannel注册到选择器,可以询问选择器哪个通道已经准备好了读取,写入等。Selector与SocketChannel的搭配使用会在后面详讲。
查看API可以发现该类中的方法不多,只有四个,使用ServerSocketChannel的静态open()方法打开ServerSocketChannel,通过close()方法关闭ServerSocketChannel。accept()方法监听新进来的连接,当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel,因此accept()方法会一直阻塞到有新连接到达。通过socket()获取与此通道关联的服务器套接字,我们必须对该服务器套接字调用bind()来绑定(侦听)SocketAddress特定地址。
当将ServerSocketChannel设置成非阻塞模式。在非阻塞模式下,accept()方法会立刻返回,如果还没有新进来的连接,返回的将是null。
异步I/O是一种没有阻塞地读写数据的方法。
正如前面提到过,非阻塞模式与Selector选择器搭配会工作的更好。异步I/O中的核心对象就是Selector选择器。我们可以在Selector注册各种感兴趣的I/O事件,而且当那些事件发生时,将由这个对象告诉我们所发生的事件,然后我们可以对这些事件进行处理。使用异步I/O,可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。
首先,我们可以通过Selector的静态open()方法创建一个选择器。
然后,对不同的通道对象调用register()方法来注册我们对这些对象中发生的I/O事件的兴趣。(该方法在抽象类AbstractSelectableChannel中,因此FileChannel不具有该方法)
查看API可以发现register()方法有一个重载的版本,先对那个简单的版本进行讲解,SelectionKey register(Selector sel, int ops)方法的第一个参数是这个Selector,第二个参数是int类型,这里代表我们想要监听事件,用SelectionKey选择键的四个常量来表示:
SelectionKey.OP_ACCEPT接收就绪表示服务器套接字通道准备好了接受新进入的连接,相当于ServerSocketChannel通道的专属键SelectionKey.OP_CONNECT连接就绪表示套接字通道成功连接到另一个服务器SelectionKey.OP_READ读取就绪表示有一个数据可读的通道SelectionKey.OP_WRITE写入就绪表示有一个等待写数据的通道如果对不止一种事件感兴趣,那么可以用位或|操作符将常量连接起来。注意register()返回一个SelectionKey选择键对象,该对象代表这个通道在这个选择器上的这个注册。当某个选择器通知某个传入事件时,通过提供对应该事件的SelectionKey选择键对象来进行。重载的register()方法可以额外的传入附加对象,可以通过SelectionKey选择键对象的attachment()获取该附加对象。甚至可以通过SelectionKey选择键对象的cancel()方法取消通道的注册。
当我们已经注册了一些感兴趣的I/O事件,我们可以通过选择器的select()方法返回读事件已经就绪的那些通道的数量。注意,这个方法会阻塞,直到至少有一个已注册的事件发生。
接下来可以调用选择器的selectedKeys()方法,该方法返回发生了事件的SelectionKey选择键对象的一个集合。
我们现在只需要循环遍历选择键集中的每个键,并检测各个键所对应的通道的就绪事件。通过选择键的channel()方法可以获得通道这个继承体系的基类,如果可以确定具体的通道,应该向下转型。当我们处理完后,必须调用迭代器的remove()方法来删除处理过的SelectionKey。它仍然会在调用select()方法产生的主集合中以一个激活的键出现,这会导致我们尝试再次处理它。
到现在为止,使用选择器的流程大致讲完了,在展示示例之前,仍然有些方法没有提到。
选择器的select()方法一共有三种:
int select()方法会阻塞到至少有一个通道在你注册的事件上就绪了int select(long timeout)最长会阻塞timeout 毫秒,如果为零,则等价于上面的方法int selectNow()执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零要等待多久,这是三种选择方法之间的唯一本质差别。
某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个选择器对象上调用wakeup()方法即可。阻塞在select()方法上的线程会立马返回。如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来”。
用完选择器后调用其close()方法会关闭该选择器,会使注册到该选择器上的所有选择键实例无效,通道本身并不会关闭。
选择键包含两个表示为整数值的操作集,操作集的每一位都表示该键的通道所支持的一类可选择操作。
interestOps(int)方法对其进行更改isAcceptable()方法:判断是否接收就绪isConnectable()方法:判断是否连接就绪isReadable()方法:判断释放读取就绪isWritable()方法:判断是否写入就绪用位与|操作interest 集合和给定的SelectionKey常量,来确定某个确定的事件是否在interest 集合中。对于ready 集合的四个判断方法,可以使用位与|操作来代替,例如isAcceptable()方法会等价与通过readyOps()获取此键的(ready 操作集合|SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT。
以下是一个监听8001、8002和8003端口的完整服务器端示例,当有客户端套接字连接时,请求并处理。
import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class SelectorDemo {public static void main(String[] args) throws IOException {// 创建一个选择器Selector selector = Selector.open();int[] ports = { 8001, 8002, 8003 };ByteBuffer buf = ByteBuffer.wrap("<font color='red'>Respond to Client..</font>".getBytes());for (int port : ports) {// 创建一个新的服务器套接字通道ServerSocketChannel ssc = ServerSocketChannel.open();// 设置此通道的阻塞模式为非阻塞的ssc.configureBlocking(false);// 获取与此通道关联的服务器套接字。ServerSocket ss = ssc.socket();// 将服务器套接字绑定到特定地址ss.bind(new InetSocketAddress(port));// 向选择器注册感兴趣的事情,当通道接收就绪时ssc.register(selector, SelectionKey.OP_ACCEPT);System.out.println("Going to listen on " + port);}while (true) {// 阻塞式方法,直到至少有一个已注册的事件发生。int num = selector.select();if (num == 0)continue;// 获取发生了事件的选择键对象的一个集合Set<SelectionKey> selectedKeys = selector.selectedKeys();System.out.println("size:" + selectedKeys.size());Iterator<SelectionKey> it = selectedKeys.iterator();while (it.hasNext()) {SelectionKey key = it.next();if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {// 返回创建此键的服务器套接字通道ServerSocketChannel ssc = (ServerSocketChannel) key.channel();// 接受到此通道服务器套接字的连接。SocketChannel sc = ssc.accept();// 设置客户端套接字通道为非阻塞模式sc.configureBlocking(false);// 将键添加到选择器的已选择键集,向给定的选择器注册此客户端套接字通道,当读取就绪时sc.register(selector, SelectionKey.OP_READ);// 删除处理过的选择键it.remove();System.out.println("Get connection from " + ssc.socket().getLocalPort());} else if (key.isReadable()) { // 当客户端套接字通道读取就绪// 获取客户端套接字通道SocketChannel sc = (SocketChannel) key.channel();// 保证将缓冲区的字节全部写入while (buf.hasRemaining())sc.write(buf);// 使缓冲区为重新读取已包含的数据做好准备buf.rewind();// 处理请求完成,关闭以释放资源sc.close();System.out.println("Respond to Client..");it.remove();}}}}} ///:OK~