GRPC Channel

文章目录

Channel

Channel层中有关的几个类:
grpc_channel, grpc_channel_stack, grpc_channel_element, grpc_call_stack, grpc_call_element, grpc_channel_filter:
其中grpc_channel_stack与grpc_channel_element用于保存一个channel中会用到的所有过滤器;
grpc_call_stack, grpc_call_element用于保存一次调用中会用到的所有过滤器;
grpc_channel_filter 过滤器,它表示在调用过程中需要执行的动作;

结构

文件结构

include目录中的:

./grpc++/channel.h // 定义了channel类接口,没有实现
./grpc++/create_channel.h  // 定义了CreateChannel方法的接口
./grpc++/create_channel_posix.h  // 从一个socket fd创建一个channel
./grpc++/impl/codegen/channel_interface.h  // Channel类的父接口,主要定义了channel状态变更时的处理接口(channel与一个endpoint的连接状态)
./grpc++/support/channel_arguments.h  // 创建channel时的一些选项封装

core目录(包含了channel的内部实现):

./core/ext/client_channel  // 这个目录中主要包含了client channel的一些实现
./core/ext/client_channel/channel_connectivity.c  // 通过调用client channel来监控channel的连通性
./core/ext/client_channel/client_channel.c
./core/ext/client_channel/client_channel.h  // 可以设置client channel状态改变时的回调
./core/ext/client_channel/client_channel_factory.c
./core/ext/client_channel/client_channel_factory.h  // 创建client_channel的factory
./core/ext/client_channel/client_channel_plugin.c
./core/ext/client_channel/subchannel.c
./core/ext/client_channel/subchannel.h  // subchannel用于客户端负载均衡,比如一个url对应多个ip,它会有多个subchannel连接所有的ip,然后根据策略把请求发到对应的ip上(目前有两个策略:pick_first(得到一个可用的就一直用,直到它不可用了,再换下一个连接),round_robin(轮询所有可用的连接),不过现在这里还没有稳定下来)。
./core/ext/client_channel/subchannel_index.c
./core/ext/client_channel/subchannel_index.h
./core/ext/transport/chttp2/client/insecure/channel_create.c
./core/ext/transport/chttp2/client/insecure/channel_create_posix.c
./core/ext/transport/chttp2/client/secure/secure_channel_create.c
./core/ext/transport/cronet/client/secure/cronet_channel_create.c
// grpc_channel的内部实现
./core/lib/channel  // 包含了channel中的stack的实现
./core/lib/channel/channel_args.c
./core/lib/channel/channel_args.h
./core/lib/channel/channel_stack.c
./core/lib/channel/channel_stack.h
./core/lib/channel/channel_stack_builder.c
./core/lib/channel/channel_stack_builder.h
./core/lib/channel/connected_channel.c
./core/lib/channel/connected_channel.h
// grpc_channel的实现(与客户端的Channel不同)
./core/lib/surface/channel.c
./core/lib/surface/channel.h
./core/lib/surface/channel_init.c
./core/lib/surface/channel_init.h
./core/lib/surface/channel_ping.c
./core/lib/surface/channel_stack_type.c
./core/lib/surface/channel_stack_type.h

c++ 目录:

./cpp/client/channel_cc.cc  // channel对client端的实现(对应./grpc++/channel.h)
./cpp/client/create_channel.cc // 对应./grpc++/create_channel.h的实现
./cpp/client/create_channel_internal.cc
./cpp/client/create_channel_internal.h
./cpp/client/create_channel_posix.cc  // 对应create_channel_posix.h
./cpp/common/channel_arguments.cc  // 对应./grpc++/support/channel_arguments.h
./cpp/common/channel_filter.cc
./cpp/common/channel_filter.h  // channel_filter的一个C++封装(原注释在此处被截断,此说明待确认)
./cpp/common/secure_channel_arguments.cc

Channel Stack

channel stack表示一个channel中的全局设置:
src/core/lib/channel/channel_stack.h

/* A channel stack tracks a set of related filters for one channel, and
   guarantees they live within a single malloc() allocation */
struct grpc_channel_stack {
  /* Reference count of the stack; grpc_channel_stack_init sets it up with
     GRPC_STREAM_REF_INIT together with the destroy callback. */
  grpc_stream_refcount refcount;
  /* Number of filters (channel elements) stored in this stack. */
  size_t count;
  /* Memory required for a call stack (computed at channel stack
     initialization) */
  size_t call_stack_size;
};

与channel_stack关联的有Channel Element,它用来保存一个过滤器;

/* A channel_element tracks its filter and the filter requested memory within
   a channel allocation */
struct grpc_channel_element {
  /* The filter whose callbacks operate on this element. */
  const grpc_channel_filter *filter;
  /* Per-channel storage for the filter; grpc_channel_stack_init points it
     into the memory region that follows the element array. */
  void *channel_data;
};

而Channel Element Args则保存了过滤器中的一些参数:

/* Arguments handed to a filter's init_channel_elem callback; filled in by
   grpc_channel_stack_init for every element of the stack. */
typedef struct {
  /* The stack the element being initialized belongs to. */
  grpc_channel_stack *channel_stack;
  /* The channel arguments the stack is being initialized with. */
  const grpc_channel_args *channel_args;
  /** Transport, iff it is known */
  grpc_transport *optional_transport;
  /* Non-zero when the element is the first filter of the stack. */
  int is_first;
  /* Non-zero when the element is the last filter of the stack. */
  int is_last;
} grpc_channel_element_args;

Call Stack

call stack表示一次调用的过滤器集合:

/* A call stack tracks a set of related filters for one call, and guarantees
   they live within a single malloc() allocation */
struct grpc_call_stack {
  /* shared refcount for this channel stack.
     MUST be the first element: the underlying code calls destroy
     with the address of the refcount, but higher layers prefer to think
     about the address of the call stack itself. */
  grpc_stream_refcount refcount;
  /* Number of grpc_call_element entries that follow the stack header. */
  size_t count;
};

每次调用call stack中过滤器时的设置:

/* A call_element tracks its filter, the filter requested memory within
   a channel allocation, and the filter requested memory within a call
   allocation */
struct grpc_call_element {
  /* The filter whose callbacks operate on this element. */
  const grpc_channel_filter *filter;
  /* The filter's memory inside the channel allocation. */
  void *channel_data;
  /* The filter's memory inside the call allocation. */
  void *call_data;
};

每次调用过滤器时的参数:

/* Per-call argument bundle for filters.
   NOTE(review): presumably consumed by a filter's call-element initializer;
   that callback is not shown in this excerpt, so confirm against
   channel_stack.h. */
typedef struct {
  /* The call stack the arguments refer to. */
  grpc_call_stack *call_stack;
  /* NOTE(review): judging by the name, only meaningful on the server side;
     confirm. */
  const void *server_transport_data;
  grpc_call_context_element *context;
  grpc_mdstr *path;
  gpr_timespec start_time;
  gpr_timespec deadline;
} grpc_call_element_args;

Channel Filter

上面的grpc_channel_element和grpc_call_element都包含了grpc_channel_filter:

/* Channel filters specify:
   1. the amount of memory needed in the channel & call (via the sizeof_XXX
      members)
   2. functions to initialize and destroy channel & call data
      (init_XXX, destroy_XXX)
   3. functions to implement call operations and channel operations (call_op,
      channel_op)
   4. a name, which is useful when debugging

   Members are laid out in approximate frequency of use order. */
typedef struct {
  /* Called to eg. send/receive data on a call.
     See grpc_call_next_op on how to call the next element in the stack */
  void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx,
                                    grpc_call_element *elem,
                                    grpc_transport_stream_op *op);
  /* Called to handle channel level operations - e.g. new calls, or transport
     closure.
     See grpc_channel_next_op on how to call the next element in the stack */
  void (*start_transport_op)(grpc_exec_ctx *exec_ctx,
                             grpc_channel_element *elem, grpc_transport_op *op);
  /* Excerpt marker: several members are elided at this point. They include
     sizeof_call_data and sizeof_channel_data, both of which
     grpc_channel_stack_init reads from every filter. */
  ......
  
  /* Initialize per-channel data.
     elem is initialized at the creating of the channel, and elem->channel_data
     is what needs initializing.
     is_first, is_last designate this elements position in the stack, and are
     useful for asserting correct configuration by upper layer code.
     The filter does not need to do any chaining */
  grpc_error *(*init_channel_elem)(grpc_exec_ctx *exec_ctx,
                                   grpc_channel_element *elem,
                                   grpc_channel_element_args *args);
  /* Destroy per channel data.
     The filter does not need to do any chaining */
  void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
                               grpc_channel_element *elem);

  /* Implement grpc_call_get_peer() */
  char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);

  /* Implement grpc_channel_get_info() */
  void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
                           const grpc_channel_info *channel_info);

  /* The name of this filter */
  const char *name;
} grpc_channel_filter;

Channel的创建过程

它包含了channel, channel_stack,channel_element和过滤器等的初始化:

grpc_channel在src/core/lib/surface/channel.c中定义:

/* Surface-level channel object (excerpt). grpc_channel_create zeroes it and
   then fills the fields in from the channel arguments. */
struct grpc_channel {
  /* Result of grpc_channel_stack_type_is_client() for the stack type the
     channel was created with. */
  int is_client;
  /* Initialized with grpc_compression_options_init(), then overridden by the
     GRPC_COMPRESSION_CHANNEL_* channel arguments. */
  grpc_compression_options compression_options;
  /* ":authority" metadata element taken from GRPC_ARG_DEFAULT_AUTHORITY or
     GRPC_SSL_TARGET_NAME_OVERRIDE_ARG; stays NULL when neither is given. */
  grpc_mdelem *default_authority;

  /* Mutex initialized in grpc_channel_create.
     NOTE(review): presumably guards registered_calls; confirm. */
  gpr_mu registered_call_mu;
  /* Starts out as NULL. */
  registered_call *registered_calls;

  /* Copy of the target string made with gpr_strdup(). */
  char *target;
};

从客户端入手:

int main(int argc, char** argv) {
  // The client is built on top of a channel, and the channel is what the
  // actual RPCs are created from. Here the channel targets localhost:50051
  // and is not authenticated (InsecureChannelCredentials()).
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));

  // Issue a single SayHello RPC and print what came back.
  const std::string user = "world";
  const std::string reply = greeter.SayHello(user);
  std::cout << "Greeter received: " << reply << std::endl;

  return 0;
}

它调用了grpc::CreateChannel:
其中

// Convenience overload: forwards to CreateCustomChannel with a
// default-constructed ChannelArguments, i.e. no caller-supplied options.
// Per the original author's note, such a default object only carries
// SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING, "grpc-c++/" + Version()).
std::shared_ptr<Channel> CreateChannel(
    const grpc::string& target,
    const std::shared_ptr<ChannelCredentials>& creds) {
  ChannelArguments default_args;
  return CreateCustomChannel(target, creds, default_args);
}

// Creates a channel to `target` through the given credentials object. When
// `creds` is null, a lame channel reporting GRPC_STATUS_INVALID_ARGUMENT is
// returned instead.
std::shared_ptr<Channel> CreateCustomChannel(
    const grpc::string& target,
    const std::shared_ptr<ChannelCredentials>& creds,
    const ChannelArguments& args) {
  // We need to call init in case of a bad creds.
  internal::GrpcLibrary init_lib;

  if (creds) {
    return creds->CreateChannel(target, args);
  }

  grpc_channel* lame_channel = grpc_lame_client_channel_create(
      NULL, GRPC_STATUS_INVALID_ARGUMENT, "Invalid credentials.");
  return CreateChannelInternal("", lame_channel);
}
  // Insecure-credentials implementation: converts the C++ argument wrapper
  // into a grpc_channel_args, creates the core channel with
  // grpc_insecure_channel_create and wraps it in a C++ Channel.
  std::shared_ptr<grpc::Channel> CreateChannel(
      const string& target, const grpc::ChannelArguments& args) override {
    // SetChannelArgs copies the contents of args into core_args.
    grpc_channel_args core_args;
    args.SetChannelArgs(&core_args);

    grpc_channel* core_channel =
        grpc_insecure_channel_create(target.c_str(), &core_args, nullptr);
    return CreateChannelInternal("", core_channel);
  }
// Final step of channel creation: wraps the core grpc_channel (created above
// by grpc_insecure_channel_create) in a C++ Channel owned by a shared_ptr.
std::shared_ptr<Channel> CreateChannelInternal(const grpc::string& host,
                                               grpc_channel* c_channel) {
  Channel* wrapper = new Channel(host, c_channel);
  return std::shared_ptr<Channel>(wrapper);
}

src/core/ext/transport/chttp2/client/insecure/channel_create.c:

/* Creates an insecure client channel to `target`.
 *
 * The client channel factory is appended to a copy of `args`, the channel is
 * created through client_channel_factory_create_channel, and the temporary
 * argument copy and the factory reference are released afterwards.
 * `reserved` must be NULL. When channel creation fails, a lame channel
 * carrying GRPC_STATUS_INTERNAL is returned, so the result is never the NULL
 * produced by the factory. */
grpc_channel *grpc_insecure_channel_create(const char *target,
                                           const grpc_channel_args *args,
                                           void *reserved) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  GRPC_API_TRACE(
      "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
      (target, args, reserved));
  GPR_ASSERT(reserved == NULL);

  /* Extend the caller's arguments with the client channel factory. */
  grpc_client_channel_factory *factory =
      (grpc_client_channel_factory *)&client_channel_factory;
  grpc_arg factory_arg =
      grpc_client_channel_factory_create_channel_arg(factory);
  grpc_channel_args *args_with_factory =
      grpc_channel_args_copy_and_add(args, &factory_arg, 1);

  grpc_channel *result = client_channel_factory_create_channel(
      &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR,
      args_with_factory);

  /* Release everything that was only needed during creation. */
  grpc_channel_args_destroy(args_with_factory);
  grpc_client_channel_factory_unref(&exec_ctx, factory);
  grpc_exec_ctx_finish(&exec_ctx);

  if (result == NULL) {
    result = grpc_lame_client_channel_create(
        target, GRPC_STATUS_INTERNAL, "Failed to create client channel");
  }
  return result;
}

/* Factory hook that creates a GRPC_CLIENT_CHANNEL channel for `target`.
 * The server URI is appended to a copy of `args` as GRPC_ARG_SERVER_URI; the
 * copy is destroyed again once grpc_channel_create has returned. */
static grpc_channel *client_channel_factory_create_channel(
    grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
    const char *target, grpc_client_channel_type type,
    const grpc_channel_args *args) {
  /* Channel arg containing the server URI. */
  grpc_arg server_uri_arg = {
      .type = GRPC_ARG_STRING,
      .key = GRPC_ARG_SERVER_URI,
      .value.string = (char *)target,
  };
  grpc_channel_args *args_with_uri =
      grpc_channel_args_copy_and_add(args, &server_uri_arg, 1);

  grpc_channel *created = grpc_channel_create(exec_ctx, target, args_with_uri,
                                              GRPC_CLIENT_CHANNEL, NULL);

  grpc_channel_args_destroy(args_with_uri);
  return created;
}

src/core/lib/surface/channel.c:

/* Builds a grpc_channel for `target`.
 *
 * A channel stack builder is populated with the channel arguments, the target
 * and the optional transport; grpc_channel_init_create_stack() then lets the
 * stages registered for `channel_stack_type` add their filters. The finished
 * stack is allocated together with the grpc_channel itself, after which the
 * channel fields are filled in from a copy of the builder's channel arguments.
 *
 * Returns the new channel, or NULL when the stack could not be created. */
grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
                                  const grpc_channel_args *input_args,
                                  grpc_channel_stack_type channel_stack_type,
                                  grpc_transport *optional_transport) {
  grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
  grpc_channel_stack_builder_set_channel_arguments(builder, input_args);
  grpc_channel_stack_builder_set_target(builder, target);
  grpc_channel_stack_builder_set_transport(builder, optional_transport);
  // The built-in filters registered for channel_stack_type are added to the
  // builder here.
  if (!grpc_channel_init_create_stack(exec_ctx, builder, channel_stack_type)) {
    grpc_channel_stack_builder_destroy(builder);
    return NULL;
  }
  // Private copy of the arguments: the builder is destroyed by
  // grpc_channel_stack_builder_finish, but the arguments are still read below.
  grpc_channel_args *args = grpc_channel_args_copy(
      grpc_channel_stack_builder_get_channel_arguments(builder));
  grpc_channel *channel;
  
  // grpc_channel_stack_builder_finish initializes the grpc_channel from the
  // filters collected in the builder. sizeof(grpc_channel) is passed as the
  // prefix size, so the channel struct and its stack share one allocation.
  grpc_error *error = grpc_channel_stack_builder_finish(
      exec_ctx, builder, sizeof(grpc_channel), 1, destroy_channel, NULL,
      (void **)&channel);
  if (error != GRPC_ERROR_NONE) {
    const char *msg = grpc_error_string(error);
    gpr_log(GPR_ERROR, "channel stack builder failed: %s", msg);
    grpc_error_free_string(msg);
    GRPC_ERROR_UNREF(error);
    // On failure the builder stores NULL into `channel`, so NULL is returned
    // after the argument copy has been released at `done`.
    goto done;
  }

  // Only the grpc_channel prefix is cleared; the stack behind it stays intact.
  memset(channel, 0, sizeof(*channel));
  channel->target = gpr_strdup(target);
  channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
  gpr_mu_init(&channel->registered_call_mu);
  channel->registered_calls = NULL;

  grpc_compression_options_init(&channel->compression_options);

  // Apply the channel arguments that the surface layer itself understands.
  for (size_t i = 0; i < args->num_args; i++) {
    if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
      if (args->args[i].type != GRPC_ARG_STRING) {
        gpr_log(GPR_ERROR, "%s ignored: it must be a string",
                GRPC_ARG_DEFAULT_AUTHORITY);
      } else {
        if (channel->default_authority) {
          /* setting this takes precedence over anything else */
          GRPC_MDELEM_UNREF(channel->default_authority);
        }
        channel->default_authority =
            grpc_mdelem_from_strings(":authority", args->args[i].value.string);
      }
    } else if (0 ==
               strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) {
      if (args->args[i].type != GRPC_ARG_STRING) {
        gpr_log(GPR_ERROR, "%s ignored: it must be a string",
                GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
      } else {
        if (channel->default_authority) {
          /* other ways of setting this (notably ssl) take precedence */
          gpr_log(GPR_ERROR,
                  "%s ignored: default host already set some other way",
                  GRPC_SSL_TARGET_NAME_OVERRIDE_ARG);
        } else {
          channel->default_authority = grpc_mdelem_from_strings(
              ":authority", args->args[i].value.string);
        }
      }
    } else if (0 == strcmp(args->args[i].key,
                           GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
      channel->compression_options.default_level.is_set = true;
      GPR_ASSERT(args->args[i].value.integer >= 0 &&
                 args->args[i].value.integer < GRPC_COMPRESS_LEVEL_COUNT);
      channel->compression_options.default_level.level =
          (grpc_compression_level)args->args[i].value.integer;
    } else if (0 == strcmp(args->args[i].key,
                           GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
      channel->compression_options.default_algorithm.is_set = true;
      GPR_ASSERT(args->args[i].value.integer >= 0 &&
                 args->args[i].value.integer < GRPC_COMPRESS_ALGORITHMS_COUNT);
      channel->compression_options.default_algorithm.algorithm =
          (grpc_compression_algorithm)args->args[i].value.integer;
    } else if (0 ==
               strcmp(args->args[i].key,
                      GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
      channel->compression_options.enabled_algorithms_bitset =
          (uint32_t)args->args[i].value.integer |
          0x1; /* always support no compression */
    }
  }

done:
  // Shared exit for the success path and the builder-failure path.
  grpc_channel_args_destroy(args);
  return channel;
}

grpc_channel_filter

内置的plugins:

src/core/lib/surface/init.c:

/* Library initialization entry point (excerpt). The basic setup is routed
   through gpr_once_init with do_basic_init as the callback; the rest of the
   function is elided in this article. */
void grpc_init(void) {
  int i;
  gpr_once_init(&g_basic_init, do_basic_init);
  .....
}
/* Callback passed to gpr_once_init by grpc_init: sets up log verbosity and the
   init mutex, registers the built-in plugins and resets the initialization
   counter. */
static void do_basic_init(void) {
  gpr_log_verbosity_init();
  gpr_mu_init(&g_init_mu);
  /* This is where the built-in plugins get registered. */
  grpc_register_built_in_plugins();
  g_initializations = 0;
}

src/core/ext/client_channel/client_channel_plugin.c:

/* Init hook of the client channel plugin. Initializes the LB policy registry,
   the resolver registry and the subchannel index, then registers two stages
   for GRPC_CLIENT_CHANNEL stacks. */
void grpc_client_channel_init(void) {
  grpc_lb_policy_registry_init();
  grpc_resolver_registry_init();
  grpc_subchannel_index_init();
  /* Registered with the lowest possible priority value (INT_MIN). */
  grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
                                   set_default_host_if_unset, NULL);
  /* Registered with the highest possible priority value (INT_MAX); the stage
     argument is the client channel filter to append. */
  grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
                                   (void *)&grpc_client_channel_filter);
}

调用了grpc_register_built_in_plugins方法:
src/core/plugin_registry/grpc_plugin_registry.c:

/* Registers every built-in plugin as an (init, shutdown) pair. The pairs are
   registered in the order in which they appear in the table. */
void grpc_register_built_in_plugins(void) {
  static const struct {
    void (*init)(void);
    void (*shutdown)(void);
  } built_in_plugins[] = {
      {grpc_chttp2_plugin_init, grpc_chttp2_plugin_shutdown},
      {grpc_client_channel_init, grpc_client_channel_shutdown},
      {grpc_lb_policy_grpclb_init, grpc_lb_policy_grpclb_shutdown},
      {grpc_lb_policy_pick_first_init, grpc_lb_policy_pick_first_shutdown},
      {grpc_lb_policy_round_robin_init, grpc_lb_policy_round_robin_shutdown},
      {grpc_resolver_dns_native_init, grpc_resolver_dns_native_shutdown},
      {grpc_resolver_sockaddr_init, grpc_resolver_sockaddr_shutdown},
      {grpc_load_reporting_plugin_init, grpc_load_reporting_plugin_shutdown},
      {census_grpc_plugin_init, census_grpc_plugin_shutdown},
  };
  const size_t plugin_count =
      sizeof(built_in_plugins) / sizeof(built_in_plugins[0]);

  for (size_t idx = 0; idx < plugin_count; idx++) {
    grpc_register_plugin(built_in_plugins[idx].init,
                         built_in_plugins[idx].shutdown);
  }
}

注意其中的grpc_client_channel_init,它也给GRPC_CLIENT_CHANNEL类型的channel增加了filter:
src/core/ext/client_channel/client_channel_plugin.c:

/* Client channel plugin initializer. Besides setting up the LB policy
   registry, resolver registry and subchannel index, it contributes two stages
   to every GRPC_CLIENT_CHANNEL stack: set_default_host_if_unset and
   append_filter with grpc_client_channel_filter. */
void grpc_client_channel_init(void) {
  grpc_lb_policy_registry_init();
  grpc_resolver_registry_init();
  grpc_subchannel_index_init();
  /* Priority INT_MIN, no stage argument. */
  grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN,
                                   set_default_host_if_unset, NULL);
  /* Priority INT_MAX; the filter to append is passed as the stage argument. */
  grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter,
                                   (void *)&grpc_client_channel_filter);
}
/* Filter table of the client channel. The initializer is positional, so the
   entries follow the member order of grpc_channel_filter. Slots marked
   "elided" correspond to members that the typedef excerpt in this article
   leaves out; their names are presumed from the values and need confirming
   against channel_stack.h. */
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op,  /* start_transport_stream_op */
    cc_start_transport_op,         /* start_transport_op */
    sizeof(call_data),             /* elided; presumably sizeof_call_data */
    cc_init_call_elem,             /* elided; presumably the call init hook */
    cc_set_pollset_or_pollset_set, /* elided; presumably the pollset hook */
    cc_destroy_call_elem,          /* elided; presumably the call destroy hook */
    sizeof(channel_data),          /* elided; presumably sizeof_channel_data */
    cc_init_channel_elem,          /* init_channel_elem */
    cc_destroy_channel_elem,       /* destroy_channel_elem */
    cc_get_peer,                   /* get_peer */
    cc_get_channel_info,           /* get_channel_info */
    "client-channel",              /* name */
};

在grpc中有几类内置的channel设置,比如前面客户端用到的client_channel_factory_create_channel中是GRPC_CLIENT_CHANNEL类型,所以它只有一个grpc_compress_filter,对数据进行压缩(它是在protobuf编码之后的进一步压缩):
src/core/lib/surface/init.c:

/* Appends a stage to the slot list kept for channel stacks of `type`.
 *
 * Registration is only allowed before the slot lists are finalized. The slot
 * array grows by a factor of 1.5 (to at least 8 entries) whenever it is full.
 * Each slot records its priority, the stage function with its argument, and
 * an insertion order equal to the slot count after the insertion. */
void grpc_channel_init_register_stage(grpc_channel_stack_type type,
                                      int priority,
                                      grpc_channel_init_stage stage,
                                      void *stage_arg) {
  GPR_ASSERT(!g_finalized);

  /* Make room when the array is at capacity. */
  if (g_slots[type].num_slots == g_slots[type].cap_slots) {
    g_slots[type].cap_slots = GPR_MAX(8, 3 * g_slots[type].cap_slots / 2);
    g_slots[type].slots =
        gpr_realloc(g_slots[type].slots,
                    g_slots[type].cap_slots * sizeof(*g_slots[type].slots));
  }

  /* Claim the next free slot and bump the count before filling it in. */
  const size_t index = g_slots[type].num_slots;
  g_slots[type].num_slots = index + 1;

  stage_slot *slot = &g_slots[type].slots[index];
  slot->insertion_order = g_slots[type].num_slots;
  slot->priority = priority;
  slot->fn = stage;
  slot->arg = stage_arg;
}

/* Registers the built-in channel-init stages for every channel stack type.
 *
 * Each call adds one stage (a function plus its argument, usually the filter
 * to insert) for one stack type. All stages use
 * GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, except the server top filter, which is
 * registered with INT_MAX. The order of the calls is kept exactly as is,
 * because every registration also records its insertion order.
 *
 * The parameter list is spelled (void) so that the definition is a proper
 * prototype; an empty list () would leave the parameters unspecified. */
static void register_builtin_channel_init(void) {
  /* Deadline filters. */
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      prepend_filter, (void *)&grpc_client_deadline_filter);
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
      (void *)&grpc_server_deadline_filter);
  /* Message size filter. */
  grpc_channel_init_register_stage(
      GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      prepend_filter, (void *)&grpc_message_size_filter);
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      prepend_filter, (void *)&grpc_message_size_filter);
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
      (void *)&grpc_message_size_filter);
  /* Compression filter. */
  grpc_channel_init_register_stage(
      GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
      (void *)&grpc_compress_filter);
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      prepend_filter, (void *)&grpc_compress_filter);
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
      (void *)&grpc_compress_filter);
  /* HTTP filter followed by the connected filter, per stack type. */
  grpc_channel_init_register_stage(
      GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_http_filter, (void *)&grpc_http_client_filter);
  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   grpc_add_connected_filter, NULL);
  grpc_channel_init_register_stage(
      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_http_filter, (void *)&grpc_http_client_filter);
  grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   grpc_add_connected_filter, NULL);
  grpc_channel_init_register_stage(
      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
      maybe_add_http_filter, (void *)&grpc_http_server_filter);
  grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   grpc_add_connected_filter, NULL);
  /* Lame client channels only get the lame filter. */
  grpc_channel_init_register_stage(GRPC_CLIENT_LAME_CHANNEL,
                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
                                   append_filter, (void *)&grpc_lame_filter);
  /* Server top filter, registered with INT_MAX priority. */
  grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, prepend_filter,
                                   (void *)&grpc_server_top_filter);
}

而在前面grpc_channel_create中,调用了grpc_channel_init_create_stack,这样,就把内置的filter放到了builder中:

/* Runs every stage registered for `type` against `builder`, in slot order.
 * Must only be called once the slot lists are finalized. The builder is named
 * after the stack type first. Returns false as soon as one stage reports
 * failure, true when all stages succeeded. */
bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
                                    grpc_channel_stack_builder *builder,
                                    grpc_channel_stack_type type) {
  GPR_ASSERT(g_finalized);

  grpc_channel_stack_builder_set_name(builder, name_for_type(type));

  size_t next = 0;
  while (next < g_slots[type].num_slots) {
    const stage_slot *stage = &g_slots[type].slots[next];
    next++;
    if (!stage->fn(builder, stage->arg)) {
      return false;
    }
  }

  return true;
}

grpc_channel_stack的初始化

在上面的grpc_channel_create中,就会去初始化grpc_channel_stack:
src/core/lib/channel/channel_stack_builder.c:

/* Turns the filters collected in `builder` into an initialized channel stack.
 *
 * One block of prefix_bytes + channel-stack-size bytes is allocated and stored
 * in *result; the stack starts prefix_bytes into that block, so the caller can
 * keep its own object in the prefix. `destroy` with `destroy_arg` is handed to
 * grpc_channel_stack_init; a NULL destroy_arg is replaced by *result.
 *
 * On failure the stack is destroyed, the block is freed and *result is set to
 * NULL. The builder is destroyed in both cases. Returns the error reported by
 * grpc_channel_stack_init. */
grpc_error *grpc_channel_stack_builder_finish(
    grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
    size_t prefix_bytes, int initial_refs, grpc_iomgr_cb_func destroy,
    void *destroy_arg, void **result) {
  // count the number of filters
  size_t num_filters = 0;
  for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
    num_filters++;
  }

  // create an array of filters (the filter configuration for the stack)
  const grpc_channel_filter **filters =
      gpr_malloc(sizeof(*filters) * num_filters);
  size_t i = 0;
  for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) {
    filters[i++] = p->filter;
  }

  // calculate the size of the channel stack
  size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters);

  // The grpc_channel_stack is placed directly behind the prefix (the channel).
  // allocate memory, with prefix_bytes followed by channel_stack_size
  *result = gpr_malloc(prefix_bytes + channel_stack_size);
  // fetch a pointer to the channel stack
  grpc_channel_stack *channel_stack =
      (grpc_channel_stack *)((char *)(*result) + prefix_bytes);
  // and initialize it
  grpc_error *error = grpc_channel_stack_init(
      exec_ctx, initial_refs, destroy,
      destroy_arg == NULL ? *result : destroy_arg, filters, num_filters,
      builder->args, builder->transport, builder->name, channel_stack);

  if (error != GRPC_ERROR_NONE) {
    // Initialization failed: tear the stack down and hand back no object.
    grpc_channel_stack_destroy(exec_ctx, channel_stack);
    gpr_free(*result);
    *result = NULL;
  } else {
    // run post-initialization functions
    i = 0;
    for (filter_node *p = builder->begin.next; p != &builder->end;
         p = p->next) {
      if (p->init != NULL) {
        p->init(channel_stack, grpc_channel_stack_element(channel_stack, i),
                p->init_arg);
      }
      i++;
    }
  }

  // The builder and the temporary filter array are no longer needed.
  grpc_channel_stack_builder_destroy(builder);
  gpr_free((grpc_channel_filter **)filters);

  return error;
}

而grpc_channel_stack_init就是把stack, filters, args放到一起,从下面的CHANNEL_ELEMS_FROM_STACK可知,grpc_channel_element是紧挨着stack的:
它的结构是:

    +------------+------------+---------------------------+---------------------------+
    |            |            |                           |                           |
    |  channel   |   stack    | grpc_channel_element list |      user_data list       |
    |            |            |                           |                           |
    +------------+------------+---------------------------+---------------------------+
/* Memory layouts.

   Channel stack is laid out as: {
     grpc_channel_stack stk;
     padding to GPR_MAX_ALIGNMENT
     grpc_channel_element[stk.count];
     per-filter memory, aligned to GPR_MAX_ALIGNMENT
   }

   Call stack is laid out as: {
     grpc_call_stack stk;
     padding to GPR_MAX_ALIGNMENT
     grpc_call_element[stk.count];
     per-filter memory, aligned to GPR_MAX_ALIGNMENT
   } */
/* Initializes `stack` in place for the given filters.
 *
 * The caller provides the memory. The element array starts at
 * CHANNEL_ELEMS_FROM_STACK(stack), and the per-filter channel data follows the
 * aligned element array. Every filter's init_channel_elem is invoked even when
 * an earlier one failed; the first error is returned and later errors are
 * unreffed. The size needed for a call stack is accumulated along the way and
 * stored in stack->call_stack_size. */
grpc_error *grpc_channel_stack_init(
    grpc_exec_ctx *exec_ctx, int initial_refs, grpc_iomgr_cb_func destroy,
    void *destroy_arg, const grpc_channel_filter **filters, size_t filter_count,
    const grpc_channel_args *channel_args, grpc_transport *optional_transport,
    const char *name, grpc_channel_stack *stack) {
  /* Fixed part of a call stack: the header plus one call element per filter;
     the per-filter call data sizes are added in the loop below. */
  size_t call_size =
      ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
      ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
  grpc_channel_element *elems;
  grpc_channel_element_args args;
  char *user_data;
  size_t i;

  stack->count = filter_count;
  GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
                       name);
  elems = CHANNEL_ELEMS_FROM_STACK(stack);
  /* Per-filter channel data begins right after the aligned element array. */
  user_data =
      ((char *)elems) +
      ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element));

  /* init per-filter data */
  grpc_error *first_error = GRPC_ERROR_NONE;
  for (i = 0; i < filter_count; i++) {
    args.channel_stack = stack;
    args.channel_args = channel_args;
    args.optional_transport = optional_transport;
    args.is_first = i == 0;
    args.is_last = i == (filter_count - 1);
    elems[i].filter = filters[i];
    elems[i].channel_data = user_data;
    grpc_error *error =
        elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args);
    if (error != GRPC_ERROR_NONE) {
      /* Keep only the first failure; drop the reference to any later one. */
      if (first_error == GRPC_ERROR_NONE) {
        first_error = error;
      } else {
        GRPC_ERROR_UNREF(error);
      }
    }
    /* Advance to the next filter's channel data and grow the call size. */
    user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
    call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
  }

  /* The bytes walked must match what grpc_channel_stack_size() reported. */
  GPR_ASSERT(user_data > (char *)stack);
  GPR_ASSERT((uintptr_t)(user_data - (char *)stack) ==
             grpc_channel_stack_size(filters, filter_count));

  stack->call_stack_size = call_size;
  return first_error;
}

grpc_compress_filter

/* Filter table of the compression filter. The initializer is positional, so
   the entries follow the member order of grpc_channel_filter. Slots marked
   "elided" correspond to members that the typedef excerpt in this article
   leaves out; their names are presumed from the values and need confirming
   against channel_stack.h. */
const grpc_channel_filter grpc_compress_filter = {
    compress_start_transport_stream_op, /* start_transport_stream_op */
    grpc_channel_next_op,               /* start_transport_op */
    sizeof(call_data),    /* elided; presumably sizeof_call_data */
    init_call_elem,       /* elided; presumably the call init hook */
    grpc_call_stack_ignore_set_pollset_or_pollset_set, /* elided slot */
    destroy_call_elem,    /* elided; presumably the call destroy hook */
    sizeof(channel_data), /* elided; presumably sizeof_channel_data */
    init_channel_elem,    /* init_channel_elem */
    destroy_channel_elem, /* destroy_channel_elem */
    grpc_call_next_get_peer,    /* get_peer */
    grpc_channel_next_get_info, /* get_channel_info */
    "compress"};                /* name */

通过上面的分析,client默认只会使用一个grpc_compress_filter过滤器,那么下面看看它是怎么调用这个过滤器的;前面已经看到,一次调用会通过grpc_call_start_batch 把它的所有操作放到一个grpc_transport_stream_op对象中,然后调用execute_op过滤器执行具体操作(但它不一定执行,有选项来开启):
src/core/lib/surface/call.c

execute_op(exec_ctx, call, stream_op);
/* Starts a stream op on a call: the call's context is attached to the op,
   which is then handed to the first element (index 0) of the call stack. */
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
                       grpc_transport_stream_op *op) {
  GPR_TIMER_BEGIN("execute_op", 0);

  grpc_call_element *top_elem = CALL_ELEM_FROM_CALL(call, 0);
  op->context = call->context;
  top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);

  GPR_TIMER_END("execute_op", 0);
}

而这里的start_transport_stream_op函数与grpc_call_element中的filter有关,这里是compress_filter,所以执行:

/* start_transport_stream_op hook of the compression filter.
 * Initial metadata, when present, is processed first. An op that carries no
 * message, or a message for which compression is skipped, goes straight to the
 * next element; otherwise the op is parked in the call data and the message is
 * pulled in by continue_send_message. */
static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                               grpc_call_element *elem,
                                               grpc_transport_stream_op *op) {
  call_data *state = elem->call_data;

  GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);

  if (op->send_initial_metadata) {
    process_send_initial_metadata(elem, op->send_initial_metadata);
  }

  if (op->send_message == NULL ||
      skip_compression(elem, op->send_message->flags)) {
    /* pass control down the stack */
    grpc_call_next_op(exec_ctx, elem, op);
  } else {
    /* Remember the op and the message properties, then start reading. */
    state->send_op = op;
    state->send_length = op->send_message->length;
    state->send_flags = op->send_message->flags;
    continue_send_message(exec_ctx, elem);
  }

  GPR_TIMER_END("compress_start_transport_stream_op", 0);
}

而这里就会去执行continue_send_message发送数据:
src/core/lib/channel/compress_filter.c

/* Pulls slices from the outgoing message into calld->slices for as long as
   grpc_byte_stream_next keeps returning non-zero. Once the buffered length
   equals the expected message length, the message is passed on to
   finish_send_message and reading stops. */
static void continue_send_message(grpc_exec_ctx *exec_ctx,
                                  grpc_call_element *elem) {
  call_data *state = elem->call_data;

  for (;;) {
    if (!grpc_byte_stream_next(exec_ctx, state->send_op->send_message,
                               &state->incoming_slice, ~(size_t)0,
                               &state->got_slice)) {
      return;
    }
    grpc_slice_buffer_add(&state->slices, state->incoming_slice);
    if (state->slices.length == state->send_length) {
      finish_send_message(exec_ctx, elem);
      return;
    }
  }
}
/* Compresses the fully buffered message and forwards the send op.
 *
 * grpc_msg_compress is run from calld->slices into a temporary buffer. When it
 * reports success, the buffers are swapped and GRPC_WRITE_INTERNAL_COMPRESS is
 * added to the send flags; otherwise the original slices are sent unchanged.
 * The op's message is then replaced by a stream over calld->slices, and its
 * on_complete closure is swapped for calld->send_done, with the original one
 * kept in calld->post_send. */
static void finish_send_message(grpc_exec_ctx *exec_ctx,
                                grpc_call_element *elem) {
  call_data *calld = elem->call_data;
  int did_compress;
  grpc_slice_buffer tmp;
  grpc_slice_buffer_init(&tmp);
  did_compress =
      grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
  if (did_compress) {
    if (grpc_compression_trace) {
      /* Trace output only: report the sizes before and after compression. */
      char *algo_name;
      const size_t before_size = calld->slices.length;
      const size_t after_size = tmp.length;
      const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
      GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
                                                 &algo_name));
      gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
                         " bytes (%.2f%% savings)",
              algo_name, before_size, after_size, 100 * savings_ratio);
    }
    /* calld->slices now holds the compressed data; tmp holds the original. */
    grpc_slice_buffer_swap(&calld->slices, &tmp);
    calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
  } else {
    if (grpc_compression_trace) {
      char *algo_name;
      GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
                                                 &algo_name));
      gpr_log(GPR_DEBUG,
              "Algorithm '%s' enabled but decided not to compress. Input size: "
              "%" PRIuPTR,
              algo_name, calld->slices.length);
    }
  }

  grpc_slice_buffer_destroy(&tmp);

  /* Send the contents of calld->slices in place of the original stream. */
  grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
                                calld->send_flags);
  calld->send_op->send_message = &calld->replacement_stream.base;
  /* Intercept completion: keep the caller's closure and install our own. */
  calld->post_send = calld->send_op->on_complete;
  calld->send_op->on_complete = &calld->send_done;

  grpc_call_next_op(exec_ctx, elem, calld->send_op);
}

上面会调用grpc_msg_compress进行压缩,支持deflate和gzip两种算法(都通过zlib_compress实现):

/* Dispatches to the compressor for `algorithm`.
 *
 * Returns 0 for GRPC_COMPRESS_NONE, the result of zlib_compress for
 * GRPC_COMPRESS_DEFLATE / GRPC_COMPRESS_GZIP, and 0 (after logging an error)
 * for any other value. */
static int compress_inner(grpc_compression_algorithm algorithm,
                          grpc_slice_buffer* input, grpc_slice_buffer* output) {
  /* Third argument for zlib_compress (0 for deflate, 1 for gzip); stays -1
     when `algorithm` is not handled by zlib_compress. */
  int gzip = -1;

  /* No default label on purpose: every enumerator is listed explicitly. */
  switch (algorithm) {
    case GRPC_COMPRESS_NONE:
      /* Nothing to do here: the caller's fallback path sends the data
         uncompressed. */
      return 0;
    case GRPC_COMPRESS_DEFLATE:
      gzip = 0;
      break;
    case GRPC_COMPRESS_GZIP:
      gzip = 1;
      break;
    case GRPC_COMPRESS_ALGORITHMS_COUNT:
      break;
  }

  if (gzip >= 0) {
    return zlib_compress(input, output, gzip);
  }
  gpr_log(GPR_ERROR, "invalid compression algorithm %d", algorithm);
  return 0;
}

/* Tries to compress `input` into `output` with `algorithm`.
 *
 * Returns 1 when compress_inner reports success. Otherwise calls
 * copy(input, output) and returns 0. */
int grpc_msg_compress(grpc_compression_algorithm algorithm,
                      grpc_slice_buffer* input, grpc_slice_buffer* output) {
  const int compressed = compress_inner(algorithm, input, output);
  if (compressed) {
    return 1;
  }
  /* Compression was not applied: fall back to copy(). */
  copy(input, output);
  return 0;
}

call stack的创建过程

它包含了call, call_stack, call_element和filter等的初始化:

在最开始的时候,就会创建一个Call对象:

// Performs a unary RPC: one request out, one response in, all queued as a
// single batch on a call created from `channel`.
//
// channel - used to create the underlying Call.
// method  - the RPC method to invoke.
// context - supplies the initial metadata and flags to send; also passed to
//           the receive-initial-metadata and receive-status ops.
// request - the message handed to SendMessage.
// result  - destination passed to RecvMessage.
//
// Returns the non-OK Status from SendMessage immediately if it fails;
// otherwise returns the Status registered with ClientRecvStatus.
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
                         ClientContext* context, const InputMessage& request,
                         OutputMessage* result) {
  // Completion queue local to this call; only this function plucks from it.
  CompletionQueue cq;
  Call call(channel->CreateCall(method, context, &cq));
  // All six operations of a unary call are bundled into one op set.
  CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
            CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
            CallOpClientSendClose, CallOpClientRecvStatus>
      ops;
  // Bail out before starting the call if the request cannot be queued.
  Status status = ops.SendMessage(request);
  if (!status.ok()) {
    return status;
  }
  ops.SendInitialMetadata(context->send_initial_metadata_,
                          context->initial_metadata_flags());
  ops.RecvInitialMetadata(context);
  ops.RecvMessage(result);
  ops.ClientSendClose();
  ops.ClientRecvStatus(context, &status);
  call.PerformOps(&ops);
  // NOTE(review): cq.Pluck(&ops) presumably blocks until the batch tagged with
  // `ops` completes - confirm against CompletionQueue. The assertion requires
  // that either the pluck succeeded and a message was received, or the final
  // status is not OK.
  GPR_CODEGEN_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok());
  return status;
}

然后它会创建一个call对象:
./src/cpp/client/channel_cc.cc

// Creates the core grpc_call for `method` on this channel and wraps it in a
// Call bound to `cq`.
//
// The registered-call handle of `method` is used only when the handle exists
// and `context` carries no authority override. Otherwise the call is created
// by method name, with the host taken from the context authority or, failing
// that, from this channel's host_ (no host is passed if both are empty).
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
                         CompletionQueue* cq) {
  grpc_call* c_call = nullptr;

  void* const registered_tag =
      context->authority().empty() ? method.channel_tag() : nullptr;
  if (registered_tag) {
    c_call = grpc_channel_create_registered_call(
        c_channel_, context->propagate_from_call_,
        context->propagation_options_.c_bitmask(), cq->cq(), registered_tag,
        context->raw_deadline(), nullptr);
  } else {
    // Per-call authority wins over the channel-wide host.
    const char* host_str = nullptr;
    if (!context->authority().empty()) {
      host_str = context->authority_.c_str();
    } else if (!host_.empty()) {
      host_str = host_.c_str();
    }
    c_call = grpc_channel_create_call(
        c_channel_, context->propagate_from_call_,
        context->propagation_options_.c_bitmask(), cq->cq(), method.name(),
        host_str, context->raw_deadline(), nullptr);
  }

  // Attach the census context and let the client context track the new call.
  grpc_census_call_set_context(c_call, context->census_context());
  context->set_call(c_call, shared_from_this());
  return Call(c_call, this, cq);
}
/* Creates a call on `channel` from a pre-registered call handle.
 *
 * `registered_call_handle` is interpreted as a registered_call; a new
 * reference is taken on its path (and on its authority, when it has one) and
 * passed to grpc_channel_create_call_internal. `reserved` must be NULL. */
grpc_call *grpc_channel_create_registered_call(
    grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
    grpc_completion_queue *completion_queue, void *registered_call_handle,
    gpr_timespec deadline, void *reserved) {
  GRPC_API_TRACE(
      "grpc_channel_create_registered_call("
      "channel=%p, parent_call=%p, propagation_mask=%x, completion_queue=%p, "
      "registered_call_handle=%p, "
      "deadline=gpr_timespec { tv_sec: %" PRId64
      ", tv_nsec: %d, clock_type: %d }, "
      "reserved=%p)",
      9, (channel, parent_call, (unsigned)propagation_mask, completion_queue,
          registered_call_handle, deadline.tv_sec, deadline.tv_nsec,
          (int)deadline.clock_type, reserved));
  GPR_ASSERT(!reserved);

  registered_call *rc = registered_call_handle;
  grpc_mdelem *path = GRPC_MDELEM_REF(rc->path);
  grpc_mdelem *authority = NULL;
  if (rc->authority) {
    authority = GRPC_MDELEM_REF(rc->authority);
  }

  /* No pollset_set alternative: the completion queue is the polling entity. */
  return grpc_channel_create_call_internal(channel, parent_call,
                                           propagation_mask, completion_queue,
                                           NULL, path, authority, deadline);
}
/* Shared implementation behind the public call-creation entry points.
 *
 * Builds the extra initial metadata (path, plus an authority when one is
 * available), fills in a grpc_call_create_args and calls grpc_call_create.
 * The channel must be a client channel, and at most one of `cq` and
 * `pollset_set_alternative` may be non-NULL. An error from grpc_call_create
 * is only logged; the created call is returned either way. */
static grpc_call *grpc_channel_create_call_internal(
    grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
    grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
    grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem,
    gpr_timespec deadline) {
  GPR_ASSERT(channel->is_client);
  GPR_ASSERT(!(cq != NULL && pollset_set_alternative != NULL));

  /* An explicit authority is used as given; otherwise fall back to a fresh
   * reference on the channel's default authority, if it has one. */
  grpc_mdelem *authority = authority_mdelem;
  if (authority == NULL && channel->default_authority != NULL) {
    authority = GRPC_MDELEM_REF(channel->default_authority);
  }

  /* Slot 0 is always the path; slot 1 is the optional authority. */
  grpc_mdelem *initial_md[2];
  size_t initial_md_count = 0;
  initial_md[initial_md_count++] = path_mdelem;
  if (authority != NULL) {
    initial_md[initial_md_count++] = authority;
  }

  grpc_call_create_args args;
  memset(&args, 0, sizeof(args));
  args.channel = channel;
  args.cq = cq;
  args.pollset_set_alternative = pollset_set_alternative;
  args.parent_call = parent_call;
  args.propagation_mask = propagation_mask;
  args.send_deadline = deadline;
  args.add_initial_metadata = initial_md;
  args.add_initial_metadata_count = initial_md_count;
  /* grpc_call_create derives is_client from this field being NULL. */
  args.server_transport_data = NULL;

  grpc_call *call;
  GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
  return call;
}

src/core/lib/surface/call.c:

/* Allocates and initializes a grpc_call described by `args`.
 *
 * The call object and its call stack live in one allocation: sizeof(grpc_call)
 * followed by channel_stack->call_stack_size bytes. `*out_call` is assigned
 * right after allocation, so it is set even when an error is returned.
 *
 * Returns the error produced by grpc_call_stack_init (GRPC_ERROR_NONE on
 * success). On error the call is additionally closed with the status
 * extracted from that error, and initialization still runs to the end. */
grpc_error *grpc_call_create(const grpc_call_create_args *args,
                             grpc_call **out_call) {
  size_t i, j;
  grpc_channel_stack *channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_call *call;
  GPR_TIMER_BEGIN("grpc_call_create", 0);
  call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
  *out_call = call;
  /* Only the grpc_call header is zeroed here; the trailing call-stack region
   * is filled in by grpc_call_stack_init below. */
  memset(call, 0, sizeof(grpc_call));
  gpr_mu_init(&call->mu);
  call->channel = args->channel;
  call->cq = args->cq;
  call->parent = args->parent_call;
  call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
  /* Always support no compression */
  GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
  /* A call without server transport data is treated as a client call. */
  call->is_client = args->server_transport_data == NULL;
  grpc_mdstr *path = NULL;
  if (call->is_client) {
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    /* Store the caller-supplied metadata elements on the call; while doing so,
     * take a reference on the value of the path element (if present) so it
     * can be handed to the filters during call stack init. */
    for (i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (args->add_initial_metadata[i]->key == GRPC_MDSTR_PATH) {
        path = GRPC_MDSTR_REF(args->add_initial_metadata[i]->value);
      }
    }
    call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
  } else {
    /* Server-side calls must not be given extra initial metadata. */
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }
  /* Every entry of the 2x2 metadata batch table starts with an infinite
   * deadline. */
  for (i = 0; i < 2; i++) {
    for (j = 0; j < 2; j++) {
      call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
    }
  }
  /* The requested deadline is normalized to the monotonic clock. */
  gpr_timespec send_deadline =
      gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);

  if (args->parent_call != NULL) {
    /* The child holds an internal reference on its parent. Only a client call
     * may have a parent, and that parent must be a server call. */
    GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent_call->is_client);

    /* Parent state is read and its child list is modified under the parent's
     * mutex. */
    gpr_mu_lock(&args->parent_call->mu);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      /* Use the earlier of the requested deadline and the parent's deadline,
       * compared in the parent's clock type. */
      send_deadline = gpr_time_min(
          gpr_convert_clock_type(send_deadline,
                                 args->parent_call->send_deadline.clock_type),
          args->parent_call->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    /* NOTE(review): both branches assert GRPC_PROPAGATE_CENSUS_STATS_CONTEXT,
     * so as written any call with a parent must set that bit regardless of
     * whether tracing propagation is requested - confirm this is intended. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
      grpc_call_context_set(
          call, GRPC_CONTEXT_TRACING,
          args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
    } else {
      GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = 1;
    }

    /* Link the call into the parent's circular, doubly linked list of
     * children: a first child points at itself; later children are inserted
     * just before first_child, i.e. at the tail of the ring. */
    if (args->parent_call->first_child == NULL) {
      args->parent_call->first_child = call;
      call->sibling_next = call->sibling_prev = call;
    } else {
      call->sibling_next = args->parent_call->first_child;
      call->sibling_prev = args->parent_call->first_child->sibling_prev;
      call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
          call;
    }

    gpr_mu_unlock(&args->parent_call->mu);
  }

  call->send_deadline = send_deadline;

  /* The call keeps its channel referenced. */
  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
  /* initial refcount dropped by grpc_call_destroy */
  /* The call stack is initialized with one initial ref and destroy_call as
   * its destroy callback; every filter's per-call data is set up here. */
  grpc_error *error = grpc_call_stack_init(
      &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
      args->server_transport_data, path, call->start_time, send_deadline,
      CALL_STACK_FROM_CALL(call));
  if (error != GRPC_ERROR_NONE) {
    /* Translate the init error into a status and close the call with it; the
     * error itself is still returned to the caller below. */
    grpc_status_code status;
    const char *error_str;
    grpc_error_get_status(error, &status, &error_str);
    close_with_status(&exec_ctx, call, status, error_str);
  }
  /* Bind the polling entity: the completion queue's pollset when a cq is
   * given, or the alternative pollset_set. The two are mutually exclusive. */
  if (args->cq != NULL) {
    GPR_ASSERT(
        args->pollset_set_alternative == NULL &&
        "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != NULL) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  /* Tell the call stack about the polling entity only if one was bound. */
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(
        &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
  }

  /* Drop the reference taken on the path value above. */
  if (path != NULL) GRPC_MDSTR_UNREF(path);

  grpc_exec_ctx_finish(&exec_ctx);
  GPR_TIMER_END("grpc_call_create", 0);
  return error;
}

src/core/lib/channel/channel_stack.c:

/* Initializes `call_stack` as the per-call counterpart of `channel_stack`.
 *
 * One call element is created per channel element, sharing that element's
 * filter and channel_data. Each element's call_data points into the region
 * that follows the (alignment-rounded) array of call elements, advancing by
 * the filter's rounded sizeof_call_data. Every filter's init_call_elem is
 * invoked even if an earlier one failed.
 *
 * Returns the first error reported by a filter (later errors are unreffed),
 * or GRPC_ERROR_NONE when all filters initialized successfully. */
grpc_error *grpc_call_stack_init(
    grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
    int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
    grpc_call_context_element *context, const void *transport_server_data,
    grpc_mdstr *path, gpr_timespec start_time, gpr_timespec deadline,
    grpc_call_stack *call_stack) {
  const size_t count = channel_stack->count;
  grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);

  call_stack->count = count;
  GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy,
                       destroy_arg, "CALL_STACK");

  grpc_call_element *call_elems = CALL_ELEMS_FROM_STACK(call_stack);
  /* Per-filter call data starts right after the element array. */
  char *next_call_data =
      ((char *)call_elems) +
      ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));

  /* init per-filter data */
  grpc_error *first_error = GRPC_ERROR_NONE;
  grpc_call_element_args args;
  args.start_time = start_time;
  for (size_t idx = 0; idx < count; idx++) {
    grpc_channel_element *chan_elem = &channel_elems[idx];
    grpc_call_element *call_elem = &call_elems[idx];

    /* These fields are rewritten for every element, so each filter is handed
     * the same values regardless of what earlier filters did with `args`. */
    args.call_stack = call_stack;
    args.server_transport_data = transport_server_data;
    args.context = context;
    args.path = path;
    args.deadline = deadline;

    call_elem->filter = chan_elem->filter;
    call_elem->channel_data = chan_elem->channel_data;
    call_elem->call_data = next_call_data;

    grpc_error *elem_error =
        call_elem->filter->init_call_elem(exec_ctx, call_elem, &args);
    if (elem_error != GRPC_ERROR_NONE) {
      if (first_error != GRPC_ERROR_NONE) {
        /* An earlier failure is already being reported; drop this one. */
        GRPC_ERROR_UNREF(elem_error);
      } else {
        first_error = elem_error;
      }
    }

    next_call_data +=
        ROUND_UP_TO_ALIGNMENT_SIZE(call_elem->filter->sizeof_call_data);
  }
  return first_error;
}

从上面可以看出,call_stack是从channel_stack的element中得到filter和channel_data的。

总结

同步调用的过程:创建Channel(初始化stack, filters)→ 调用stub对应的接口 → 调用grpc的同步方法 → 创建call_stack → protobuf序列化 → 封装成grpc_transport_stream_op对象 → 通过filters预处理 → transport发送。这里只是简单地走了一遍逻辑,不过代码细节处的调用层次非常深,不容易理清逻辑。

原文链接:,转发请注明来源!

发表评论