gRPC Synchronous Call Analysis

gRPC

Anyone who has written the lower layers of an RPC framework has had to think about how to invoke the corresponding service function from a method name. For a language with strong reflection such as Java, it is easy to obtain a class's Method object from a string and then call invoke on it. But how can a language without reflection, such as C or C++, execute a concrete method given only its name?
The simplest idea looks like this:

#include <cstdarg>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <string>

class Hello {
 public:
  void say_hello(std::string name) {
    std::cout << "hello:" << name << std::endl;
  }
  int ret_int(int x) {
    return x;
  }

  // Dispatch to a member function by its name. The parameter before the
  // ellipsis must be a trivial type (const char* here) for va_start to be
  // well defined; the deduced auto return type (C++14) requires that every
  // return statement deduce the same type.
  auto Invoke(const char* method, ...) {
    if (std::strcmp(method, "say_hello") == 0) {
      va_list args;
      va_start(args, method);
      const char* name = va_arg(args, const char*);
      std::string n(name);
      va_end(args);
      say_hello(n);
      return 0;
    }
    if (std::strcmp(method, "ret_int") == 0) {
      va_list args;
      va_start(args, method);
      int x = va_arg(args, int);
      va_end(args);
      return ret_int(x);
    }
    return -1;  // unknown method
  }
};

int main(int argc, char *argv[]) {
  Hello h;
  h.Invoke("say_hello", "xu");
  auto v = h.Invoke("ret_int", 1);
  printf("ret_int method value:%d\n", v);
  return 0;
}

Working like the code above (the service instances can be put into a map, the instance looked up by the client's service name, and the method then called through Invoke), a piece of adapter code has to be generated for every method of every service. (Because different methods have different return values, an auto return type is used here; auto function return types are only supported from C++14 on. Alternatively, a separate adapter of the same shape could be generated for each method.)
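
A minimal sketch of that registry idea, building on the Hello class above (ServiceRegistry and its Call signature are made up for illustration, not part of grpc):

#include <map>
#include <string>

// Hypothetical registry: maps a service name to an instance. A real RPC
// framework would pass serialized arguments instead of raw varargs.
class ServiceRegistry {
 public:
  void Register(const std::string& name, Hello* service) {
    services_[name] = service;
  }
  // Find the service by name, then let its generated Invoke adapter
  // dispatch on the method name.
  int Call(const std::string& service, const char* method, int arg) {
    auto it = services_.find(service);
    if (it == services_.end()) return -1;
    return it->second->Invoke(method, arg);
  }

 private:
  std::map<std::string, Hello*> services_;
};

Usage would then look like: ServiceRegistry reg; reg.Register("Hello", &h); reg.Call("Hello", "ret_int", 42);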

grpc, and the service support built into protobuf, work in a similar way: adapter code is generated for every method so that it can be invoked by name. This is covered in detail later.

One very nice thing about grpc is its support for transmitting metadata without having to define it at the interface level, which provides a clean transport channel for authentication, tracing, exchanging auxiliary data, and so on.
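
For example, on the C++ client metadata can be attached per call through the ClientContext; a minimal sketch (the helper name, key, and value here are invented):

#include <grpc++/grpc++.h>
#include "helloworld.grpc.pb.h"

// Attach a tracing header to a single RPC. Metadata keys must be lowercase;
// binary values need a key ending in "-bin".
std::string SayHelloWithTrace(helloworld::Greeter::Stub* stub,
                              const std::string& user) {
  grpc::ClientContext context;
  context.AddMetadata("x-trace-id", "abc123");
  helloworld::HelloRequest request;
  request.set_name(user);
  helloworld::HelloReply reply;
  grpc::Status status = stub->SayHello(&context, request, &reply);
  // The server can read the metadata from ServerContext::client_metadata().
  return status.ok() ? reply.message() : "RPC failed";
}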

Looking at it as a whole, grpc is split into three layers:
Stub, Channel, Transport

Stub

The Stub is generated by protoc and provides most of the interfaces the user calls; it corresponds directly to the definitions written in the proto file.

Channel

The Channel sits on top of the Transport and mainly exposes interfaces to the Stub layer; it is a natural place for interceptors/decorators, which can implement things like logging, monitoring, and authentication. Flow control is also exposed at this layer.

Transport

The Transport layer does the heavy lifting of putting bytes on the wire and pulling them back off. grpc is built on HTTP/2; the C++ stack implements the HTTP/2 protocol itself, while Java has three transport implementations: Netty-based, OkHttp, and in-process.

Code tracing

grpc has some environment variables that can be used to trace its call flow:

https://github.com/grpc/grpc/blob/master/doc/environment_variables.md

GRPC_TRACE: a comma-separated list of tracers, such as api, channel, etc.
GRPC_VERBOSITY: the log level; the default is ERROR, so to trace the flow the level needs to be set lower.
For example:

export GRPC_TRACE=api,channel
export GRPC_VERBOSITY=INFO

Analysis of the generated code

Generating code from the proto file

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Unlike grpc-style frameworks built on protobuf's own built-in service support, which need protobuf to generate the service definitions, it does not require the following declaration:
option cc_generic_services=true;

Generating the basic protobuf encode/decode code

This works exactly like generating ordinary protobuf code:

protoc -I ./ --cpp_out=. ./helloworld.proto 

This produces helloworld.pb.h and helloworld.pb.cc.

Generating the grpc service skeleton

protoc -I . --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` ./helloworld.proto

This produces helloworld.grpc.pb.h and helloworld.grpc.pb.cc.

The generated code is of high quality

Running the generated code through tools such as cpplint, flycheck, and cppcheck turns up almost no warnings or hints.

Client-side calls

Unary RPC

class GreeterClient {
 public:
  GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  // Assembles the client's payload, sends it and presents the response back
  // from the server.
  std::string SayHello(const std::string& user) {
    // Data we are sending to the server.
    HelloRequest request;
    request.set_name(user);

    // Container for the data we expect from the server.
    HelloReply reply;

    // Context for the client. It could be used to convey extra information to
    // the server and/or tweak certain RPC behaviors.
    ClientContext context;

    // The actual RPC.
    Status status = stub_->SayHello(&context, request, &reply);

    // Act upon its status.
    if (status.ok()) {
      return reply.message();
    } else {
      std::cout << status.error_code() << ": " << status.error_message()
                << std::endl;
      return "RPC failed";
    }
  }

 private:
  std::unique_ptr<Greeter::Stub> stub_;
};
int main(int argc, char** argv) {
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));
  std::string user("world");
  std::string reply = greeter.SayHello(user);
  std::cout << "Greeter received: " << reply << std::endl;

  return 0;
}

It creates a channel for communication, and internally the data handling all revolves around that channel, which represents a connection to the remote end.

Server streaming RPC

Client streaming RPC

Bidirectional streaming RPC

Analysis of the generated code

It generates the Stub, the Service, and synchronous/asynchronous call classes for the SayHello method.
To call an RPC method on the server, we need identifiers for the class, the method, the parameters, and the return value (a language with reflection, such as Java, could do without the parameter and return types, but since grpc targets many languages generically, it needs them).

Call flow:

When trace logging is enabled via the environment variables described earlier:

xu:helloworld sailsxu$ export GRPC_TRACE=api
xu:helloworld sailsxu$ export GRPC_VERBOSITY=DEBUG
xu:helloworld sailsxu$ ./greeter_client
I1227 15:26:50.401405000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a25cef0, destroy=0x10a25cf30)
I1227 15:26:50.402051000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a27fef0, destroy=0x10a280040)
I1227 15:26:50.402058000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a285760, destroy=0x10a285790)
I1227 15:26:50.402063000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a28b520, destroy=0x10a28b540)
I1227 15:26:50.402068000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a28c380, destroy=0x10a28c3b0)
I1227 15:26:50.402073000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a28d850, destroy=0x10a28d870)
I1227 15:26:50.402078000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a28e080, destroy=0x10a28e0b0)
I1227 15:26:50.402083000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a28e6d0, destroy=0x10a28e7b0)
I1227 15:26:50.402088000 140736243278784 init.c:163] grpc_register_plugin(init=0x10a28fbc0, destroy=0x10a28fd00)
D1227 15:26:50.402270000 140736243278784 ev_posix.c:105] Using polling engine: poll
I1227 15:26:50.402337000 140736243278784 init.c:218] grpc_init(void)
I1227 15:26:50.402376000 140736243278784 channel_create.c:97] grpc_insecure_channel_create(target=0x7fff55a3f201, args=0x7fff55a3f098, reserved=0x0)
I1227 15:26:50.402446000 140736243278784 init.c:218] grpc_init(void)
I1227 15:26:50.402470000 140736243278784 channel.c:272] grpc_channel_register_call(channel=0x7fe271c06500, method=/helloworld.Greeter/SayHello, host=(null), reserved=0x0)
I1227 15:26:50.402902000 140736243278784 init.c:223] grpc_shutdown(void)
I1227 15:26:50.402955000 140736243278784 init.c:218] grpc_init(void)
I1227 15:26:50.402967000 140736243278784 completion_queue.c:137] grpc_completion_queue_create(reserved=0x0)
I1227 15:26:50.402981000 140736243278784 channel.c:300] grpc_channel_create_registered_call(channel=0x7fe271c06500, parent_call=0x0, propagation_mask=ffff, completion_queue=0x7fe271d00170, registered_call_handle=0x7fe271c07b50, deadline=gpr_timespec { tv_sec: 9223372036854775807, tv_nsec: 0, clock_type: 1 }, reserved=0x0)
I1227 15:26:50.403020000 140736243278784 grpc_context.c:41] grpc_census_call_set_context(call=0x7fe272803c00, census_context=0x0)
I1227 15:26:50.403473000 140736243278784 call.c:1694] grpc_call_start_batch(call=0x7fe272803c00, ops=0x7fff55a3e9e0, nops=6, tag=0x7fff55a3ed28, reserved=0x0)
I1227 15:26:50.414042000 140736243278784 call.c:1355] ops[0]: SEND_INITIAL_METADATA
I1227 15:26:50.414063000 140736243278784 call.c:1355] ops[1]: SEND_MESSAGE ptr=0x7fe271d008e0
I1227 15:26:50.414073000 140736243278784 call.c:1355] ops[2]: RECV_INITIAL_METADATA ptr=0x7fff55a3ed80
I1227 15:26:50.414082000 140736243278784 call.c:1355] ops[3]: RECV_MESSAGE ptr=0x7fff55a3eda8
I1227 15:26:50.414089000 140736243278784 call.c:1355] ops[4]: SEND_CLOSE_FROM_CLIENT
I1227 15:26:50.414099000 140736243278784 call.c:1355] ops[5]: RECV_STATUS_ON_CLIENT metadata=0x7fff55a3edc8 status=0x7fff55a3ede0 details=0x7fff55a3ede8
I1227 15:26:50.414280000 140736243278784 completion_queue.c:556] grpc_completion_queue_pluck(cc=0x7fe271d00170, tag=0x7fff55a3ed28, deadline=gpr_timespec { tv_sec: 9223372036854775807, tv_nsec: 0, clock_type: 1 }, reserved=0x0)
I1227 15:26:50.416816000 140736243278784 completion_queue.c:251] grpc_cq_end_op(exec_ctx=0x7fff55a3eb60, cc=0x7fe271d00170, tag=0x7fff55a3ed28, error="No Error", done=0x10a250870, done_arg=0x7fe272803c90, storage=0x7fe272803c98)
I1227 15:26:50.416836000 140736243278784 completion_queue.c:661] RETURN_EVENT[0x7fe271d00170]: OP_COMPLETE: tag:0x7fff55a3ed28 OK
I1227 15:26:50.416851000 140736243278784 metadata_array.c:47] grpc_metadata_array_destroy(array=0x7fff55a3ed80)
I1227 15:26:50.416858000 140736243278784 metadata_array.c:42] grpc_metadata_array_init(array=0x7fff55a3ed80)
I1227 15:26:50.416890000 140736243278784 metadata_array.c:47] grpc_metadata_array_destroy(array=0x7fff55a3edc8)
I1227 15:26:50.416896000 140736243278784 metadata_array.c:42] grpc_metadata_array_init(array=0x7fff55a3edc8)
I1227 15:26:50.416908000 140736243278784 completion_queue.c:696] grpc_completion_queue_destroy(cc=0x7fe271d00170)
I1227 15:26:50.416913000 140736243278784 completion_queue.c:676] grpc_completion_queue_shutdown(cc=0x7fe271d00170)
I1227 15:26:50.417043000 140736243278784 init.c:223] grpc_shutdown(void)
I1227 15:26:50.417058000 140736243278784 call.c:717] grpc_call_destroy(c=0x7fe272803c00)
Greeter received: Hello world
I1227 15:26:50.417151000 140736243278784 channel.c:349] grpc_channel_destroy(channel=0x7fe271c06500)
I1227 15:26:50.417326000 140736243278784 init.c:223] grpc_shutdown(void)

Even more convenient is analyzing the calls with valgrind:

 valgrind --tool=callgrind ./greeter_client
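
The resulting callgrind.out.<pid> file can then be inspected with callgrind_annotate or a viewer such as KCachegrind to see which grpc functions the call actually passes through.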

The Stub class:

  class Stub final : public StubInterface {
   public:
    Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
    ::grpc::Status SayHello(::grpc::ClientContext* context, const ::helloworld::HelloRequest& request, ::helloworld::HelloReply* response) override;
    std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::helloworld::HelloReply>> AsyncSayHello(::grpc::ClientContext* context, const ::helloworld::HelloRequest& request, ::grpc::CompletionQueue* cq) {
      return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::helloworld::HelloReply>>(AsyncSayHelloRaw(context, request, cq));
    }

   private:
    std::shared_ptr< ::grpc::ChannelInterface> channel_;
    ::grpc::ClientAsyncResponseReader< ::helloworld::HelloReply>* AsyncSayHelloRaw(::grpc::ClientContext* context, const ::helloworld::HelloRequest& request, ::grpc::CompletionQueue* cq) override;
    const ::grpc::RpcMethod rpcmethod_SayHello_;
  };
  static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());

Implementation:

static const char* Greeter_method_names[] = {
  "/helloworld.Greeter/SayHello",
};

std::unique_ptr< Greeter::Stub> Greeter::NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options) {
  std::unique_ptr< Greeter::Stub> stub(new Greeter::Stub(channel));
  return stub;
}

Greeter::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel)
  : channel_(channel), rpcmethod_SayHello_(Greeter_method_names[0], ::grpc::RpcMethod::NORMAL_RPC, channel)
  {}

::grpc::Status Greeter::Stub::SayHello(::grpc::ClientContext* context, const ::helloworld::HelloRequest& request, ::helloworld::HelloReply* response) {
  return ::grpc::BlockingUnaryCall(channel_.get(), rpcmethod_SayHello_, context, request, response);
}

::grpc::ClientAsyncResponseReader< ::helloworld::HelloReply>* Greeter::Stub::AsyncSayHelloRaw(::grpc::ClientContext* context, const ::helloworld::HelloRequest& request, ::grpc::CompletionQueue* cq) {
  return new ::grpc::ClientAsyncResponseReader< ::helloworld::HelloReply>(channel_.get(), cq, rpcmethod_SayHello_, context, request);
}

Its synchronous method calls grpc's BlockingUnaryCall. The asynchronous call differs from protobuf's native RPC, whose asynchronous form takes a callback; their implementations are analyzed later (a minimal usage sketch of the asynchronous stub follows the RpcMethod listing below).
As can be seen, the generated stub is fairly simple overall; it mainly exists to make calling into the gRPC library convenient. It creates one const ::grpc::RpcMethod object per method:

class RpcMethod {
 public:
  enum RpcType {
    NORMAL_RPC = 0,
    CLIENT_STREAMING,  // request streaming
    SERVER_STREAMING,  // response streaming
    BIDI_STREAMING
  };

  RpcMethod(const char* name, RpcType type)
      : name_(name), method_type_(type), channel_tag_(NULL) {}

  RpcMethod(const char* name, RpcType type,
            const std::shared_ptr<ChannelInterface>& channel)
      : name_(name),
        method_type_(type),
        channel_tag_(channel->RegisterMethod(name)) {}

  const char* name() const { return name_; }
  RpcType method_type() const { return method_type_; }
  void SetMethodType(RpcType type) { method_type_ = type; }
  void* channel_tag() const { return channel_tag_; }

 private:
  const char* const name_;
  RpcType method_type_;
  void* const channel_tag_;
};
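
As mentioned above, the asynchronous stub returns a ClientAsyncResponseReader driven by a CompletionQueue. Roughly how it would be used (a minimal sketch based on the generated AsyncSayHello, assuming a stub created as in the synchronous example; error handling omitted):

void AsyncSayHelloOnce(helloworld::Greeter::Stub* stub) {
  helloworld::HelloRequest request;
  request.set_name("world");
  helloworld::HelloReply reply;
  grpc::ClientContext context;
  grpc::Status status;
  grpc::CompletionQueue cq;

  // Start the asynchronous call.
  std::unique_ptr<grpc::ClientAsyncResponseReader<helloworld::HelloReply>> rpc(
      stub->AsyncSayHello(&context, request, &cq));
  // Ask for the reply and the final status; completion is reported on the
  // completion queue with the tag passed here.
  rpc->Finish(&reply, &status, (void*)1);

  // Block until the tagged operation completes.
  void* got_tag;
  bool ok = false;
  cq.Next(&got_tag, &ok);
  if (ok && got_tag == (void*)1 && status.ok()) {
    std::cout << "Greeter received: " << reply.message() << std::endl;
  }
}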

The Service class

class Service : public ::grpc::Service { ... };
template <class BaseClass>
class WithAsyncMethod_SayHello : public BaseClass {...  };
typedef WithAsyncMethod_SayHello<Service > AsyncService;

template <class BaseClass>
class WithGenericMethod_SayHello : public BaseClass {... };

template <class BaseClass>
class WithStreamedUnaryMethod_SayHello : public BaseClass {...  };
typedef WithStreamedUnaryMethod_SayHello<Service > StreamedUnaryService;

typedef Service SplitStreamedService;
typedef WithStreamedUnaryMethod_SayHello<Service > StreamedService;

For a single service it generates three different implementations: AsyncService, and StreamedUnaryService/StreamedService (the purpose of these is not entirely clear to me), and it also splits each method out into its own class (again not entirely clear; I will come back to this when analyzing the framework).

The SayHello class

Framework code analysis

Code layout

src/core

Contains the grpc core components used by the other languages (C++, Ruby, Python, PHP, NodeJS, Objective-C): channel, compression, debug, http, json, profiling, security, transport, tsi. To make it easy for other languages to consume, this layer is written in C.

src/compiler

The grpc plugins for protoc for the various languages, used to generate the service implementations from the proto files.

The src/cpp directory

The C++ implementation of grpc; the implementation is analyzed in detail later.
1. client
Contains what the client uses: channel, client_context, stub, credentials
2. codegen
Initialization for codegen; the main implementation is in include. It handles the generated code, reducing the different call forms down to the lowest-level APIs used in client/channel;
3. common
Contains channel_filter, completion_queue, auth_context
4. ext
Contains the server reflection support
5. server
Contains service, the thread pool, server_context, credentials, server
6. test

7. thread_manager
A thread management class built on C++11 threads
8. util
Various utility classes

The include directory

It is also split into two parts, grpc and grpc++: grpc contains the C code shared with the other languages, mirroring core, while grpc++ is the C++ implementation.
Note in particular the codegen directory: it is not for generating code, but for handling the generated code, e.g. the unary_call and async_unary_call paths; its boundary with the rest of the code is not that sharp, though.

Client flow

Synchronous calls

First, the code generated from the proto file:

::grpc::Status Greeter::Stub::SayHello(::grpc::ClientContext* context, const ::helloworld::HelloRequest& request, ::helloworld::HelloReply* response) {
  return ::grpc::BlockingUnaryCall(channel_.get(), rpcmethod_SayHello_, context, request, response);
}

This calls into grpc's internal framework code, BlockingUnaryCall (in ./include/grpc++/impl/codegen/client_unary_call.h):

template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
                         ClientContext* context, const InputMessage& request,
                         OutputMessage* result) {
  CompletionQueue cq;
  Call call(channel->CreateCall(method, context, &cq));
  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
            CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
            CallOpClientSendClose, CallOpClientRecvStatus>
      ops;
  // Serialize the request into a byte buffer according to protobuf rules
  Status status = ops.SendMessage(request);
  if (!status.ok()) {
    return status;
  }
  // Just copies the context->send_initial_metadata_ map into grpc_metadata structs
  ops.SendInitialMetadata(context->send_initial_metadata_,
                          context->initial_metadata_flags());
  // Initialize recv_initial_metadata_ in CallOpRecvInitialMetadata to empty
  ops.RecvInitialMetadata(context);
  // Point the message that will be received at result, so the client can use it directly
  ops.RecvMessage(result);
  // Set the client send-close flag
  ops.ClientSendClose();
  // Have the resulting status written into status
  ops.ClientRecvStatus(context, &status);
  // First merge the operations via each template parameter's AddOp, then execute them in order
  call.PerformOps(&ops);
  GPR_CODEGEN_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
  return status;
}

This breaks down into several parts: SendMessage, SendInitialMetadata, RecvInitialMetadata, RecvMessage, and PerformOps. The idea is to define a set of operations via CallOpSet and only actually execute them, step by step, when PerformOps is called (which matches the description of the codegen directory above: it takes the different generated call forms and turns them into uniform calls into the lower layers). The benefit of this approach is a clear structure, with each piece of functionality in its own class.
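
As an aside, the shape of this design is "record the operations first, run them as one batch later", with CallOpSet gluing the individual op classes together through inheritance. A stripped-down illustration of the same idea (all names here are invented, not grpc's):

#include <vector>

// Placeholder op; the int parameter only exists to make each unused slot a
// distinct base class (the same trick grpc's CallNoOp<n> uses).
template <int I>
struct NoOp {
  void AddOp(std::vector<int>* ops) {}
};
struct SendOp {
  void AddOp(std::vector<int>* ops) { ops->push_back(1); }
};
struct RecvOp {
  void AddOp(std::vector<int>* ops) { ops->push_back(2); }
};

// The set inherits from every op and asks each base in turn to append its
// piece, mirroring CallOpSet::FillOps.
template <class Op1 = NoOp<1>, class Op2 = NoOp<2>, class Op3 = NoOp<3>>
struct OpSet : Op1, Op2, Op3 {
  void FillOps(std::vector<int>* ops) {
    this->Op1::AddOp(ops);
    this->Op2::AddOp(ops);
    this->Op3::AddOp(ops);
  }
};

// OpSet<SendOp, RecvOp> set; std::vector<int> ops; set.FillOps(&ops);
// leaves ops == {1, 2}; nothing is executed until the batch is handed off.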

CallOpSetInterface, CallOpSet

./include/grpc++/impl/codegen/call.h

class CallOpSetInterface : public CompletionQueueTag {
 public:
  CallOpSetInterface() : max_receive_message_size_(0) {}
  /// Fills in grpc_op, starting from ops[*nops] and moving
  /// upwards.
  virtual void FillOps(grpc_op* ops, size_t* nops) = 0;

  void set_max_receive_message_size(int max_receive_message_size) {
    max_receive_message_size_ = max_receive_message_size;
  }

  /// Mark this as belonging to a collection if needed
  void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
    collection_ = collection;
  }

 protected:
  int max_receive_message_size_;
  std::shared_ptr<CallOpSetCollectionInterface> collection_;
};

template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
          class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
          class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
class CallOpSet : public CallOpSetInterface,
                  public Op1,
                  public Op2,
                  public Op3,
                  public Op4,
                  public Op5,
                  public Op6 {
 public:
  CallOpSet() : return_tag_(this) {}
  void FillOps(grpc_op* ops, size_t* nops) override {
    this->Op1::AddOp(ops, nops);
    this->Op2::AddOp(ops, nops);
    this->Op3::AddOp(ops, nops);
    this->Op4::AddOp(ops, nops);
    this->Op5::AddOp(ops, nops);
    this->Op6::AddOp(ops, nops);
  }

  bool FinalizeResult(void** tag, bool* status) override {
    this->Op1::FinishOp(status, max_receive_message_size_);
    this->Op2::FinishOp(status, max_receive_message_size_);
    this->Op3::FinishOp(status, max_receive_message_size_);
    this->Op4::FinishOp(status, max_receive_message_size_);
    this->Op5::FinishOp(status, max_receive_message_size_);
    this->Op6::FinishOp(status, max_receive_message_size_);
    *tag = return_tag_;
    collection_.reset();  // drop the ref at this point
    return true;
  }

  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }

 private:
  void* return_tag_;
};

Through its template parameters, CallOpSet inherits from Op1 through Op6; as the name suggests, it aggregates the individual operations. Here is the second template argument used above, CallOpSendMessage:
./include/grpc++/impl/codegen/call.h (the other template argument types of CallOpSet are defined in the same file)

class CallOpSendMessage {
 public:
  CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}

  /// Send \a message using \a options for the write. The \a options are cleared
  /// after use.
  template <class M>
  Status SendMessage(const M& message,
                     const WriteOptions& options) GRPC_MUST_USE_RESULT;

  template <class M>
  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;

 protected:
  void AddOp(grpc_op* ops, size_t* nops) {
    if (send_buf_ == nullptr) return;
    grpc_op* op = &ops[(*nops)++];
    op->op = GRPC_OP_SEND_MESSAGE;
    op->flags = write_options_.flags();
    op->reserved = NULL;
    op->data.send_message = send_buf_;
    // Flags are per-message: clear them after use.
    write_options_.Clear();
  }
  void FinishOp(bool* status, int max_receive_message_size) {
    if (own_buf_) g_core_codegen_interface->grpc_byte_buffer_destroy(send_buf_);
    send_buf_ = nullptr;
  }

 private:
  grpc_byte_buffer* send_buf_;
  WriteOptions write_options_;
  bool own_buf_;
};

template <class M>
Status CallOpSendMessage::SendMessage(const M& message,
                                      const WriteOptions& options) {
  write_options_ = options;
  return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}

template <class M>
Status CallOpSendMessage::SendMessage(const M& message) {
  return SendMessage(message, WriteOptions());
}

As can be seen, SendMessage only prepares the message (serializing the request into plain bytes via protobuf: return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_)), while AddOp normalizes it into a grpc_op action (Channel::PerformOpsOnCall calls FillOps to merge everything into one grpc_op array). Nothing is executed at this point; all of the operations in the grpc_op array are executed together at the end:

template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
                                 grpc::protobuf::Message, T>::value>::type> {
 public:
  static Status Serialize(const grpc::protobuf::Message& msg,
                          grpc_byte_buffer** bp, bool* own_buffer) {
    *own_buffer = true;
    int byte_size = msg.ByteSize();
    if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) {
      grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
      GPR_CODEGEN_ASSERT(
          GRPC_SLICE_END_PTR(slice) ==
          msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
      *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
      g_core_codegen_interface->grpc_slice_unref(slice);
      return g_core_codegen_interface->ok();
    } else {
      internal::GrpcBufferWriter writer(
          bp, internal::kGrpcBufferWriterMaxBufferLength);
      return msg.SerializeToZeroCopyStream(&writer)
                 ? g_core_codegen_interface->ok()
                 : Status(StatusCode::INTERNAL, "Failed to serialize message");
    }
  }

  static Status Deserialize(grpc_byte_buffer* buffer,
                            grpc::protobuf::Message* msg,
                            int max_receive_message_size) {
    if (buffer == nullptr) {
      return Status(StatusCode::INTERNAL, "No payload");
    }
    Status result = g_core_codegen_interface->ok();
    {
      internal::GrpcBufferReader reader(buffer);
      if (!reader.status().ok()) {
        return reader.status();
      }
      ::grpc::protobuf::io::CodedInputStream decoder(&reader);
      if (max_receive_message_size > 0) {
        decoder.SetTotalBytesLimit(max_receive_message_size,
                                   max_receive_message_size);
      }
      if (!msg->ParseFromCodedStream(&decoder)) {
        result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
      }
      if (!decoder.ConsumedEntireMessage()) {
        result = Status(StatusCode::INTERNAL, "Did not read entire message");
      }
    }
    g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
    return result;
  }
};

Call, CallHook

class Call final {
 public:
  /* call is owned by the caller */
  Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
      : call_hook_(call_hook),
        cq_(cq),
        call_(call),
        max_receive_message_size_(-1) {}

  Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
       int max_receive_message_size)
      : call_hook_(call_hook),
        cq_(cq),
        call_(call),
        max_receive_message_size_(max_receive_message_size) {}

  void PerformOps(CallOpSetInterface* ops) {
    if (max_receive_message_size_ > 0) {
      ops->set_max_receive_message_size(max_receive_message_size_);
    }
    call_hook_->PerformOpsOnCall(ops, this);
  }

  grpc_call* call() const { return call_; }
  CompletionQueue* cq() const { return cq_; }

  int max_receive_message_size() { return max_receive_message_size_; }

 private:
  CallHook* call_hook_;
  CompletionQueue* cq_;
  grpc_call* call_;
  int max_receive_message_size_;
};

/// Channel and Server implement this to allow them to hook performing ops
class CallHook {
 public:
  virtual ~CallHook() {}
  virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};

xu:grpc sailsxu$ find . -name "*.cc" | xargs grep "PerformOpsOnCall"
./src/cpp/client/channel_cc.cc:void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
./src/cpp/server/server_cc.cc:void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
xu:grpc sailsxu$

./src/cpp/client/channel_cc.cc

void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
  static const size_t MAX_OPS = 8;
  size_t nops = 0;
  grpc_op cops[MAX_OPS];
  ops->FillOps(cops, &nops);
  GPR_ASSERT(GRPC_CALL_OK ==
             grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
}

By this point the flow is essentially clear: all of the operations to perform are collected into a grpc_op array and then executed by grpc_call_start_batch (in essence: what data to send, what data to receive, and which callbacks handle them):
src/core/lib/surface/call.c

grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
                                      size_t nops, void *tag, void *reserved) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_call_error err;

  GRPC_API_TRACE(
      "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
      "reserved=%p)",
      5, (call, ops, (unsigned long)nops, tag, reserved));

  if (reserved != NULL) {
    err = GRPC_CALL_ERROR;
  } else {
    err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
  }

  grpc_exec_ctx_finish(&exec_ctx);
  return err;
}

Inside call_start_batch, since the Channel layer does not send data itself, the data to be sent is folded into a transport op object and handed down to the next layer:

 /* rewrite batch ops into a transport op */
 for (i = 0; i < nops; i++) {
    op = &ops[i];
    if (op->reserved != NULL) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      ......
      case GRPC_OP_SEND_MESSAGE:
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message == NULL) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        bctl->send_message = 1;
        call->sending_message = 1;
        grpc_slice_buffer_stream_init(
            &call->sending_stream,
            &op->data.send_message->data.raw.slice_buffer, op->flags);
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message->data.raw.compression > GRPC_COMPRESS_NONE) {
          call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = &call->sending_stream.base;
        break;
      case GRPC_OP_RECV_MESSAGE:
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = 1;
        bctl->recv_message = 1;
        call->receiving_buffer = op->data.recv_message;
        stream_op->recv_message = &call->receiving_stream;
        grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
                          bctl);
        stream_op->recv_message_ready = &call->receiving_stream_ready;
        num_completion_callbacks_needed++;
        break;
      ....
  }
  .....
  execute_op(exec_ctx, call, stream_op);

It packs all of the operations into a grpc_transport_stream_op and a batch_control object so that the transport can execute them easily.

typedef struct grpc_transport_stream_op {
  grpc_closure *on_complete;
  bool covered_by_poller;
  grpc_metadata_batch *send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  grpc_metadata_batch *send_trailing_metadata;
  grpc_byte_stream *send_message;
  grpc_metadata_batch *recv_initial_metadata;
  bool *recv_idempotent_request;
  bool *recv_cacheable_request;
  grpc_closure *recv_initial_metadata_ready;
  grpc_byte_stream **recv_message;
  grpc_closure *recv_message_ready;
  grpc_metadata_batch *recv_trailing_metadata;
  grpc_transport_stream_stats *collect_stats;
  grpc_error *cancel_error;
  grpc_error *close_error;
  grpc_call_context_element *context;
  grpc_transport_private_op_data transport_private;
} grpc_transport_stream_op;
typedef struct batch_control {
  grpc_call *call;
  grpc_cq_completion cq_completion;
  grpc_closure finish_batch;
  void *notify_tag;
  gpr_refcount steps_to_complete;
  grpc_error *error;

  uint8_t send_initial_metadata;
  uint8_t send_message;
  uint8_t send_final_op;
  uint8_t recv_initial_metadata;
  uint8_t recv_message;
  uint8_t recv_final_op;
  uint8_t is_notify_tag_closure;

  /* TODO(ctiller): now that this is inlined, figure out how much of the above
                    state can be eliminated */
  grpc_transport_stream_op op;
} batch_control;

Together, these two objects describe a call in full detail (the data to send, the metadata, the stream used for receiving, and the callbacks).

Although the path from a request down to the transport layer handling the data looks complicated (if I were writing it myself there probably would not be all those op sets in the middle), it keeps the structure between layers simple: the grpc_transport_stream_op object encapsulates both the data and the operations, so the transport does not need to know anything about the channel layer.

Server

class GreeterServiceImpl final : public Greeter::Service {
  Status SayHello(ServerContext* context, const HelloRequest* request,
                  HelloReply* reply) override {
    std::string prefix("Hello ");
    reply->set_message(prefix + request->name());
    return Status::OK;
  }
};

void RunServer() {
  std::string server_address("0.0.0.0:50051");
  GreeterServiceImpl service;

  // Create the server and bind the local address/port
  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  // Register the service
  builder.RegisterService(&service);
  // Build and start the server
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;

  // Wait for the server to shutdown. Note that some other thread must be
  // responsible for shutting down the server for this call to ever return.
  server->Wait();
}

int main(int argc, char** argv) {
  grpc_api_trace = 1;
  RunServer();

  return 0;
}

The interface the server-side service exposes to the application is very simple: registering it is all that is needed, and other RPC frameworks based on google protobuf look much the same.
