重新认识NIO

NIO

non-blocking io

1. 三大组件

1.1 Channel & Buffer

channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 channel 中将数据读入 buffer,也可以将 buffer 的数据写入到 channel 中,而之前的 stream 要么是输入要么是输出,stream 是单向通道,channel 相比较于 stream 更为底层。
image.png
常见的Channel有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 用来缓冲读写数据,常见的 buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

1.2 Selector

Selector 的作用是什么?
选择器提供选择执行已经就绪的任务的能力。从底层来看,Selector 提供了询问通道是否已经准备好执行每个 I/O 操作的能力。Selector 允许单线程处理多个 Channel。仅用单个线程来处理多个 Channels 的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。

在使用 Selector 之前,处理 socket 连接还有以下两种方法

1.2.1 使用多线程技术

为每个连接分别开辟一个线程,分别去处理对应的 socket 连接

这种方法存在以下几个问题

  • 内存占用高
    • 每个线程都需要占用一定的内存,当连接较多时,会开辟大量线程,导致占用大量内存
  • 线程上下文切换成本高
  • 只适合连接数少的场景
    • 连接数过多,会导致创建很多线程,从而出现问题

1.2.2 使用线程池技术

使用线程池,让线程池中的线程去处理连接

这种方法存在以下几个问题

  • 阻塞模式下,线程仅能处理一个连接
    • 线程池中的线程获取任务(task)后,只有当其执行完任务之后(断开连接后),才会去获取并执行下一个任务
    • 若 socket 连接一直未断开,则其对应的线程无法处理其他 socket 连接
  • 仅适合短连接场景
    • 短连接即建立连接发送请求并响应后就立即断开,使得线程池中的线程可以快速处理其他连接

1.2.3 使用选择器

selector 的作用就是配合一个线程来管理多个 channel(fileChannel 因为是阻塞式的,所以无法使用 selector),获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,当一个 channel 中没有执行任务时,可以去执行其他 channel 中的任务。适合连接数多,但流量较少的场景**

若事件未就绪,调用 selector 的 select() 方法会阻塞线程,直到 channel 发生了就绪事件。这些事件就绪后,select 方法就会返回这些事件交给 thread 来处理。

1.2.4 可选择通道(SelectableChannel)

并不是所有的 Channel,都是可以被 Selector 复用的。比方说,FileChannel 就不能被选择器复用。为什么呢?
判断一个 Channel 能被 Selector 复用,有一个前提:判断他是否继承了一个抽象类 SelectableChannel。如果继承了 SelectableChannel,则可以被复用,否则不能。
SelectableChannel 类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。所有 socket 通道,都继承了 SelectableChannel 类都是可选择的,包括从管道(Pipe)对象的中获得的通道。而 FileChannel 类,没有继承 SelectableChannel,因此是不是可选通道。
通道和选择器注册之后,他们是绑定的关系吗?
答案是不是。不是一对一的关系。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
通道和选择器之间的关系,使用注册的方式完成。SelectableChannel 可以被注册到 Selector 对象上,在注册的时候,需要指定通道的哪些操作,是 Selector 感兴趣的。

2. ByteBuffer

缓冲区(Buffer)就是在内存中预留指定大小的存储空间用来对输入/输出(I/O)的数据作临时存储,这部分预留的内存空间就叫做缓冲区:
使用缓冲区有这么两个好处:

  • 减少实际的物理读写次数
  • 缓冲区在创建时就被分配内存,这块内存区域一直被重用,可以减少动态分配和回收内存的次数

ByteBuffer 有几个重要属性

  • capacity
  • position
  • limit

2.1 ByteBuffer 使用

初始化
image.png
写模式,position 是写入位置,limit 等于容量,下图表示写入 4 个字节后的状态
image.png

flip 方法调用后,position 切换为读取位置,limit 切换为读取限制
image.png
读取 4 个字节后,状态如下
image.png

clear 方法调用后,状态如下
image.png
compact 方法,是把未读完的部分向前压缩,然后切换至写模式
image.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
void testBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(6);
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 3);
buffer.put((byte) 4);
buffer.put((byte) 5);
buffer.put((byte) 6); // 初始化一个写满的buffer

buffer.flip();
// position: 0, limit: 6, capacity: 6 -- 切换为读取模式

buffer.get();
buffer.get();
// position: 2, limit: 6, capacity: 6 -- 读取两个字节后,还剩余四个字节

buffer.compact();
// position: 4, limit: 6, capacity: 6 -- 进行压缩之后将从第五个字节开始
}

2.2 ByteBuffer 粘包半包分析

我觉得这篇文章很好的从 TCP 的层面说清楚为什么 TCP 协议有粘包问题?

TCP 协议粘包问题是因为应用层协议开发者的错误设计导致的,他们忽略了 TCP 协议数据传输的核心机制 — 基于字节流,其本身不包含消息、数据包等概念,所有数据的传输都是流式的,需要应用层协议自己设计消息的边界,即消息帧(Message Framing),我们重新回顾一下粘包问题出现的核心原因:

TCP 协议是基于字节流的传输层协议,其中不存在消息和数据包的概念;
应用层协议没有使用基于长度或者基于终结符的消息边界,导致多个消息的粘连;
TCP 协议是面向连接的、可靠的、基于字节流的传输层通信协议 TCP 协议是面向连接的、可靠的、基于字节流的传输层通信协议,应用层交给 TCP 协议的数据并不会以消息为单位向目的主机传输,这些数据在某些情况下会被组合成一个数据段发送给目标的主机。

Negal 算法的优化,当应用发送数据包太小,TCP 为了减少网络请求次数的开销,它会等待多个消息包一起,打成一个 TCP 数据包一次发送出去。这也就是所谓的粘包image.png

因为 TCP 本身传输的数据包大小就有限制,所以应用发出的消息包过大,TCP 会把应用消息包拆分为多个 TCP 数据包发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package cc.voox.nio.buffer;

import org.junit.Test;

import java.nio.ByteBuffer;

public class BufferTest {
@Test
public void test1() {
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
byteBuffer.put("hi!\n i'm TJ\nHo".getBytes());
split(byteBuffer);
byteBuffer.put("w are you?\n".getBytes());
split(byteBuffer);
}

private static void split(ByteBuffer byteBuffer) {
byteBuffer.flip();
for (int i = 0; i < byteBuffer.limit(); i++) {

if ('\n' == byteBuffer.get(i)) {
int len = i + 1 - byteBuffer.position();
ByteBuffer target = ByteBuffer.allocate(len);
for (int j = 0; j < len; j++) {
target.put(byteBuffer.get());
}
System.out.print(new String(target.array()));
}

}
byteBuffer.compact();
}
}

3.Selector

Seletor 中 selectedKeys 方法返回了当前要被处理 channel 的事件集合,当遍历时我们需要及时地从集合中手动删除,否则下一次循环会影响到,可能出现空指针等错误[1]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package cc.voox.nio;


import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;

@Slf4j
public class Server {

public static void main(String[] args) throws Exception {

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);

while (true) {
selector.select();
log.info("key: {}", selectionKey);

Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ, null);
log.info("{}", socketChannel);
log.info("scKey: {}", selectionKey);

} else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
if (read == -1) {
log.info("no info...");
selectionKey.cancel();
} else {
buffer.flip();
log.info("chl: {}", channel);
System.out.println(Charset.defaultCharset().decode(buffer));
}
} catch (Exception eO) {
eO.printStackTrace();
selectionKey.cancel();
}

}
}
}
}
}

参考


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!