快速回顾
接上文,一次 grpc 的请求包括四个步骤,客户端发送请求到网络,是第一步。发送过程,Stub 把请求传给了ClientCall, ClientCall 传给了 ClientStream,ClientStream 有Netty 的实现,内部会把数据传输到网络上。
本文主要介绍 blockingUnaryCall 是如何使用 ClientCall ClientChannel 工作的, waitAndDrain 和 ClientCall 的方法如何对应。
Stub 外层简述
最外层的Stub,echo方法是生成的代码,里面只有一行,使用静态方法ClientCalls.blockingUnaryCall
blockingUnaryCall 的实现,
- 使用ClientChannel 创建 ClientCall, 这个ClientCall 的类型就是 Pendingcall;
- 使用ClientCall 创建
ListenableFuture
存放 response - 执行发送过程
- response 取出结果
无论这里时序如何,还是会执行 PendingCall 发消息,最后通过观察者模式,把结果取回来。
小结:Pendingcall 发送客户端消息
ClientCall 做了什么
ClientCall 有四个主要的方法,start,request,sendMessage, halfClose
Channel 相当于 ClientCall 的工厂,下面具体到类型进行分析。
PendingCall 结构
PendingCall 的实现如下图
PendingCall 是一个 DelayClientCall,通过realChannel 创建了 ClientCallImpl 作为成员变量,因此 PendingCall 的 最终会执行 ClientCallImpl 的对应方法。
小结: PendingCall 代理了 ClientCallImpl,PendingCall 借助 ManagedChannelImpl 的RealChannel 创建realCall
ManagedChannelImpl 的创建
Channel 那一块,由一下代码创建出 ManagedChannelImpl
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();
ManagedChannelImpl
里面包含 ManagedChannelImpl.RealChannel
和上图串起来了。
https://cdn.hashnode.com/res/hashnode/image/upload/v1662278195441/NnO2uNjhW.png
小结: 静态方法通过SPI 创建了ManagedChannelImpl
PendingCall 如何创建的
在 blockingUnaryCall
里 channel.newCall
因此是 ManagedChannelOrphanWrapper 创建了 PendingCall, ManagedChannelOrphanWrapper 委托给 ManageChannelImpl , ManageChannelImpl 委托给 interceptorChannel
, interceptorChannel
的创建方式是
ClientInterceptors.intercept(channel, interceptors)
, interceptorChannel
就是包了realchannel,
ManagedChannelImpl 的是实现中,
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
return interceptorChannel.newCall(method, callOptions);
}
** 小结: ManagedChannelImpl 使用带拦截的RealChannel 创建的 PendingCall
PendingCall 为什么要包 ClientCallImpl
大概思考一下,如果底层是Channel,Channel通常要进行网络连接,这个会放在另外一个线程里。 createChannel 之后有一个netty 线程进行连,等到ready 之后才把pending 的任务都执行一下。
接口上,这里主要是包了一个reprocess 方法, 可以和 InternalConfigSelector 进行配合。(todo) reprocess 的作用就是 给 PendingCall 初始化 ClientCallImpl
PendingCall 的大部分方法还是 DelayClientCall,那么主要看 DelayClientCall 的作用
A call that queues requests before a real call is ready to be delegated to.
再 realCall 准备完成之前,把请求放到队列里。
- List<Runnable> pendingRunnables
- boolean passThrough
请求指的是 Runnable 的任务, 当任务执行完成之后,标志位 passThrough 为 true;再执行 DelayClientCall, 就会执行 ClientCallImpl;
通过断点可以看到, Runnable 任务有四个,分别是执行ClientCallImpl 的 start,request,sendMessage, halfClose。
因此,PendingCall 执行方法的时候时候,不会真的执行,而且把方法放入list 中,等到万事具备,执行 DelayClientCall 的 setCall 方法返回一个Runable的 drainPendingCalls,可以依次执行这些任务。
TODO passThrough 什么时候 为true ?
哪个线程池执行了 pendingRunnables
Stub 中会有线程池,ClientCall 和 channel 也会用到线程池,下面梳理一下线程池,用于后面分析这些pending任务的时序。下面发现有两个
线程池创建
ManageChannelImpl 里有有 executor
,默认值是 executorPool
get出来的。
executorPool
相当于一个ExecutorFactory, 默认值在GrpcUtil里有一个static 成员变量
return Executors.newCachedThreadPool(getThreadFactory(grpc-default-executor + "-%d", true));
这样,就可以用在ManageChannelImpl
一些任务的执行。
任务1
new Runnable() {
@Override
public void run() {
exitIdleMode();
}
}
另外netty 有两个线程 , ThreadPerTaskExecutor
这个是,NettyChannelBuilder 先弄了一个 executorPool
,再从 executorPool
里创建出来的。 executorPool
的默认值是 SharedResourcePool.forResource(Utils.*DEFAULT_WORKER_EVENT_LOOP_GROUP*);
名称是 grpc-default-worker-ELG, 这个是
因此 DelayedClientCall 的 pendings 任务还是 main 主线程执行的,用了一个无线程的线程池.
- 先制造任务
- 再执行任务
blockingUnaryCall 二次分析
目前已经了解到Channel 和 ClientCall 的创建和相互关系,也了解到线程池和时序。
那么futureUnaryCall 这个异步 和 drain 的过程是怎么实现的呢?
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
相当于 创建了四个 Runnable的任务, 这四个任务,pendingCall 执行了四个方法,但是最终的ClientCallImpl 并没有执行这四个方法,而且变成 Runnable 放入List 中。
executor.waitAndDrain();
executor 的 drain 方法,才会执行这四个方法
getUnchecked(responseFuture);
ThreadlessExecutor
怎么工作- Runnable 任务怎么构造,放在哪个实例的List 里了,后续Drain 是怎么把Runnable 取出来的
ListenableFuture
怎么接住结果的
ThreadlessExecutor 代码分析
execute 方法把runnable 放进队列里, waiter 在RPC 结束后会变成SHUTDOWN 常量
- 如果 SHUTDOWN ,就从队列移除
- 如果 不是SHUTDOWN, 就会让 waiter unpark, 如果wait 是null ,就什么都不会发生。
execute 就是一个 unpark通知,那waiter 是进入park 状态, waiter 醒来之后会干什么?
@Override
public void execute(Runnable runnable) {
add(runnable);
Object waiter = this.waiter;
if (waiter != SHUTDOWN) {
LockSupport.unpark((Thread) waiter); // no-op if null
} else if (remove(runnable) && rejectRunnableOnExecutor) {
throw new RejectedExecutionException();
}
}
看 waitAndDrain 方法
1 先队列取任务 2.如果没有任务,waiter = Thread.currentThread(); 并且继续取任务,进入park 状态,循环等待,取到了任务,就把waiter 设置为空
- 只要有任务,就把队列里的任务执行完,当前线程执行,并且吞异常.
如果一直没有任务,就会hang住,除非中断,也没有new thread;
用法应该先
- 用execute 把任务放队列里,然后waiter 为null, 就一直放。 比如放了四个任务
- 用waitAndDrain 拿到任务了
或者
- 用waitAndDrain 等待,另外一个线程,用execute 把任务放进去, 然后唤醒了.
小结: ThreadlessExecutor 的作用是使用unpark 协调两个线程的任务,形成生产消费的模式
ThreadlessExecutor 结合上下文
execute
方法 是放任务并唤醒,waiterAndDrain
方法是等待,一次性取完,
虽然waiterAndDrain 是一次性执行完,但 responseFuture 只要不是Done, 就会一直执行下去。
根据断点,执行了以下五个任务,分别由不同的线程生产
任务1. penndingCall的reprocess 里的组合任务 DelayedPendingCall.drainPendingCalls 和 PendingCallRemoval (grpc-default-executor-0 生产 main 执行) ???
任务2. DelayedPendingCall里的drainPendingCalls 的最后一步 DrainListenerRunnable 任务 (main 生产 main 执行) 貌似是main 执行上一个任务的最后产生的,就是让listener也清空一下call
任务3. DelayedClientTransport.reprocess 里 stream.createRealStream(transport); (grpc-nio-worker-ELG-1-2)
任务4. ClientCallImpl$ClientStreamListenerImpl 的 headersRead 的任务 (grpc-nio-worker-ELG-1-2)
任务5 ClientCallImpl$ClientStreamListenerImpl 的 MessagesAvailable 的任务 (grpc-nio-worker-ELG-1-2)
3,4,5 都是传输时候的,一个建stream连接,两个读消息, 读完了消息需要 drain 一下任务,返回给listener,
可以看到这些任务里并没有 clientCall 的四个任务, 这四个任务是在任务1里执行。
放入时机: Main 线程 ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
Stub 直接使用 DelayedClientCall 实例依次 执行了 四个方法,放入了四个任务
取出时机: Main 线程 Stub 在 waitAndDrain 的时候,执行了任务1, DelayedPendingCall.drainPendingCalls , DelayedPendingCall.drainPendingCalls 会把四个方法都执行.
blockingUnaryCall 小结
串起来起来,就是运行的时候,
创建Channel 套娃, Channel 套娃创建 ClientCall套娃 外层,PendingCall 在 reprocess 的时候, 使用realChannel 创建clientCallImpl , clientCallImpl 有四个方法,发起客户端的请求.
时序上大体:
- Main 在准备Listener的时候, 在PendingCall 的字段里准备四个方法的任务
- grpc-default 执行Pending 的reprocess, 完成 创建clientCallImpl的创建,并通过 ThreadlessExecutor 给 Main 线程传递 drainPendingCalls 的任务
- main 醒来执行 ThreadlessExecutor 的任务, 也就是 DelayedPendingCall 的 drainPendingCalls 方法,
1和2 是同时执行的,但是3 和 1 同一段代码里,顺序已经保证了, 3 会等2, 因此 ,3 最后执行就行。
断点日志如下:
trace 线程 main 添加任务 realCall.start()
trace 线程 grpc-default-executor-0 添加任务 drainingPendingCall
trace 线程 main 添加任务 realCall.send()
trace 线程 main 执行任务 drainingPendingCall
因此,PendingCall有四个方法要执行,主线程准备好任务,grpc 线程创建 创建clientCallImpl 之后,通知主线程执行 drainPendingCalls。 借助于 ThreadlessExecutor 完成了 IPC
// TODO 画一个时序图
总结
- 上一篇,梳理了客户端发送包括 Stub, ClientCall,ClientStream
- 见类图,ClientCall 和ClientChannel 创建的过程见类图,多层嵌套之后,最底层有一个realChannel,一切源于Channel的创建
- 见时序图,ClientCall 四个方法包装成任务,借助 ThreadlessExecutor 实现了线程的协调,即 grpc-default-executor线程准备任务, main线程 执行任务。
MORE:
整理清楚了,其实还挺简单的。
更深入, Stub 层: 为什么需要 grpc-default-executor 来准备任务,grpc-default 应该是有一些准备底层的工作,把 clientCallImpl 设置好之后,比如联通了,才让调用开始执行。
- DelayCall 什么时候会passthrough -Stream 层:可以看一下 Stream这一层是做什么,