【夜读源码】grpc-java 客户端发送 (一)

·

4 min read

【夜读源码】grpc-java  客户端发送 (一)

快速回顾

接上文,一次 grpc 的请求包括四个步骤,客户端发送请求到网络,是第一步。发送过程,Stub 把请求传给了ClientCall, ClientCall 传给了 ClientStream,ClientStream 有Netty 的实现,内部会把数据传输到网络上。

本文主要介绍 blockingUnaryCall 是如何使用 ClientCall ClientChannel 工作的, waitAndDrain 和 ClientCall 的方法如何对应。

Stub 外层简述

最外层的Stub,echo方法是生成的代码,里面只有一行,使用静态方法ClientCalls.blockingUnaryCall

blockingUnaryCall 的实现,

  1. 使用ClientChannel 创建 ClientCall, 这个ClientCall 的类型就是 Pendingcall;
  2. 使用ClientCall 创建 ListenableFuture 存放 response
  3. 执行发送过程
  4. response 取出结果

无论这里时序如何,还是会执行 PendingCall 发消息,最后通过观察者模式,把结果取回来。

小结:Pendingcall 发送客户端消息

ClientCall 做了什么

ClientCall 有四个主要的方法,start,request,sendMessage, halfClose

Channel 相当于 ClientCall 的工厂,下面具体到类型进行分析。

PendingCall 结构

PendingCall 的实现如下图

image.png

PendingCall 是一个 DelayClientCall,通过realChannel 创建了 ClientCallImpl 作为成员变量,因此 PendingCall 的 最终会执行 ClientCallImpl 的对应方法。

小结: PendingCall 代理了 ClientCallImpl,PendingCall 借助 ManagedChannelImpl 的RealChannel 创建realCall

ManagedChannelImpl 的创建

Channel 那一块,由一下代码创建出 ManagedChannelImpl

ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080).usePlaintext().build();

ManagedChannelImpl 里面包含 ManagedChannelImpl.RealChannel 和上图串起来了。

https://cdn.hashnode.com/res/hashnode/image/upload/v1662278195441/NnO2uNjhW.png

image.png 小结: 静态方法通过SPI 创建了ManagedChannelImpl

PendingCall 如何创建的

blockingUnaryCallchannel.newCall 因此是 ManagedChannelOrphanWrapper 创建了 PendingCall, ManagedChannelOrphanWrapper 委托给 ManageChannelImpl , ManageChannelImpl 委托给 interceptorChannelinterceptorChannel 的创建方式是

ClientInterceptors.intercept(channel, interceptors)interceptorChannel 就是包了realchannel,

ManagedChannelImpl 的是实现中,

  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
      CallOptions callOptions) {
    return interceptorChannel.newCall(method, callOptions);
  }

** 小结: ManagedChannelImpl 使用带拦截的RealChannel 创建的 PendingCall

PendingCall 为什么要包 ClientCallImpl

大概思考一下,如果底层是Channel,Channel通常要进行网络连接,这个会放在另外一个线程里。 createChannel 之后有一个netty 线程进行连,等到ready 之后才把pending 的任务都执行一下。

接口上,这里主要是包了一个reprocess 方法, 可以和 InternalConfigSelector 进行配合。(todo) reprocess 的作用就是 给 PendingCall 初始化 ClientCallImpl

PendingCall 的大部分方法还是 DelayClientCall,那么主要看 DelayClientCall 的作用

A call that queues requests before a real call is ready to be delegated to.

再 realCall 准备完成之前,把请求放到队列里。

- List<Runnable> pendingRunnables
- boolean passThrough

请求指的是 Runnable 的任务, 当任务执行完成之后,标志位 passThrough 为 true;再执行 DelayClientCall, 就会执行 ClientCallImpl;

通过断点可以看到, Runnable 任务有四个,分别是执行ClientCallImpl 的 start,request,sendMessage, halfClose。

因此,PendingCall 执行方法的时候时候,不会真的执行,而且把方法放入list 中,等到万事具备,执行 DelayClientCall 的 setCall 方法返回一个Runable的 drainPendingCalls,可以依次执行这些任务。

TODO passThrough 什么时候 为true ?

哪个线程池执行了 pendingRunnables

Stub 中会有线程池,ClientCall 和 channel 也会用到线程池,下面梳理一下线程池,用于后面分析这些pending任务的时序。下面发现有两个

线程池创建

ManageChannelImpl 里有有 executor ,默认值是 executorPool get出来的。

executorPool 相当于一个ExecutorFactory, 默认值在GrpcUtil里有一个static 成员变量

return Executors.newCachedThreadPool(getThreadFactory(grpc-default-executor + "-%d", true));

这样,就可以用在ManageChannelImpl 一些任务的执行。

任务1

new Runnable() {
        @Override
        public void run() {
          exitIdleMode();
        }
      }

另外netty 有两个线程 , ThreadPerTaskExecutor 这个是,NettyChannelBuilder 先弄了一个 executorPool,再从 executorPool 里创建出来的。 executorPool 的默认值是 SharedResourcePool.forResource(Utils.*DEFAULT_WORKER_EVENT_LOOP_GROUP*); 名称是 grpc-default-worker-ELG, 这个是

因此 DelayedClientCall 的 pendings 任务还是 main 主线程执行的,用了一个无线程的线程池.

  • 先制造任务
  • 再执行任务

blockingUnaryCall 二次分析

目前已经了解到Channel 和 ClientCall 的创建和相互关系,也了解到线程池和时序。

那么futureUnaryCall 这个异步 和 drain 的过程是怎么实现的呢?

ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);

相当于 创建了四个 Runnable的任务, 这四个任务,pendingCall 执行了四个方法,但是最终的ClientCallImpl 并没有执行这四个方法,而且变成 Runnable 放入List 中。

executor.waitAndDrain();

executor 的 drain 方法,才会执行这四个方法

getUnchecked(responseFuture);
  • ThreadlessExecutor 怎么工作
  • Runnable 任务怎么构造,放在哪个实例的List 里了,后续Drain 是怎么把Runnable 取出来的
  • ListenableFuture 怎么接住结果的

ThreadlessExecutor 代码分析

execute 方法把runnable 放进队列里, waiter 在RPC 结束后会变成SHUTDOWN 常量

  1. 如果 SHUTDOWN ,就从队列移除
  2. 如果 不是SHUTDOWN, 就会让 waiter unpark, 如果wait 是null ,就什么都不会发生。

execute 就是一个 unpark通知,那waiter 是进入park 状态, waiter 醒来之后会干什么?

    @Override
    public void execute(Runnable runnable) {
      add(runnable);
      Object waiter = this.waiter;
      if (waiter != SHUTDOWN) {
        LockSupport.unpark((Thread) waiter); // no-op if null
      } else if (remove(runnable) && rejectRunnableOnExecutor) {
        throw new RejectedExecutionException();
      }
    }

看 waitAndDrain 方法

1 先队列取任务 2.如果没有任务,waiter = Thread.currentThread(); 并且继续取任务,进入park 状态,循环等待,取到了任务,就把waiter 设置为空

  1. 只要有任务,就把队列里的任务执行完,当前线程执行,并且吞异常.

如果一直没有任务,就会hang住,除非中断,也没有new thread;

用法应该先

  1. 用execute 把任务放队列里,然后waiter 为null, 就一直放。 比如放了四个任务
  2. 用waitAndDrain 拿到任务了

或者

  1. 用waitAndDrain 等待,另外一个线程,用execute 把任务放进去, 然后唤醒了.

小结: ThreadlessExecutor 的作用是使用unpark 协调两个线程的任务,形成生产消费的模式

ThreadlessExecutor 结合上下文

execute方法 是放任务并唤醒,waiterAndDrain方法是等待,一次性取完,

虽然waiterAndDrain 是一次性执行完,但 responseFuture 只要不是Done, 就会一直执行下去。

根据断点,执行了以下五个任务,分别由不同的线程生产

任务1. penndingCall的reprocess 里的组合任务 DelayedPendingCall.drainPendingCalls 和 PendingCallRemoval (grpc-default-executor-0 生产 main 执行) ???

任务2. DelayedPendingCall里的drainPendingCalls 的最后一步 DrainListenerRunnable 任务 (main 生产 main 执行) 貌似是main 执行上一个任务的最后产生的,就是让listener也清空一下call

任务3. DelayedClientTransport.reprocess 里 stream.createRealStream(transport); (grpc-nio-worker-ELG-1-2)

任务4. ClientCallImpl$ClientStreamListenerImpl 的 headersRead 的任务 (grpc-nio-worker-ELG-1-2)

任务5 ClientCallImpl$ClientStreamListenerImpl 的 MessagesAvailable 的任务 (grpc-nio-worker-ELG-1-2)

3,4,5 都是传输时候的,一个建stream连接,两个读消息, 读完了消息需要 drain 一下任务,返回给listener,

可以看到这些任务里并没有 clientCall 的四个任务, 这四个任务是在任务1里执行。

放入时机: Main 线程 ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req); Stub 直接使用 DelayedClientCall 实例依次 执行了 四个方法,放入了四个任务 取出时机: Main 线程 Stub 在 waitAndDrain 的时候,执行了任务1, DelayedPendingCall.drainPendingCalls , DelayedPendingCall.drainPendingCalls 会把四个方法都执行.

blockingUnaryCall 小结

串起来起来,就是运行的时候,

创建Channel 套娃, Channel 套娃创建 ClientCall套娃 外层,PendingCall 在 reprocess 的时候, 使用realChannel 创建clientCallImpl , clientCallImpl 有四个方法,发起客户端的请求.

时序上大体:

  1. Main 在准备Listener的时候, 在PendingCall 的字段里准备四个方法的任务
  2. grpc-default 执行Pending 的reprocess, 完成 创建clientCallImpl的创建,并通过 ThreadlessExecutor 给 Main 线程传递 drainPendingCalls 的任务
  3. main 醒来执行 ThreadlessExecutor 的任务, 也就是 DelayedPendingCall 的 drainPendingCalls 方法,

1和2 是同时执行的,但是3 和 1 同一段代码里,顺序已经保证了, 3 会等2, 因此 ,3 最后执行就行。

断点日志如下:

trace 线程 main  添加任务  realCall.start()
trace 线程 grpc-default-executor-0  添加任务 drainingPendingCall
trace 线程 main  添加任务  realCall.send()
trace 线程 main  执行任务 drainingPendingCall

因此,PendingCall有四个方法要执行,主线程准备好任务,grpc 线程创建 创建clientCallImpl 之后,通知主线程执行 drainPendingCalls。 借助于 ThreadlessExecutor 完成了 IPC

// TODO 画一个时序图

总结

  1. 上一篇,梳理了客户端发送包括 Stub, ClientCall,ClientStream
  2. 见类图,ClientCall 和ClientChannel 创建的过程见类图,多层嵌套之后,最底层有一个realChannel,一切源于Channel的创建
  3. 见时序图,ClientCall 四个方法包装成任务,借助 ThreadlessExecutor 实现了线程的协调,即 grpc-default-executor线程准备任务, main线程 执行任务。

MORE:

整理清楚了,其实还挺简单的。

更深入, Stub 层: 为什么需要 grpc-default-executor 来准备任务,grpc-default 应该是有一些准备底层的工作,把 clientCallImpl 设置好之后,比如联通了,才让调用开始执行。

  • DelayCall 什么时候会passthrough -Stream 层:可以看一下 Stream这一层是做什么,