前言
《netty权威指南》中在谈到netty逻辑架构时,将netty分为三个层面,上层为下层提供运行基础。
- Reactor通信调度层
- 职责链ChannelPipeline
-
业务逻辑编排层ChannelHanlder,主要分为两类
- 纯粹的业务逻辑编排
- 应用层协议插件,用于特性协议相关的会话和链路管理
我们习惯了使用框架,有工具性框架,比如apache的common-lang。有应用型框架,自带执行逻辑(也就是包含线程),只需start和close即可,用户通过暴露的接口实现自定义逻辑。
- 暴露接口我们习惯了,比如rabbitmq,接收消息时实现一个MessageListener即可。
- 暴露多个接口我们也习惯了,比如j2ee的filter、listener和servlet
- 暴露接口、只暴露一部分,暴露的接口以pipeline的形式组织,需要我们自己介入线程的启动与关闭,换个马甲就有点认不得了。不得不说,现在spring的应用,掩盖了许多程序的本质。
破除对netty的一些“想当然”
笔者在接到任务(用到netty)时,基本没进行过java网络开发(大学课本上的bio例子算么?),越过nio直接学习netty,就好比一个web开发人员越过servlet直接上springmvc,netty封装的太好了,是一种很好的nio开发模式,但切记,它不是唯一的nio开发模式,netty不是nio的全部。
基于nio进行java网络编程,比较关键的有以下两个问题,以下以服务端tcp开发为例:
-
SocketChannel和Selector,一对一,一对多,多对一?
hadoop因为对大文件分块处理和传输,也会涉及到网络io操作,在其
org.apache.hadoop.net
包中,一个selector只负责一个channel,确切的说只负责该channel的一个操作,“主角”还是SocketChannel。在netty中,一个selector负责多个服务端socketchannel
-
根据Selector和SocketChannel关系的不同(一对一,一对多,亦或是监听事件类型的不同),线程模型不同。但不管线程模型如何,不能两个以上线程同时调用
socket.write|read|connect
,(一个socket对应内核一个struct sock,每个sock有个自己的send 和 receive sk_buff queue)。两个线程操作同一个socket交替往tcp发送缓冲区中写数据,两个相邻的sk_buff 可能就不是来自同一个上层数据了。因此,线程与socket的关系,可以一对一,一对多。但绝不能多对一和多对多。
demo代码再分析
public class TimeClient {
public static void main(String[] args) throws InterruptedException {
String host = "127.0.0.1";
int port = 8080;
new TimeClient().connect(host, port);
}
public void connect(String host,int port) throws InterruptedException{
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChildChannelHandler());
ChannelFuture f = b.connect(host, port).sync();
// 此处,你其实可以直接使用f.writeAndFlush发送数据
// 等待关闭
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeClientHandler());
}
}
}
这个netty demo,对netty的使用造成了一些误区(如果读者不深入学习的话)。误区的本质就是,netty的入口对象是什么?我们知道,java.nio类库提供了bytebufffer,channel,selector,虽然用起来稍复杂,但直接。使用了netty之后,读的代码在ChannelHanlder的方法里,写数据用的ctx.writeAndFlush(msg)
,我们找不到“入口对象”了。这就需要我们理清一些关系和变化。
总的来说,nio的channel、bytebuf、selector是依赖关系(函数参数),而netty的bytebuf、channel则是关联关系(作为成员),甚至包括了线程。
- Bootstrap的作用是为了构造channel,确保其连接成功。即便使用了netty,我们依然可以实现基于SocketChannel的连接池
-
对于netty client,可以发送数据的地方有两个,一个pipeline中,一个是直接调用channel发送数据(并且还可以设置超时时间,同步异步等)
- 写代码的重点变了。在java bio或者hadoop对nio的封装中,我们读完数据数据将直面一个byte[]。使用netty后,我们的重点成了配置pipeline,但pipeline说穿了只是固化和简化了byte[]的处理逻辑。
Bootstrap和SocketChannel的关系
SocketChannel的几种创建方式
- 最简单,
SocketChannel.open()
- 工厂模式,
SocketChanelFactory.createChannel()
- 进一步包装,使用bootstrap,包含初始化一个SocketChannel的所有信息(一个SocketChannel包括EventLoop,pipeline,底层socket参数设置等。对SocketChannel的初始化,就是对它们的设置过程)
Bootstrap 以及 ServerBootstrap类都继承自 AbstractBootstrap。AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. It support method-chaining to provide an easy way to configure the AbstractBootstrap.也就是说,这是一个帮助类,为了更方便启用Channel,为了方便使用,支持method-chainning(也就是说函数都返回一个对自身的引用。这样我们就可以使用 instance.get().put(3).add(4)之类的写法,在NIO中很常见。)
在我的博客http://topsli.github.io/2015/11/30/program-habit.html
中就提到如何抽取一个类。bootstrap就可以理解为从channel复杂的初始化过程中抽取的一个类。
EventLoop 和 SocketChannel的关系
EvnetLoop实现关系非常复杂,观察它们多余的方法,你会发现其中的不同,它是在渐次实现一个目标:实现一个io读写线程安全的,“提交-执行”模型的框架。
- ScheduledExecutorService只是执行任务(调度的事不是重点,这里不提了,以下简称为Executor),任务干什么活儿,它们并不控制。实现Executor的类必须要有执行任务的能力,一个Executor的常见实现是:维护一个或多个线程集合,一个任务队列。用户线程调用
Executor.submit|execute
将任务加到任务队列中。Executor中的线程从任务队列中取出任务并执行。线程集合不一定非得是多个线程(大多数时候用的是ThreadPoolExecutor,所以会有这样的错觉),一个线程也可以。 - 从名字上看,如果说EventExecutorGroup是包含EventExecutor的容器,那EventExecutor为毛还要实现EventExecutorGroup接口? The EventExecutor is a special EventExecutorGroup which comes with some handy methods to see if a Thread is executed in a event loop.Beside this it also extends the EventExecutorGroup to allow a generic way to access methods. EventExecutor和EventExecutorGroup的一个主要区别是它们的
next()
方法实现不同,EventExecutorGroup从自己的EventExecutor集合中拿出一个返回,而EventExecutor只能返回自己。 - 对eventExecutor和eventExecutorGroup暂不做区分,EventExecutor和Executor的区别在哪里?Executor(以ThreadPoolExecutor为例) 中消费任务队列的是一个线程池。EventExecutor(以SingleThreadEventExecutor为例)消费任务队列的是一个线程。如果消费任务队列的是一个线程(以下叫EventExecutor.thread),就有有意思的地方啦。
eventExecutor.submit
可以由不同的调用线程调用来提交任务,但其taskQueue中的任务只有EventExecutor.thread一个线程执行,如果任务中包含对一个对象的操作,那么相当于只有一个线程执行这个对象。这样做有什么好处? -
我们换个描述,假设有一个MyObject
}class MyObject{ EventExecutor eventExecutor;// 将MyObject与一个eventExecutor绑定 void func1(){ // 如果当前线程对象是EventExecutor.thread,直接执行 if(eventExecutor.inEventLoop()){ dofunc1(); }else{ eventExecutor.submit(new Runnable(){ dofunc1(); }); } }
刚才我们提到,任何线程
eventExecutor.submit
提交的task最后只有EventExecutor.thread执行。现在变成了任何线程(除了EventExecutor.thread自己)执行MyObject.func1
都转换成MyObject绑定的EventExecutor.thread来执行。都被一个线程执行有什么好处呢?如果将MyObject替换成SocketChannel,那么SocketChannel对应的pipeline中handler只会被一个线程执行,这个handler就可以是有状态的。同时也实现了文章开头提到的不能多个thread同时操作Socket(或SocketChannel)的要求,但其对外提供的调用形式则允许任意多个线程调用。
- EventLoop则是在EventExecutor的基础上,添加了selector成员(必然了,也有了注册到selector上的register方法)。EventExecutor只是抽象了一种(一个taskQueue + 一个thread)ScheduledExecutorService实现,EventLoop除此之外,则和selector绑定在一起,其绑定的thread(以下叫EventLoop.thread)run方法,除执行注册到taskqueue中的任务外,还监听selector中的事件,并处理到达的selectorkey。
bootstrap.connect
在main线程中,执行channel.register(向selector注册),channel检查下是不是自己EventLoop.thread引发的调用(此刻是main线程干的),如果不是,则调用channel.eventloo.submit
将register这个任务提交给eventloop.taskQueue。最终确保一个效果,所有的channel操作只有channel.eventloop.thread来做,其它的线程都不可以,确保了线程安全性。
小结
所以,netty的思路应该是
- 对java nio进行封装,包括使用模式的简化(pipeline,handler和事件驱动(事件驱动算是java nio就有的,但netty进一步细化了))和基本组件(ByteBuf和Channel)的重新实现
- 采取主从线程模型,将acceptable和read/write event分开。
- 一个持有selector的线程能干什么事呢?while(true){selector.select();…},如果可读,就从selectorkey中拿到socketchannel.read开始读,socketchannel.read触发pipeline的执行。如果可写,就从selectorkey中拿到socketchannel.doWrite(持有写缓冲区)开始写。
- 需要一个异步非阻塞架构为调用者线程和EventLoop.thread的通信提供支持。