[关闭]
@lemonguge 2015-07-03T04:29:05.000000Z 字数 10008 阅读 370

Java nio(三)

NIO


分散和聚集

nio开始支持scatter/gather,专业术语称之为“分散/聚集”,用于描述从通道中读取或者写入到通道的操作。

简单的说,一个分散的读取就像一个常规通道读取,只不过它是将数据读到一个缓冲区数组中而不是读到单个缓冲区中。同样地,一个聚集写入是向缓冲区数组而不是向单个缓冲区写入数据。

分散和聚集经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

通道可以有选择地实现两个新的接口:ScatteringByteChannelGatheringByteChannelFileChannel文件通道就实现了这两个接口,下面将对这两个接口进行介绍。

分散读取

分散读取的read方法很类似标准的raad方法,只不过它们不是取单个缓冲区而是取一个缓冲区数组。通道依次填充每个缓冲区,填满一个缓冲区后,它就开始填充下一个。在某种意义上,缓冲区数组就像一个大缓冲区。这也意味着它不适用于动态消息(消息大小不固定)。换句话说,如果存在消息头和消息体,每次消息头大小都应该固定(例如 128byte),分散读取才能按照我们期望的工作。

分散读取

聚集写入

聚集写入类似于分散读取,只不过是用来写入,把一组单独的缓冲区中组成单个数据流。可以使用聚集写入来自动将网络消息的各个部分组装为单个数据流,以便跨越网络传输消息。聚集写入能较好的处理动态消息。

聚集写入


UPD传输协议的通道

正如上一篇所介绍的“DatagramChannel通过UDP读写网络中的数据”,因为UDP是无连接的网络协议,所以不能像其它通道那样读取和写入,它发送和接收的是数据包。由于该类也实现了分散和聚集的接口,因此也会具有上面所讲诉的方法。

接收端

首先我们来明确一下接收端的设计思路:

  1. 使用DatagramChannel类的静态open()打开一个数据报通道;
  2. 通过socket()方法获取与此通道关联的数据报套接字;
  3. 通过接收端套接字的bind()绑定到特定的地址和端口;
  4. 通过receive()方法从DatagramChannel接收数据。

注意receive()方法,该方法将数据报复制到给定的字节缓冲区中并返回数据报的源地址(如同read操作一样)。如果缓冲区中的剩余字节空间小于保存数据报所需的空间,则丢弃余下的数据报。

  1. import java.io.IOException;
  2. import java.net.DatagramSocket;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.DatagramChannel;
  6. public class UDPRece {
  7. public static void main(String[] args) throws IOException {
  8. DatagramChannel receChnl = DatagramChannel.open();
  9. DatagramSocket ds = receChnl.socket();
  10. ds.bind(new InetSocketAddress(8001));
  11. ByteBuffer buf = ByteBuffer.allocate(1024);
  12. receChnl.receive(buf);
  13. buf.flip();
  14. while (buf.hasRemaining()) { // 判断当前位置和限制之间是否有元素
  15. System.out.print((char) buf.get());
  16. }
  17. }
  18. } /* Output: // 启动接收端后,再启动下面的发送端,可以看到下面输出
  19. abc123
  20. *///:~

receive()方法默认为阻塞式方法,我们可以调用通道的configureBlocking(false)来调整此通道的阻塞模式为非阻塞式。

发送端

同样来明确一下发送端的设计思路:

  1. 使用DatagramChannel类的静态open()打开一个数据报通道;
  2. 通过send()方法从DatagramChannel发送数据。

注意send()方法,该方法接收两个参数,分别为:要发送的数据报的缓冲区和将数据报发送到的地址,将返回发送的字节数。因为UDP在数据传送方面没有任何保证,所以不会通知你发出的数据包是否已收到。

  1. import java.io.IOException;
  2. import java.net.InetAddress;
  3. import java.net.InetSocketAddress;
  4. import java.net.SocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.DatagramChannel;
  7. public class UDPSend {
  8. public static void main(String[] args) throws IOException {
  9. // 打开数据报通道
  10. DatagramChannel sendChnl = DatagramChannel.open();
  11. // 要发送数据报的缓冲区
  12. ByteBuffer buf = ByteBuffer.wrap("abc123".getBytes());
  13. // 将数据报发送到的目的地址
  14. SocketAddress target = new InetSocketAddress(InetAddress.getLocalHost(), 8001);
  15. int byteSend = sendChnl.send(buf, target);
  16. System.out.println(byteSend);
  17. }
  18. } /* Output:
  19. 6
  20. *///:~

connect方法

这个方法比较特殊,查看API可以发现该方法的意思是“连接此通道的套接字”,将DatagramChannel“连接”到网络中的特定地址的。由于UDP是无连接的,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel,让其只能从特定地址收发数据。

注意,一旦连接后,就无法和任何其他地址进行数据报的接收或发送。在显式地断开数据报套接字的连接或将其关闭之前,该套接字始终保持连接状态。当连接后,也可以使用read()write()方法,就像在用FileChannel文件通道一样,只是在数据传送方面没有任何保证。

  1. import java.io.IOException;
  2. import java.net.InetAddress;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.DatagramChannel;
  6. // 通过connect方法实现发送端并获取接收端发回的数据
  7. public class UDPConn {
  8. public static void main(String[] args) throws IOException {
  9. DatagramChannel chnl = DatagramChannel.open();
  10. chnl.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8899));
  11. ByteBuffer buf = ByteBuffer.wrap("Hello World!".getBytes());
  12. while(buf.hasRemaining())
  13. chnl.write(buf);
  14. buf = ByteBuffer.allocate(1024);
  15. // 从此通道读取数据报传输给缓冲区
  16. chnl.read(buf);
  17. buf.flip();
  18. while (buf.hasRemaining()) {
  19. System.out.print((char) buf.get());
  20. }
  21. }
  22. } /* Output: // 先启动服务端才可以看到下面输出,否则会出现java.net.PortUnreachableException
  23. OK!
  24. *///:~
  1. import java.io.IOException;
  2. import java.net.DatagramSocket;
  3. import java.net.InetSocketAddress;
  4. import java.net.SocketAddress;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.DatagramChannel;
  7. // 接收端
  8. public class UDPRece {
  9. public static void main(String[] args) throws IOException {
  10. DatagramChannel receChnl = DatagramChannel.open();
  11. DatagramSocket ds = receChnl.socket();
  12. ds.bind(new InetSocketAddress(8899));
  13. ByteBuffer buf = ByteBuffer.allocate(1024);
  14. // 返回数据报的源地址
  15. SocketAddress socket = receChnl.receive(buf);
  16. buf.flip();
  17. while (buf.hasRemaining()) {
  18. System.out.print((char) buf.get());
  19. }
  20. buf = ByteBuffer.wrap("OK!".getBytes());
  21. // 向源地址回送数据
  22. receChnl.send(buf, socket);
  23. }
  24. } /* Output:
  25. Hello World!
  26. *///:~

TCP传输协议的通道

SocketChannel

nio中的SocketChannel是一个连接到TCP网络套接字的通道。可以通过以下2种方式创建SocketChannel

  1. 使用SocketChannel的静态open()方法打开一个客户端套接字通道SocketChannel
  2. 当一个新连接到达ServerSocketChannel时,通过accept()会返回连接到此服务端套接字通道的SocketChannel

当使用完SocketChannel之后,应该调用SocketChannel.close();来关闭SocketChannel

调用一个read()的方法,从SocketChannel读取到的数据将会放到指定的缓冲区中,可以调用write()方法将缓冲区写入到通道。read()write()方法的使用和之前讲过的一样,所以我不会进行讲解。查看API可以发现SocketChannel类也实现了分散和聚集的接口。

之前在对DatagramChannel进行讲诉时,曾经提到过可以调用通道的configureBlocking(false)来调整此通道的阻塞模式为非阻塞式。其中这个方法在抽象类AbstractSelectableChannel中,而继承并实现这个类还有SocketChannelServerSocketChannelPipe的读取写入管道(下一篇会进行介绍),因此也可以给TCP传输协议的通道调整阻塞模式。当设置SocketChannel为非阻塞模式时,原先的阻塞式方法connect()read()write()将变成非阻塞式的,也就是说:

FileChannel不能切换到非阻塞模式,而套接字通道都可以切换。

非阻塞模式与Selector选择器搭配会工作的更好,通过将一或多个SocketChannel注册到选择器,可以询问选择器哪个通道已经准备好了读取,写入等。SelectorSocketChannel的搭配使用会在后面详讲。

ServerSocketChannel

查看API可以发现该类中的方法不多,只有四个,使用ServerSocketChannel的静态open()方法打开ServerSocketChannel,通过close()方法关闭ServerSocketChannelaccept()方法监听新进来的连接,当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel,因此accept()方法会一直阻塞到有新连接到达。通过socket()获取与此通道关联的服务器套接字,我们必须对该服务器套接字调用bind()来绑定(侦听)SocketAddress特定地址。

当将ServerSocketChannel设置成非阻塞模式。在非阻塞模式下,accept()方法会立刻返回,如果还没有新进来的连接,返回的将是null


异步I/O

异步I/O是一种没有阻塞地读写数据的方法。

正如前面提到过,非阻塞模式与Selector选择器搭配会工作的更好。异步I/O中的核心对象就是Selector选择器。我们可以在Selector注册各种感兴趣的I/O事件,而且当那些事件发生时,将由这个对象告诉我们所发生的事件,然后我们可以对这些事件进行处理。使用异步I/O,可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。

选择器与非阻塞模式的搭配

首先,我们可以通过Selector的静态open()方法创建一个选择器。

然后,对不同的通道对象调用register()方法来注册我们对这些对象中发生的I/O事件的兴趣。(该方法在抽象类AbstractSelectableChannel中,因此FileChannel不具有该方法)

查看API可以发现register()方法有一个重载的版本,先对那个简单的版本进行讲解,SelectionKey register(Selector sel, int ops)方法的第一个参数是这个Selector,第二个参数是int类型,这里代表我们想要监听事件,用SelectionKey选择键的四个常量来表示:

如果对不止一种事件感兴趣,那么可以用位或|操作符将常量连接起来。注意register()返回一个SelectionKey选择键对象,该对象代表这个通道在这个选择器上的这个注册。当某个选择器通知某个传入事件时,通过提供对应该事件的SelectionKey选择键对象来进行。重载的register()方法可以额外的传入附加对象,可以通过SelectionKey选择键对象的attachment()获取该附加对象。甚至可以通过SelectionKey选择键对象的cancel()方法取消通道的注册。

当我们已经注册了一些感兴趣的I/O事件,我们可以通过选择器的select()方法返回读事件已经就绪的那些通道的数量。注意,这个方法会阻塞,直到至少有一个已注册的事件发生。

接下来可以调用选择器的selectedKeys()方法,该方法返回发生了事件的SelectionKey选择键对象的一个集合。

我们现在只需要循环遍历选择键集中的每个键,并检测各个键所对应的通道的就绪事件。通过选择键的channel()方法可以获得通道这个继承体系的基类,如果可以确定具体的通道,应该向下转型。当我们处理完后,必须调用迭代器的remove()方法来删除处理过的SelectionKey。它仍然会在调用select()方法产生的主集合中以一个激活的键出现,这会导致我们尝试再次处理它。

到现在为止,使用选择器的流程大致讲完了,在展示示例之前,仍然有些方法没有提到。

选择器的其他方法

选择器的select()方法一共有三种:

要等待多久,这是三种选择方法之间的唯一本质差别。

某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个选择器对象上调用wakeup()方法即可。阻塞在select()方法上的线程会立马返回。如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来”。

用完选择器后调用其close()方法会关闭该选择器,会使注册到该选择器上的所有选择键实例无效,通道本身并不会关闭。

选择键

选择键包含两个表示为整数值的操作集,操作集的每一位都表示该键的通道所支持的一类可选择操作。

用位与|操作interest 集合和给定的SelectionKey常量,来确定某个确定的事件是否在interest 集合中。对于ready 集合的四个判断方法,可以使用位与|操作来代替,例如isAcceptable()方法会等价与通过readyOps()获取此键的(ready 操作集合|SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT

以下是一个监听8001、8002和8003端口的完整服务器端示例,当有客户端套接字连接时,请求并处理。

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.net.ServerSocket;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. public class SelectorDemo {
  12. public static void main(String[] args) throws IOException {
  13. // 创建一个选择器
  14. Selector selector = Selector.open();
  15. int[] ports = { 8001, 8002, 8003 };
  16. ByteBuffer buf = ByteBuffer.wrap("<font color='red'>Respond to Client..</font>".getBytes());
  17. for (int port : ports) {
  18. // 创建一个新的服务器套接字通道
  19. ServerSocketChannel ssc = ServerSocketChannel.open();
  20. // 设置此通道的阻塞模式为非阻塞的
  21. ssc.configureBlocking(false);
  22. // 获取与此通道关联的服务器套接字。
  23. ServerSocket ss = ssc.socket();
  24. // 将服务器套接字绑定到特定地址
  25. ss.bind(new InetSocketAddress(port));
  26. // 向选择器注册感兴趣的事情,当通道接收就绪时
  27. ssc.register(selector, SelectionKey.OP_ACCEPT);
  28. System.out.println("Going to listen on " + port);
  29. }
  30. while (true) {
  31. // 阻塞式方法,直到至少有一个已注册的事件发生。
  32. int num = selector.select();
  33. if (num == 0)
  34. continue;
  35. // 获取发生了事件的选择键对象的一个集合
  36. Set<SelectionKey> selectedKeys = selector.selectedKeys();
  37. System.out.println("size:" + selectedKeys.size());
  38. Iterator<SelectionKey> it = selectedKeys.iterator();
  39. while (it.hasNext()) {
  40. SelectionKey key = it.next();
  41. if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
  42. // 返回创建此键的服务器套接字通道
  43. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  44. // 接受到此通道服务器套接字的连接。
  45. SocketChannel sc = ssc.accept();
  46. // 设置客户端套接字通道为非阻塞模式
  47. sc.configureBlocking(false);
  48. // 将键添加到选择器的已选择键集,向给定的选择器注册此客户端套接字通道,当读取就绪时
  49. sc.register(selector, SelectionKey.OP_READ);
  50. // 删除处理过的选择键
  51. it.remove();
  52. System.out.println("Get connection from " + ssc.socket().getLocalPort());
  53. } else if (key.isReadable()) { // 当客户端套接字通道读取就绪
  54. // 获取客户端套接字通道
  55. SocketChannel sc = (SocketChannel) key.channel();
  56. // 保证将缓冲区的字节全部写入
  57. while (buf.hasRemaining())
  58. sc.write(buf);
  59. // 使缓冲区为重新读取已包含的数据做好准备
  60. buf.rewind();
  61. // 处理请求完成,关闭以释放资源
  62. sc.close();
  63. System.out.println("Respond to Client..");
  64. it.remove();
  65. }
  66. }
  67. }
  68. }
  69. } ///:OK~
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注