前言
netty的学习有以下几个难点:
- netty基于nio进行了较为复杂的封装,而很多童鞋对nio都不是很了解。建议先查看 java NIO
- netty中应用了Reactor模式,而Reactor模式本身有多种线程模型可以实现。其实io 模型的多线程扩展方面,不论bio 还是nio 宏观上都类似,这一点可以与 《how tomcat works》笔记 对比着看,tomcat 在演化的过程中,也由单纯的操作socket 演进为提供 HttpConnector、HttpProcessor抽象。其背后都符合多线程扩展的基本思想:找到串行任务中的并行部分(逻辑上不相关/弱相关),将一个线程的事情拆分到多个线程中
为减少篇幅,本文涉及的所有代码忽略了次要部分及异常处理,所有代码可以在git@code.csdn.net:lqk654321/nio-demo.git
下载。
本文从最简单的代码开始,逐步拆解,演示下如何在nio编程中应用各种线程模型。
单线程模型
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleKey(key);
}
}
}
public static void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE);
System.out.println("accept...");
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// handle buffer
int count = sc.read(readBuffer);
if (count > 0) {
String receiveText = new String(readBuffer.array(), 0, count);
System.out.println("服务器端接受客户端数据--:" + receiveText);
}
}
}
}
代码中,while(true){监听并处理事件}
循环有个学名,叫eventloop。
while(ture){
// 阻塞
selectKeys = select();
// 非阻塞
handleKey(selectKeys);
}
在该示例中,所有工作放在一个线程中处理,很明显可靠性较低且性能不高。
- 从事件属性上讲,包括:accept事件、read/write事件。
- 从任务属性上讲,包括io任务(r/w data),read/write数据的处理(对data的业务处理)等
事件 | 任务(处理事件) | |
---|---|---|
accept | 新连接进来 | 将新连接的socket注册到selector |
read | 读缓冲区有数据 | 数据解码、进行业务处理 |
write | 写缓冲区有空闲 | 数据编码,写入socket send buffer |
对于server端,连接建立得到socket后,要为新建立的socket的注册selector(在我以前的理解中,忽视了这一点)。
多线程模型——io线程和业务线程分开
最容易想到的办法,当数据readable时,启动线程池,开启一个新的任务专门处理该数据,高大上点就是:io线程和业务线程分开,因此上节的handlerKey方法简化成了。
public static void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE);
System.out.println("accept...");
} else if (key.isReadable()) {
executor.execute(new Reader(key));
}
}
对应的,红色部分交给线程池处理
事件 | 任务(处理事件) | |
---|---|---|
accept | 新连接进来 | 将新连接的socket注册到selector |
read | 读缓冲区有数据 | 数据解码、进行业务处理 |
write | 写缓冲区有空闲 | 数据编码,写入socket send buffer |
主从线程模型——io线程按事件类型分开
到目前为止,代码还没有完全并行化,因为acceptable事件和readable/writable事件的处理,也没什么关系。于是我们可以搞两个selector(笔者以前默认的意识里,一直以为selector只能有一个。当笔者看到可以两个selector监听不同事件的时候,一下子对netty豁然开朗了),一个负责监听acceptable事件,一个监听readable/writable事件的处理,分散在不同的线程中处理。
public class NIOServer {
private static ExecutorService boosExecutor = Executors.newFixedThreadPool(1);
private static ExecutorService workerExecutor = Executors.newFixedThreadPool(10);
private static Queue<SocketChannel> workerQueue = new ConcurrentLinkedQueue<SocketChannel>();
public static void main(String[] args) throws IOException {
/**
* boss只处理连接事件,worker只处理读写事件。
* 将两者分开的关键就是使用两个selector
*/
Selector bossSelector = Selector.open();
Selector workerSelector = Selector.open();
Boss boss = new Boss(bossSelector,workerQueue);
boss.bind();
boosExecutor.execute(boss);
workerExecutor.execute(new Worker(workerSelector,workerQueue));
}
}
boss线程实现
public class Boss implements Runnable {
public void bind() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void run() {
while (true) {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleKey(key);
}
}
}
private void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
System.out.println("boss connect...");
// 向woker队列中发送建立连接的SocketChannel
workerQueue.add(sc);
System.out.println("boss queue size " + workerQueue.size());
}
}
}
worker线程实现
public class Worker implements Runnable {
public void run() {
while (true) {
process(workerQueue);
process(selector);
Thread.sleep(1000);
}
}
public void process(Queue<SocketChannel> queue) throws IOException{
// 如果队列为空,会返回null,不会阻塞
SocketChannel sc = workerQueue.poll();
if(null != sc){
System.out.println("worker accept ...");
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
public void process(Selector selector) throws IOException{
//此处必须设置超时,因为最开始worker的selector没有绑定SocketChannel,所以“selector.select()会阻塞,并且再也无法恢复”
selector.select(1000);
// 处理读写事件
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleKey(key);
}
}
}
对应的,蓝色部分由boss线程(group)负责,红色部分由worker线程(group)负责。
事件 | 任务(处理事件) | |
---|---|---|
accept | 新连接进来 | 将新连接的socket注册到selector |
read | 读缓冲区有数据 | 数据解码、进行业务处理 |
write | 写缓冲区有空闲 | 数据编码,写入socket send buffer |
io线程与selector 绑定
io 线程的分化是随着 selector 功能的分化进行的,每个io线程都会聚合一个selector
XXthread{
private Selector selector
public void run(){
while(true){
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
...
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
...
}
}
}
kafka 中 专门负责acceptable事件 线程称为 acceptor thread, 负责readable/writable事件的称为 processor thread。netty 中都称为 eventloop,但在使用时命名有所区分,前者称之为boss,后者称之为worker。netty的处理方式也体现了,“io线程与业务线程分化” 与 “io线程本身的分化” 有一点不同:后者虽然分化了,但不同线程整体的处理逻辑是一致的。就好比一些rpc 框架支持快慢线程池的功能,快慢线程池中不同线程的处理逻辑是一样的,只是为了优先保证重要业务的可靠性才做的区分。
java 多线程编程中,一般推荐将自己的逻辑封装为Runnable提交给线程池。但在很多框架的实现中,通常会将 thread 和 相关数据封装一下 对外提供系统的操作对象,这类对象通常不聚合thread 对象也没有什么问题。按照“程序=控制+逻辑”的公式,此时的线程只是一个“控制”方式,并不影响 封装对象对外提供的 逻辑接口,我们从 reactor 模式的概念图中也没找到 thread的影子不是。
io线程和业务线程的数据交互
在netty 中,io线程读取到数据后,可以直接操作线程池对象来执行 业务处理逻辑。kafka 则在io 线程与业务线程之间 提供了一个队列来缓冲数据,Processor thread 将读取的数据放入 RequestChannel(对队列的封装),Handler thread读取 并将处理后的响应通过 RequestChannel 传递给Processor。 Handler thread 属于kafka的 API层。
NIO的使用套路
在具体实践中,各个框架套路不一样
- hadoop,Thread-Per-Connection,与BIO的思路一样,只是用了NIO的API
- kafka客户端,一个线程 一个selector 裸调java NIO 进行网络通信
- netty和kafka 服务端都采用主从线程模型,但在io线程和业务线程的交互上有差异,如上文所述