技术

如何使用RedisTemplate访问Redis数据结构 MySQL重要知识点 OAuth2认证授授权流程 分布式锁 服务调用 MQ的介绍 SpringCloud 使用链 Eureka 的点对点通信 介绍Eureka RabbitMQ与其它MQ的对比 Springboot 启动过程分析 Springboot 入门 Linux内存管理 自定义CNI IPAM 扩展Kubernetes 副本一致性 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 Kubernetes安全机制 jvm crash分析 Prometheus 学习 Kubernetes监控 Kubernetes 控制器模型 容器日志采集 容器狂占cpu怎么办? 容器狂打日志怎么办? Kubernetes资源调度-scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes objects之编排对象 源码分析体会 自动化mock AIOps说的啥 从DevOps中挖掘docker的价值 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes整体结构 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 kubernetes实践 线程排队 jib源码分析之细节 从一个签名框架看待机制和策略 跨主机容器通信 jib源码分析及应用 docker环境下的持续构建 docker环境下的持续发布 一个容器多个进程 kubernetes yaml配置 marathon-client 源码分析 《持续交付36讲》笔记 程序猿应该知道的 mybatis学习 无锁数据结构和算法 《Container-Networking-Docker-Kubernetes》笔记 活用linux 命令 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 swagger PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 《深入剖析kubernetes》笔记 精简代码的利器——lombok 学习 java 语言的动态性 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 JVM4——《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 commons-pipeline 源码分析 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 分布式计算系统的那些套路 Storm 学习 AQS3——论文学习 Unsafe Spark Stream 学习 linux 文件系统 mysql 批量操作优化 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 forkjoin 泛谈 hbase 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 calico 问题排查 bgp初识 mesos 的一些tips mesos 集成 calico calico AQS2——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 compensable-transaction 源码分析 硬件对软件设计的影响 elasticsearch 初步认识 mockito简介及源码分析 线上用docker要解决的问题 《Apache Kafka源码分析》——Producer与Consumer 停止容器 dns隐藏的一个坑 《mysql技术内幕》笔记2 《mysql技术内幕》笔记1 log4j学习 为什么netty比较难懂? 回溯法 apollo client源码分析及看待面向对象设计 java系并发模型的发展 从一个marathon的问题开始的 docker 环境(主要运行java项目)常见问题 Scala的一些梗 OpenTSDB 入门 spring事务小结 事务一致性 javascript应用在哪里 netty中的future和promise 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 一些tricky的code http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 JVM3——java内存模型 java concurrent 工具类 java exception java io涉及到的一些linux知识 network channel network byte buffer 测试环境docker化实践 通用transport层框架pigeon netty(七)netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 从Go并发编程模型想到的 mesos深入 Macvlan Linux网络源代码学习2 《docker源码分析》小结 对web系统的一些理解 docker中涉及到的一些linux知识 hystrix学习 Linux网络源代码学习 Docker网络五,docker网络的回顾 zookeeper三重奏 数据库的一些知识 Spark 泛谈 commons-chain netty(六)netty回顾 Thrift基本原理与实践(三) Thrift基本原理与实践(二) Thrift基本原理与实践(一) Future 回调 Docker0.1.0源码分析 基于spring boot和Docker搭建微服务 通过Docker Plugin来扩展Docker Engine java gc Docker网络四,基于Centos搭建Docker跨主机网络 google guava的一些理解 Jedis源码分析 Redis概述 Docker回顾 深度学习是个什么鬼 Docker网络三,基于OVS实现Docker跨主机网络 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 netty(四)netty对http协议的实现(废弃) netty(三)netty框架泛谈 向Hadoop学习NIO的使用 以新的角度看数据结构 AQS1——并发相关的硬件与内核支持 使用Ubuntu要做的一些环境准备 Docker网络二,libnetwork systemd 简介 那些有用的sql语句 异构数据库表在线同步 spring aop 实现原理简述——背景知识 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 我们编程的那些潜意识 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 一次代码调试的过程 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 docker volume 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM2——JVM和传统OS对比 git spring rmi和thrift maven/ant/gradle使用 再看tcp mesos简介 缓存系统——具体组件 缓存系统 java nio的多线程扩展 多线程设计模式/《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go 常用的一些库 Netty(一)初步了解 java mina Golang开发环境搭建(Windows下) java nio入门 ibatis自动生成类和文件 Python初学 Goroutine 调度模型猜想 一些编程相关的名词 虚拟网络 《程序员的自我修养》小结 VPN(Virtual Private Network) Hadoop安装与调试 Kubernetes持久化存储 Kubernetes 其它特性 访问Kubernetes上的服务 Kubernetes副本管理 Kubernetes pod 组件 使用etcd + confd + nginx做动态负载均衡 nginx安装与简单使用 在CoreOS集群上搭建Kubernetes 如何通过fleet unit files 来构建灵活的服务 CoreOS 安装 定制自己的boot2docker.iso CoreOS 使用 Go初学 JVM1——jvm小结 硬币和扑克牌问题 LRU实现 virtualbox 使用 os->c->java 多线程 容器类概述 zabbix 使用 zabbix 安装 Linux中的一些点 关于集群监控 ThreadLocal小结 我对Hadoop的认识 haproxy安装 docker快速入门

标签


netty(六)netty回顾

2016年07月25日

前言

越长大越发现,人生其实没多少道理,小学课本上都学过:学而时习之,不亦乐乎。笔者通过对netty一段时间的学习和使用,从代码角度对netty有了一些新的体会。

2016-11-17补充:参见Redis与Reactor模式通过select/poll/epoll/kqueue这些I/O多路复用函数库,我们解决了一个线程处理多个连接的问题,我觉得这是nio与bio的根本区别。

netty更进一步,将accept时间与read/write事件分开处理。

几种io模型代码的直观感受

《Netty权威指南》开篇使用各种io模型实现了一个TimeServer和TimeClient

BIO的实现

public class TimeServer{
	public static void main(String[] args){
   		ServerSocket serverSocket = null;
        ...
        while(true){
        	Socket socket = serverSocket.accept();
            new Thread(new TimeServerHandler()).start();
        }
    }
}
public class TimeServerHandler implements Runnable{
	private Socket socket;
    public void run(){
    	BufferedReader in = null;
        PrintWriter out = null;
        try{...}catch(Exception e){...}
    }
}

NIO的实现

public class TimeServerHandler implements Runnable{
	private selector selector;
    private ServerSocketChannel servChannel
	public void run(){...}
}

AIO的实现

public class TimeServerHandler implements Runnable{
	AsynchronousServerSocketChannel asyncServerSocketChannel;
    public void run(){
    	CountDownLatch latch = new CountDownLatch(1);
        asyncServerSocketChannel.accept(this,new CompletionHandler(){
        	public void completed(AsynchronousSocketChannel channel,TimeServerHandler attachment){
            	channel opt...
            }
        });
    	latch.await();
    }
}

网络数据读写,一方是用户线程,一方是内核处理,AIO、NIO和BIO,正体现了生产者和消费者两方线程的几种交互方式。从TimeServerHandler类成员的不同,就可以看到使用方式的差异。AIO和NIO都需要我们显式的提供线程去驱动数据的读写和处理,AIO由jdk底层的线层池负责回调,并驱动读写操作。

java原生NIO类库

java nio类库的三个基本组件bytebuffer,channel,selector, 它们是spi接口,java并不提供详细的实现(由jvm提供),java只是将这三个组件赤裸裸的提供给你,线程模型由我们自己决定采用,数据协议由我们自己制定并解析。

首先我们要了解java nio原生的类体系。以Channel interface为例,Channel,InterruptibleChannel,SelectableChannel等interface逐步扩展了Channel的特性。

public interface Channel extends Closeable {
    public boolean isOpen();
    public void close() throws IOException;
}
// 并没有新增方法,只是说明,实现这个接口的类,要支持Interruptible特性。
public interface InterruptibleChannel
    extends Channel
    public void close() throws IOException;
}

A channel that can be asynchronously closed and interrupted. A channel that implements this interface is asynchronously closeable: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the channel’s close method. This will cause the blocked thread to receive an AsynchronousCloseException.

这就解释了,好多类携带Interruptible的含义。

public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel{ // SelectorProvider,Service-provider class for selectors and selectable channels.
    public abstract SelectorProvider provider();
    public abstract int validOps();
    public abstract boolean isRegistered();
    public abstract SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException;
    public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException{
        return register(sel, ops, null);
    }
    public abstract SelectableChannel configureBlocking(boolean block)
        throws IOException;
    public abstract boolean isBlocking();
    public abstract Object blockingLock();
}

In order to be used with a selector, an instance of this class must first be registered via the register method. This method returns a new SelectionKey object that represents the channel’s registration with the selector.

通过以上接口定义,我们可以知道,Channel接口定义的比较宽泛,理论上bio也可以实现Channel接口。所以,我们在分析selector和Channel的关系时,准确的说是分析selector与selectableChannel的关系:它们是相互引用的。selector和selectableChannel是多对多的关系,数据库中表示多对多关系,需要一个中间表。面向对象表示多对多关系则需要一个中间对象,SelectionKey。selector和selectableChannel都持有这个selectionkey集合。

netty做了什么工作

java nio类库的三个基本组件bytebuffer、channel、selector。java只是将这三个组件赤裸裸的提供给你,线程模型由我们自己决定采用,数据协议由我们自己制定并解析。

这里说点题外话:我们常用netty的nio特性,但netty不只用了nio,一些较高层的接口,bio也可以实现。以nio或selectable为前缀的子类,才是针对nio的扩展。(java.nio.channels也是类似的道理)

  java.nio netty
bytebuf   netty的bytebuf提供的接口与nio的bytebuffer是一致的,只是功能的增强,bytebuf只有在编解码器中才会用到
selector   完全隐藏
channel   完全重写
线程模型   固定好了

channel如何重写呢?AbstractChannel类特别能说明问题。AbstractChannel聚合了所有channel使用到的能力对象,由AbstractChannel提供初始化和统一封装,如果功能和子类强相关,则定义成抽象方法,由子类具体实现。

AbstractChannel{
    Channel parent;
    Unsafe unsafe;
    // 读写操作全部转到pipeline上
    DefaultChannelPipeline pipeline;
    EventLoop eventloop;
    // 保有这么多future,这是要干啥
    SuccessedFuture,ClosedFuture,voidPromise,unsafeVoidPromise
    localAddress,remoteAddress
}

为什么重写后的Channel会有这么多成员呢?这事儿得慢慢说。

how netty works

笔者曾经读过一本书《how tomcat works》,从第一个例子十几行代码开始讲述tomcat是如何写出来的,此处也用类似的风格描述下。

我们先从一个最简单的NIOServer代码示例开始,单线程模型:

public class NIOServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
        	   // 所有连接、所有事件阻塞在一处 
            selector.select(1000);
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) {
                key = it.next();
                it.remove();
                handleKey(key);
            }
        }
    }
    public static void handleKey(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            // Accept the new connection
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            // Add the new connection to the selector
            sc.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            System.out.println("accept...");
        } else if (key.isReadable()) {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            // handle buffer
            int count = sc.read(readBuffer);
            if (count > 0) {
                String receiveText = new String(readBuffer.array(), 0, count);
                System.out.println("服务器端接受客户端数据--:" + receiveText);
            }
        }
    }
}

以下忽略main方法的书写。

我们对上述代码进行简单的抽取,将while(it.hasNext()){..},handleKey(){...}抽取到一个worker线程中。这样的线程有个学名,叫eventloop,于是

class NIOServer{
	main(){
		ServerSocketChannel ...
   	while(true){
   		selector.select(1000);
   		new Worker(SelectionKey).start();
   	}
	}
}

当然了,大家都提倡将acceptable和read/write event分开,我们可以换个方式抽取原始代码:boss和worker线程都执行while(true){selector.select(1000);...},只不过boss专门处理acceptable事件,worker只处理r/w事件。

class NIOServer{
	ServerSocketChannel ...
    Selector selectror = ...
    new Boss(selector).start();
	 new Worker(selector).start();
}

boss和worker共享一个selector虽然简单,但是扩展性太低,因此让boss和worker各用各的selector,boss thread accept得到的socketchannel通过queue传给worker,worker从queue中取下socketChannel”消费”(将socketChannel注册到selector上,interest读写事件)。简单实现如下:

class NIOServer{
	Queue<SocketChannel> queue = ...
   new Boss(queue).start();
	new Worker(queue).start();
}

除了共享queue,传递新accept的socket channel另一种方法是,boss thread保有worker thread的引用,worker thread除了run方法,还提供registerSocketChannel等方法。这样,boos thread就可以通过worker.registerSocketChannel把得到的SocketChannel注册到worker thread 的selector。

说句题外话,笔者以前分解的代码都是静态的,简单的说就是将一个类分解为多个类。本例中,代码分解涉及到了线程,线程对象不只有一个run方法,还可以具备registerChannel的功能。所以,在nio中,线程模型与nio通信代码的结合,不只是new Thread(runnable).start()去驱动代码执行,还深入到了代码的分解与抽象中。

然后再将Boss和worker线程池化,是不是功德圆满了呢?还没有.

nio类库提供给用户的三个基本操作类bytebuffer,channel,selector,虽然抽象程度低,但简单明了,直接提供read/write data的接口。以我们目前的抽象,netty程序的驱动来自boss和worker thread,问题来了?读取的数据怎么处理(尤其是复杂的处理),我们如何主动地写入数据呢?总得给用户一个入口对象。(任何框架,总得有一个入口对象供用户使用,比如fastjson,JSON对象就是其对应的入口对应。比如rabbitMQ,messageListner是其读入口对象,rabbitTemplate是其写入口对象)。

netty选择将channel作为写的入口对象,将channel从worker thread中提取出来,channel提出来之后,worker thread便需要提供自己(内部的selector)与channel交互的手段,比如register方法。

channel提出来之后,读写数据的具体逻辑代码也要跟着channel提取出来,这样worker thread中的代码可以更简洁。但本质上还是worker.handlekey才知道什么时候读到了数据,什么时候可以写数据。因此,channel支持触发数据的读写,但读写数据的时机还是由work thread决定。我们要对channel作一定的封装。伪代码如下

ChannelFacade{
    channel	// 实际的channel
    writeBuffer	// 发送缓冲区 
    handleReadData(Buffer){}	// 如何处理读到的数据,由worker thread触发	
    write()					// 对外提供的写数据接口
    doWrite()			// 实际写数据,由workerThread触发
    workerThread		// 对应的Channel
}
class NIOServer{
	 ServerSocektChannel srvSocketChannel = ...
    new Boss(srvSocketChannel){};
    new Worker().start();
}
class Boss extends Thread{
	public void run(){
    	SocketChannel socketChannel = srvSocketChannel.accept();
        ChannelFacade cf = facade(socketChannel);
        worker.register(cf); //如果cf保有workerThread引用的话,也可以
        cf.register();
    }
} 

将channel与其对应的reactor线程剥离之后,一个重要的问题是:如何确保channel.read/write是线程安全的。一段代码总在一个线程下执行,那么这段代码就是线程安全的,每个channel(或channel对应的channelhandler,ChannelhandlerContext)持有其约定reactor线程的引用,每次执行时判断下:如果在绑定的reactor线程,则直接执行,如果不在约定线程,则向约定线程提交本任务。

channelhandler一门心思处理业务数据,channelhandlercontenxt触发事件函数的调用,并保证其在绑定的reactor线程下执行

这样,我们就以《how tomcat works》的方式,猜想了netty的主要实现思路,当然,netty的实现远较这个复杂。但复杂在提高健壮性、丰富特性上,主要的思路应该是这样的。

读写事件的处理

我们提到,将channel从work thread抽取出来后,channel和 work thread的交互方式。

  1. read由work thread驱动,work thread 通过select.select()得到selectkey中拿到channel和niosocketchannel(保存在attachment中),就可以调用netty socketchannel的读方法。
  2. write 由netty socketchannel直接驱动,但问题是,socketchannel作为入口对象,socketchanel.write可能在多个线程中被调用,多个线程同时执行channel.write,同样都是目的缓冲区,你写点,我写点,数据就乱套了。重复一下 解决方法就是,为每个channel绑定一个work thread(一个work thread可以处理多个channel,一个channel却只能被同一个work thread处理)即netty socketchannel持有了work thread引用,执行chanel.write时先判断现在是不是在自己绑定的work thread,是,则直接执行;如果不是,则向work thread提交一个任务,work thread在合适的时机处理(work thread有一个任务队列)。

read的处理过程:worker thread触发unsafe.read ==> pipeline.fireChannelRead ==> head(channelhandlercontext).fireChannelRead

 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
    }
}

write分为两条线:

  1. worker thread在可写的时候,调用unsafe.forceFlush() == AbstractUnsafe.flush0() ==> doWrite(outboundBuffer),将写缓冲区数据发出。
  2. 用户ctx.write的时候,一直运行到headContext.write ==> unsafe.write(),将数据加入到写缓冲区中。

     AbstractUnsafe{
         ChannelOutboundBuffer outboundBuffer	// 写缓冲区
         write(msg)		将数据加入到outboundBuffer中
         dowrite()	// 实际的发送数据
     }
    
     if ((readyOps & SelectionKey.OP_WRITE) != 0) {
         // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
         ch.unsafe().forceFlush();
     }
    

DefaultChannlePipeline有一个HeadContext和TailContext,是默认的pipeline的头和尾,outbound事件会从tail outbound context开始,一直到headcontenxt。

	@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

pipeline

filter能够以声明的方式插入到http请求响应的处理过程中。

inbound事件通常由io线程触发,outbound事件通常由用户主动发起。

ChannelPipeline的代码相对比较简单,内部维护了一个ChannelHandler的容器和迭代器(pipeline模式都是如此),可以方便的进行ChannelHandler的增删改查。

  1. ChannelPipeline
  2. DefaultChannelPipeline
  3. ChannelHandler
  4. ChannelHandlerContext,**Enables a ChannelHandler to interact with its ChannelPipeline and other handlers. ** A handler can notify the next ChannelHandler in the ChannelPipeline,modify the ChannelPipeline it belongs to dynamically.

几个类之间的关系

channelpipeline保有channelhandler的容器,这在java里实现办法可就多了

  1. channelpipeline直接保有一个list(底层实现可以是array或者list)
  2. 链表实现,Channelpipeline只保有一个header引用(想支持特性更多的话,就得加tail)。只不过这样有一个问题,handler本身要保有一个next引用。如果既想这么做,又想让handler干净点,那就得加一个channelhandlercontext类,替handler保有next引用。

代码如下

channelpipeline{
    channelhandlercontext header;
}
channelhandlercontext{
    channelhandler handler;
    channelhandlercontext next;
    EventExecutor executor;
    @Override
    public ChannelHandlerContext fireChannelActive() {
        final AbstractChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
        return this;
    }
    private void invokeChannelActive() {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
}

从这就可以看到,Channelhandlercontext不只是替Channelhandler保有下next指针,将pipeline的fireChannelxxx 转化为channelhandler的channelxxx方法。

小结

回过头来再看,java nio类库的三个基本组件bytebuffer、channel、selector,数据的读写就是这三个组件的相互作用,线程模型的选择留给用户。netty则是使用eventloop隐藏了selector(将selector和线程绑在一起),使用pipeline封装了数据的处理,在它们复杂关系的背后,它们的起点,或许还是那个最简单的NIOServer程序。

2018.7.1 参考 异步编程

把书读薄

  1. 把死记硬背的细节转化为逻辑上可以理解的东西
  2. 很多知识都可以共用,比如此处的链式模式,如果以前知道链式模式的实现,那么理解这块就会很简单。