前言
越长大越发现,人生其实没多少道理,小学课本上都学过:学而时习之,不亦乐乎。笔者通过对netty一段时间的学习和使用,从代码角度对netty有了一些新的体会。
2016-11-17补充:参见Redis与Reactor模式, 通过select/poll/epoll/kqueue这些I/O多路复用函数库,我们解决了一个线程处理多个连接的问题,我觉得这是nio与bio的根本区别。
netty更进一步,将accept时间与read/write事件分开处理。
几种io模型代码的直观感受
《Netty权威指南》开篇使用各种io模型实现了一个TimeServer和TimeClient
BIO的实现
public class TimeServer{
public static void main(String[] args){
ServerSocket serverSocket = null;
...
while(true){
Socket socket = serverSocket.accept();
new Thread(new TimeServerHandler()).start();
}
}
}
public class TimeServerHandler implements Runnable{
private Socket socket;
public void run(){
BufferedReader in = null;
PrintWriter out = null;
try{...}catch(Exception e){...}
}
}
NIO的实现
public class TimeServerHandler implements Runnable{
private selector selector;
private ServerSocketChannel servChannel
public void run(){...}
}
AIO的实现
public class TimeServerHandler implements Runnable{
AsynchronousServerSocketChannel asyncServerSocketChannel;
public void run(){
CountDownLatch latch = new CountDownLatch(1);
asyncServerSocketChannel.accept(this,new CompletionHandler(){
public void completed(AsynchronousSocketChannel channel,TimeServerHandler attachment){
channel opt...
}
});
latch.await();
}
}
网络数据读写,一方是用户线程,一方是内核处理,AIO、NIO和BIO,正体现了生产者和消费者两方线程的几种交互方式。从TimeServerHandler类成员的不同,就可以看到使用方式的差异。AIO和NIO都需要我们显式的提供线程去驱动数据的读写和处理,AIO由jdk底层的线层池负责回调,并驱动读写操作。
java原生NIO类库
java nio类库的三个基本组件bytebuffer,channel,selector, 它们是spi接口,java并不提供详细的实现(由jvm提供),java只是将这三个组件赤裸裸的提供给你,线程模型由我们自己决定采用,数据协议由我们自己制定并解析。
首先我们要了解java nio原生的类体系。以Channel interface为例,Channel,InterruptibleChannel,SelectableChannel等interface逐步扩展了Channel的特性。
public interface Channel extends Closeable {
public boolean isOpen();
public void close() throws IOException;
}
// 并没有新增方法,只是说明,实现这个接口的类,要支持Interruptible特性。
public interface InterruptibleChannel
extends Channel
public void close() throws IOException;
}
A channel that can be asynchronously closed and interrupted. A channel that implements this interface is asynchronously closeable: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the channel’s close method. This will cause the blocked thread to receive an AsynchronousCloseException.
这就解释了,好多类携带Interruptible的含义。
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel{ // SelectorProvider,Service-provider class for selectors and selectable channels. public abstract SelectorProvider provider();
public abstract int validOps();
public abstract boolean isRegistered();
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException{
return register(sel, ops, null);
}
public abstract SelectableChannel configureBlocking(boolean block)
throws IOException;
public abstract boolean isBlocking();
public abstract Object blockingLock();
}
In order to be used with a selector, an instance of this class must first be registered via the register method. This method returns a new SelectionKey object that represents the channel’s registration with the selector.
通过以上接口定义,我们可以知道,Channel接口定义的比较宽泛,理论上bio也可以实现Channel接口。所以,我们在分析selector和Channel的关系时,准确的说是分析selector与selectableChannel的关系:它们是相互引用的。selector和selectableChannel是多对多的关系,数据库中表示多对多关系,需要一个中间表。面向对象表示多对多关系则需要一个中间对象,SelectionKey。selector和selectableChannel都持有这个selectionkey集合。
netty做了什么工作
java nio类库的三个基本组件bytebuffer、channel、selector。java只是将这三个组件赤裸裸的提供给你,线程模型由我们自己决定采用,数据协议由我们自己制定并解析。
这里说点题外话:我们常用netty的nio特性,但netty不只用了nio,一些较高层的接口,bio也可以实现。以nio或selectable为前缀的子类,才是针对nio的扩展。(java.nio.channels也是类似的道理)
java.nio | netty | |
---|---|---|
bytebuf | netty的bytebuf提供的接口与nio的bytebuffer是一致的,只是功能的增强,bytebuf只有在编解码器中才会用到 | |
selector | 完全隐藏 | |
channel | 完全重写 | |
线程模型 | 固定好了 |
channel如何重写呢?AbstractChannel类特别能说明问题。AbstractChannel聚合了所有channel使用到的能力对象,由AbstractChannel提供初始化和统一封装,如果功能和子类强相关,则定义成抽象方法,由子类具体实现。
AbstractChannel{
Channel parent;
Unsafe unsafe;
// 读写操作全部转到pipeline上
DefaultChannelPipeline pipeline;
EventLoop eventloop;
// 保有这么多future,这是要干啥
SuccessedFuture,ClosedFuture,voidPromise,unsafeVoidPromise
localAddress,remoteAddress
}
为什么重写后的Channel会有这么多成员呢?这事儿得慢慢说。
how netty works
笔者曾经读过一本书《how tomcat works》,从第一个例子十几行代码开始讲述tomcat是如何写出来的,此处也用类似的风格描述下。
我们先从一个最简单的NIOServer代码示例开始,单线程模型:
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 所有连接、所有事件阻塞在一处
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleKey(key);
}
}
}
public static void handleKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// Add the new connection to the selector
sc.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE);
System.out.println("accept...");
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// handle buffer
int count = sc.read(readBuffer);
if (count > 0) {
String receiveText = new String(readBuffer.array(), 0, count);
System.out.println("服务器端接受客户端数据--:" + receiveText);
}
}
}
}
以下忽略main方法的书写。
我们对上述代码进行简单的抽取,将while(it.hasNext()){..},handleKey(){...}
抽取到一个worker线程中。这样的线程有个学名,叫eventloop,于是
class NIOServer{
main(){
ServerSocketChannel ...
while(true){
selector.select(1000);
new Worker(SelectionKey).start();
}
}
}
当然了,大家都提倡将acceptable和read/write event分开,我们可以换个方式抽取原始代码:boss和worker线程都执行while(true){selector.select(1000);...}
,只不过boss专门处理acceptable事件,worker只处理r/w事件。
class NIOServer{
ServerSocketChannel ...
Selector selectror = ...
new Boss(selector).start();
new Worker(selector).start();
}
boss和worker共享一个selector虽然简单,但是扩展性太低,因此让boss和worker各用各的selector,boss thread accept得到的socketchannel通过queue传给worker,worker从queue中取下socketChannel”消费”(将socketChannel注册到selector上,interest读写事件)。简单实现如下:
class NIOServer{
Queue<SocketChannel> queue = ...
new Boss(queue).start();
new Worker(queue).start();
}
除了共享queue,传递新accept的socket channel另一种方法是,boss thread保有worker thread的引用,worker thread除了run方法,还提供registerSocketChannel等方法。这样,boos thread就可以通过worker.registerSocketChannel
把得到的SocketChannel注册到worker thread 的selector。
说句题外话,笔者以前分解的代码都是静态的,简单的说就是将一个类分解为多个类。本例中,代码分解涉及到了线程,线程对象不只有一个run方法,还可以具备registerChannel的功能。所以,在nio中,线程模型与nio通信代码的结合,不只是new Thread(runnable).start()去驱动代码执行,还深入到了代码的分解与抽象中。
然后再将Boss和worker线程池化,是不是功德圆满了呢?还没有.
nio类库提供给用户的三个基本操作类bytebuffer,channel,selector,虽然抽象程度低,但简单明了,直接提供read/write data的接口。以我们目前的抽象,netty程序的驱动来自boss和worker thread,问题来了?读取的数据怎么处理(尤其是复杂的处理),我们如何主动地写入数据呢?总得给用户一个入口对象。(任何框架,总得有一个入口对象供用户使用,比如fastjson,JSON对象就是其对应的入口对应。比如rabbitMQ,messageListner是其读入口对象,rabbitTemplate是其写入口对象)。
netty选择将channel作为写的入口对象,将channel从worker thread中提取出来,channel提出来之后,worker thread便需要提供自己(内部的selector)与channel交互的手段,比如register方法。
channel提出来之后,读写数据的具体逻辑代码也要跟着channel提取出来,这样worker thread中的代码可以更简洁。但本质上还是worker.handlekey
才知道什么时候读到了数据,什么时候可以写数据。因此,channel支持触发数据的读写,但读写数据的时机还是由work thread决定。我们要对channel作一定的封装。伪代码如下
ChannelFacade{
channel // 实际的channel
writeBuffer // 发送缓冲区
handleReadData(Buffer){} // 如何处理读到的数据,由worker thread触发
write() // 对外提供的写数据接口
doWrite() // 实际写数据,由workerThread触发
workerThread // 对应的Channel
}
class NIOServer{
ServerSocektChannel srvSocketChannel = ...
new Boss(srvSocketChannel){};
new Worker().start();
}
class Boss extends Thread{
public void run(){
SocketChannel socketChannel = srvSocketChannel.accept();
ChannelFacade cf = facade(socketChannel);
worker.register(cf); //如果cf保有workerThread引用的话,也可以
cf.register();
}
}
将channel与其对应的reactor线程剥离之后,一个重要的问题是:如何确保channel.read/write是线程安全的。一段代码总在一个线程下执行,那么这段代码就是线程安全的,每个channel(或channel对应的channelhandler,ChannelhandlerContext)持有其约定reactor线程的引用,每次执行时判断下:如果在绑定的reactor线程,则直接执行,如果不在约定线程,则向约定线程提交本任务。
channelhandler一门心思处理业务数据,channelhandlercontenxt触发事件函数的调用,并保证其在绑定的reactor线程下执行
这样,我们就以《how tomcat works》的方式,猜想了netty的主要实现思路,当然,netty的实现远较这个复杂。但复杂在提高健壮性、丰富特性上,主要的思路应该是这样的。
读写事件的处理
我们提到,将channel从work thread抽取出来后,channel和 work thread的交互方式。
- read由work thread驱动,work thread 通过select.select()得到selectkey中拿到channel和niosocketchannel(保存在attachment中),就可以调用netty socketchannel的读方法。
- write 由netty socketchannel直接驱动,但问题是,socketchannel作为入口对象,
socketchanel.write
可能在多个线程中被调用,多个线程同时执行channel.write
,同样都是目的缓冲区,你写点,我写点,数据就乱套了。重复一下 解决方法就是,为每个channel绑定一个work thread(一个work thread可以处理多个channel,一个channel却只能被同一个work thread处理)即netty socketchannel持有了work thread引用,执行chanel.write时先判断现在是不是在自己绑定的work thread,是,则直接执行;如果不是,则向work thread提交一个任务,work thread在合适的时机处理(work thread有一个任务队列)。
read的处理过程:worker thread触发unsafe.read ==> pipeline.fireChannelRead ==> head(channelhandlercontext).fireChannelRead
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
write分为两条线:
- worker thread在可写的时候,调用
unsafe.forceFlush() == AbstractUnsafe.flush0() ==> doWrite(outboundBuffer)
,将写缓冲区数据发出。 -
用户ctx.write的时候,一直运行到
headContext.write ==> unsafe.write()
,将数据加入到写缓冲区中。AbstractUnsafe{ ChannelOutboundBuffer outboundBuffer // 写缓冲区 write(msg) 将数据加入到outboundBuffer中 dowrite() // 实际的发送数据 } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
DefaultChannlePipeline有一个HeadContext和TailContext,是默认的pipeline的头和尾,outbound事件会从tail outbound context开始,一直到headcontenxt。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
pipeline
filter能够以声明的方式插入到http请求响应的处理过程中。
inbound事件通常由io线程触发,outbound事件通常由用户主动发起。
ChannelPipeline的代码相对比较简单,内部维护了一个ChannelHandler的容器和迭代器(pipeline模式都是如此),可以方便的进行ChannelHandler的增删改查。
- ChannelPipeline
- DefaultChannelPipeline
- ChannelHandler
- ChannelHandlerContext,**Enables a ChannelHandler to interact with its ChannelPipeline and other handlers. ** A handler can notify the next ChannelHandler in the ChannelPipeline,modify the ChannelPipeline it belongs to dynamically.
几个类之间的关系
channelpipeline保有channelhandler的容器,这在java里实现办法可就多了
- channelpipeline直接保有一个list(底层实现可以是array或者list)
- 链表实现,Channelpipeline只保有一个header引用(想支持特性更多的话,就得加tail)。只不过这样有一个问题,handler本身要保有一个next引用。如果既想这么做,又想让handler干净点,那就得加一个channelhandlercontext类,替handler保有next引用。
代码如下
channelpipeline{
channelhandlercontext header;
}
channelhandlercontext{
channelhandler handler;
channelhandlercontext next;
EventExecutor executor;
@Override
public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
return this;
}
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
}
从这就可以看到,Channelhandlercontext不只是替Channelhandler保有下next指针,将pipeline的fireChannelxxx 转化为channelhandler的channelxxx方法。
小结
回过头来再看,java nio类库的三个基本组件bytebuffer、channel、selector,数据的读写就是这三个组件的相互作用,线程模型的选择留给用户。netty则是使用eventloop隐藏了selector(将selector和线程绑在一起),使用pipeline封装了数据的处理,在它们复杂关系的背后,它们的起点,或许还是那个最简单的NIOServer程序。
2018.7.1 参考 异步编程
把书读薄
- 把死记硬背的细节转化为逻辑上可以理解的东西
- 很多知识都可以共用,比如此处的链式模式,如果以前知道链式模式的实现,那么理解这块就会很简单。