前言
了解netty及其相关框架,最好事先了解下异步编程思想异步编程
relayrides/pushy是一个 Java library for sending APNs (iOS, OS X, and Safari) push notifications. 揉和了netty和http2协议。因此,除了它本身的功能外,从中也可以学到许多对netty框架以及http2协议的使用技巧。
其它基于http2的apns客户端linkedkeeper/apns-http2,文章浅谈基于HTTP2推送消息到APNs
对外接口
// 构建payload
ApnsPayloadBuilder payloadBuilder = new ApnsPayloadBuilder();
payloadBuilder.setAlertBody("Example!");
String payload = payloadBuilder.buildWithDefaultMaximumLength();
String token = TokenUtil.sanitizeTokenString("<efc7492 bdbd8209>");
// 构建推送model
SimpleApnsPushNotification pushNotification = new SimpleApnsPushNotification(token, "com.example.myApp", payload);
// 发送
Future<PushNotificationResponse<SimpleApnsPushNotification>> sendNotificationFuture = apnsClient.sendNotification(pushNotification);
从pushy中学到的
既写又读的handler
一般框架Client,通过一个id关联request和response(通过future来获取),这个future要在多个类之间传递。
class Client{
Map<id,future> map;
Future<Response> send(request){
future = new xxx;
map.put(id,future);
channel.write(request).addListener(new xx{
onComplete(){
xxx
future.set(xx)
}
});
return future;
}
}
class ReceiveHandler{
Map<id,future> map;
onDataReceive(response){
future = map.get(response.getId());
future.set(xx)
}
}
而pushy则是
class ApnsClient{
Future<Response> send(request){
future = new xx();
channel.write(new Composite(request,future));
return future;
}
}
ApnsClientHandler{
Map<id,future> map;
write(composite){
request = composite.getRequest()
future = composite.getFuture();
map.put(id,future)
write(request).addListener(){
future.set(xx)
}
}
onDataReceive(response){
future = map.get(response.getId());
future.set(xx);
}
}
从代码上讲,write和receive都写在一个类里(这里是),因为write和read要共享很多数据,这些数据作为ApnsClientHandler的成员,代码上就可以更紧凑。
future/promise的转换
这个其实也不是什么技巧,promise是可写的future。
Future operation1(){
Promise promise = xx
xxFuture future2 = operation2();
future2.addListener(new xxListener(){
onComplete(){
promise.setxx
}
});
return promise;
}
在本例中,future2并不能直接返回,需要转换成promise(具体的说,通过future2的listener为promise赋值)。pushy一个常见的场景,ApnsClient.sendNotification()
的发送流程:
- 检查ApnsClient 是否是关闭状态
- 从channel pool中acquire channel,可能会失败。acquire 返回异步future
- write success,当发送缓冲区拥堵、网络断开时,会write fail。write返回异步future
- read response success
promise关注的是operation1整个过程是否会失败,因此不能直接返回future2.也就是说,一个异步操作的结果,可能是多个异步操作结果的叠加。在更复杂的例子中,要为operation2传入promise,介入更深层次的异步操作。可以使用netty的io.netty.util.concurrent.PromiseCombiner
来简化一些叠加操作。
有的promise为什么要传入Executor,或者根据Executor生成?还以ApnsClient.sendNotification()
流程为例:对于后两个步骤,其listener是“事件驱动引擎”在执行promise.setFail/setSuccess
时执行的,对于前两个步骤,则由调用线程执行,而promise listener的逻辑有时会和channel相关,由调用线程执行有线程安全问题,最好也交给“事件驱动引擎”执行。
操作接口的演进
netty对外提供的操作接口有以下几个问题,当然,这也不怨netty
- 操作对象有多个,比如bootstrap、channel。并且,操作对象间存在依赖关系,channel不是直接初始化,而是通过bootstrap获得。
- 启动逻辑复杂,必须由用户显式编写代码。有的框架本身仅仅业务逻辑,故无需启动(显式调用构造函数、或执行
xx.start()
),比如fastjson。有的本身用到了线程,故需显式启动,比如common-pool。但common-pool的启动逻辑非常简单,netty则较为复杂 -
操作接口不简洁,对于io操作,简介的接口应该是
- 同步,
response send(request)
- 异步,
Future<response> send(request)
而netty则无明确封装
- 同步,
通过pushy,我们可以学习到如何封装netty,同时学习如何以纯异步的方式新增自定义逻辑,比如线程池等。
最佳实践
2018.6.18 补充 relayrides/pushy
-
Flow control。pushy 是纯异步接口,pushy 有个控制,实现原理估计是1500个future 未返回时,则将后续的请求 缓冲起来,等等这个1500个inflight 的reqeust。
当数据量很大时,必然触发1500 的上线,进而大量请求 缓存在netty buffer 中,然后耗尽内存。此时,需要一个 flow control layer ,可以使用CountDown 或 Semaphore
其它
当我们有一个新的协议
- 像pushy一样,http2协议独立于netty(如何基于netty独立的实现第三方协议,以及这个协议与netty的结合方式),pushy + http2 + netty 组合。从代码上看,协议的encoder/decoder上直接继承MessageToMessageEncoder/ByteToMessageDecoder。
- 像zk等一样,将netty 作为一个transport层。
model | 编解码 | ||
---|---|---|---|
业务层 | 业务model | 继承message,实现encode、decode | 调用transport层提供的send接口 |
transport层 | message | 抽象方法 encode、decode | transport 只是实现message的交互 |
如何基于netty实现一个自定义协议,从客户端方面来说,基本可以找到所有必要的细节。