GRPC Stream调用

文章目录

proto文件定义

// Interface exported by the server.
service RouteGuide {
  // 一个标准的rpc调用
  rpc GetFeature(Point) returns (Feature) {}

  // 一个server返回给客户端一个stream的调用
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // 一个client以stream方式调用server
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // 双向stream调用
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

// 以经度,纬度的坐标点
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

// 两个点定义的一片区域
message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

// 一个点所在名字
message Feature {
  string name = 1;
  Point location = 2;
}
message RouteNote {
  Point location = 1;
  string message = 2;
}
message RouteSummary {
  int32 point_count = 1;
  int32 feature_count = 2;
  int32 distance = 3;
  int32 elapsed_time = 4;
}

上面定义了4个方法,其它包含了三个stream类型的调用client to server, server to client和双向stream

stream调用用到的reader和writer

reader

template <class R>
class ClientReader final : public ClientReaderInterface<R> {
 public:
  /// Blocking create a stream and write the first request out.
  template <class W>
  ClientReader(ChannelInterface* channel, const RpcMethod& method,
               ClientContext* context, const W& request)
      : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
              CallOpClientSendClose>
        ops;
    ops.SendInitialMetadata(context->send_initial_metadata_,
                            context->initial_metadata_flags());
    // TODO(ctiller): don't assert
    GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
    ops.ClientSendClose();
    call_.PerformOps(&ops);
    cq_.Pluck(&ops);
  }

  void WaitForInitialMetadata() override {
    GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);

    CallOpSet<CallOpRecvInitialMetadata> ops;
    ops.RecvInitialMetadata(context_);
    call_.PerformOps(&ops);
    cq_.Pluck(&ops);  /// status ignored
  }

  bool NextMessageSize(uint32_t* sz) override {
    *sz = call_.max_receive_message_size();
    return true;
  }

  bool Read(R* msg) override {
    CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
    if (!context_->initial_metadata_received_) {
      ops.RecvInitialMetadata(context_);
    }
    ops.RecvMessage(msg);
    call_.PerformOps(&ops);
    return cq_.Pluck(&ops) && ops.got_message;
  }

  Status Finish() override {
    CallOpSet<CallOpClientRecvStatus> ops;
    Status status;
    ops.ClientRecvStatus(context_, &status);
    call_.PerformOps(&ops);
    GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
    return status;
  }

 private:
  ClientContext* context_;
  CompletionQueue cq_;
  Call call_;
};

可以看出,当client stream调用时,reader在创建的时候,封装了一次rpc调用,然后再Next获取结果时,直接从CompletionQueue中得到;

template <class R>
class ServerReader final : public ServerReaderInterface<R> {
 public:
  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}

  void SendInitialMetadata() override {
    GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

    CallOpSet<CallOpSendInitialMetadata> ops;
    ops.SendInitialMetadata(ctx_->initial_metadata_,
                            ctx_->initial_metadata_flags());
    if (ctx_->compression_level_set()) {
      ops.set_compression_level(ctx_->compression_level());
    }
    ctx_->sent_initial_metadata_ = true;
    call_->PerformOps(&ops);
    call_->cq()->Pluck(&ops);
  }

  bool NextMessageSize(uint32_t* sz) override {
    *sz = call_->max_receive_message_size();
    return true;
  }

  bool Read(R* msg) override {
    CallOpSet<CallOpRecvMessage<R>> ops;
    ops.RecvMessage(msg);
    call_->PerformOps(&ops);
    return call_->cq()->Pluck(&ops) && ops.got_message;
  }

 private:
  Call* const call_;
  ServerContext* const ctx_;
};

ClientReader和ServerReader的区别在于它在创建时,ClientServer有CallOpSendMessage操作用于发送调用信息;

writer

template <class W>
class ClientWriter : public ClientWriterInterface<W> {
 public:
  /// Blocking create a stream.
  template <class R>
  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
               ClientContext* context, R* response)
      : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
    finish_ops_.RecvMessage(response);
    finish_ops_.AllowNoMessage();

    CallOpSet<CallOpSendInitialMetadata> ops;
    ops.SendInitialMetadata(context->send_initial_metadata_,
                            context->initial_metadata_flags());
    call_.PerformOps(&ops);
    cq_.Pluck(&ops);
  }

  void WaitForInitialMetadata() {
    GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);

    CallOpSet<CallOpRecvInitialMetadata> ops;
    ops.RecvInitialMetadata(context_);
    call_.PerformOps(&ops);
    cq_.Pluck(&ops);  // status ignored
  }

  using WriterInterface<W>::Write;
  bool Write(const W& msg, const WriteOptions& options) override {
    CallOpSet<CallOpSendMessage> ops;
    if (!ops.SendMessage(msg, options).ok()) {
      return false;
    }
    call_.PerformOps(&ops);
    return cq_.Pluck(&ops);
  }

  bool WritesDone() override {
    CallOpSet<CallOpClientSendClose> ops;
    ops.ClientSendClose();
    call_.PerformOps(&ops);
    return cq_.Pluck(&ops);
  }

  /// Read the final response and wait for the final status.
  Status Finish() override {
    Status status;
    if (!context_->initial_metadata_received_) {
      finish_ops_.RecvInitialMetadata(context_);
    }
    finish_ops_.ClientRecvStatus(context_, &status);
    call_.PerformOps(&finish_ops_);
    GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
    return status;
  }

 private:
  ClientContext* context_;
  CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
            CallOpClientRecvStatus>
      finish_ops_;
  CompletionQueue cq_;
  Call call_;
};
template <class W>
class ServerWriter final : public ServerWriterInterface<W> {
 public:
  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}

  void SendInitialMetadata() override {
    GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);

    CallOpSet<CallOpSendInitialMetadata> ops;
    ops.SendInitialMetadata(ctx_->initial_metadata_,
                            ctx_->initial_metadata_flags());
    if (ctx_->compression_level_set()) {
      ops.set_compression_level(ctx_->compression_level());
    }
    ctx_->sent_initial_metadata_ = true;
    call_->PerformOps(&ops);
    call_->cq()->Pluck(&ops);
  }

  using WriterInterface<W>::Write;
  bool Write(const W& msg, const WriteOptions& options) override {
    CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
    if (!ops.SendMessage(msg, options).ok()) {
      return false;
    }
    if (!ctx_->sent_initial_metadata_) {
      ops.SendInitialMetadata(ctx_->initial_metadata_,
                              ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        ops.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
    }
    call_->PerformOps(&ops);
    return call_->cq()->Pluck(&ops);
  }

 private:
  Call* const call_;
  ServerContext* const ctx_;
};

它通过构造函数创建一个call,然后通过writer来写数据;

server stream to client

生成的调用代码:

    std::unique_ptr< ::grpc::ClientReaderInterface< ::routeguide::Feature>> ListFeatures(::grpc::ClientContext* context, const ::routeguide::Rectangle& request) {
      return std::unique_ptr< ::grpc::ClientReaderInterface< ::routeguide::Feature>>(ListFeaturesRaw(context, request));
    }
::grpc::ClientReader< ::routeguide::Feature>* RouteGuide::Stub::ListFeaturesRaw(::grpc::ClientContext* context, const ::routeguide::Rectangle& request) {
  return new ::grpc::ClientReader< ::routeguide::Feature>(channel_.get(), rpcmethod_ListFeatures_, context, request);
}

客户端:

  void ListFeatures() {
    routeguide::Rectangle rect;
    Feature feature;
    ClientContext context;

    rect.mutable_lo()->set_latitude(400000000);
    rect.mutable_lo()->set_longitude(-750000000);
    rect.mutable_hi()->set_latitude(420000000);
    rect.mutable_hi()->set_longitude(-730000000);
    std::cout << "Looking for features between 40, -75 and 42, -73"
              << std::endl;

    std::unique_ptr<ClientReader<Feature> > reader(
        stub_->ListFeatures(&context, rect));
    while (reader->Read(&feature)) {
      std::cout << "Found feature called "
                << feature.name() << " at "
                << feature.location().latitude()/kCoordFactor_ << ", "
                << feature.location().longitude()/kCoordFactor_ << std::endl;
    }
    Status status = reader->Finish();
    if (status.ok()) {
      std::cout << "ListFeatures rpc succeeded." << std::endl;
    } else {
      std::cout << "ListFeatures rpc failed." << std::endl;
    }
  }

client通过Reader循环读;

服务器的实现代码:

Status ListFeatures(ServerContext* context,
                      const routeguide::Rectangle* rectangle,
                      ServerWriter<Feature>* writer) override {
    auto lo = rectangle->lo();
    auto hi = rectangle->hi();
    long left = (std::min)(lo.longitude(), hi.longitude());
    long right = (std::max)(lo.longitude(), hi.longitude());
    long top = (std::max)(lo.latitude(), hi.latitude());
    long bottom = (std::min)(lo.latitude(), hi.latitude());
    for (const Feature& f : feature_list_) {
      if (f.location().longitude() >= left &&
          f.location().longitude() <= right &&
          f.location().latitude() >= bottom &&
          f.location().latitude() <= top) {
        writer->Write(f);
      }
    }
    return Status::OK;
  }

server通过writer循环写;

它生成的接口代码参数不你是同步调用那种request, response和context三个参数了, 而是context,resquest,writer, 实现里面通过writer的write接口发送数据;

client stream to server

客户端生成的代码:

std::unique_ptr< ::grpc::ClientWriterInterface< ::routeguide::Point>> RecordRoute(::grpc::ClientContext* context, ::routeguide::RouteSummary* response) {
      return std::unique_ptr< ::grpc::ClientWriterInterface< ::routeguide::Point>>(RecordRouteRaw(context, response));
    }
::grpc::ClientWriter< ::routeguide::Point>* RouteGuide::Stub::RecordRouteRaw(::grpc::ClientContext* context, ::routeguide::RouteSummary* response) {
  return new ::grpc::ClientWriter< ::routeguide::Point>(channel_.get(), rpcmethod_RecordRoute_, context, response);
}

客户端调用代码:

  void RecordRoute() {
    Point point;
    RouteSummary stats;
    ClientContext context;
    const int kPoints = 10;
    unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();

    std::default_random_engine generator(seed);
    std::uniform_int_distribution<int> feature_distribution(
        0, feature_list_.size() - 1);
    std::uniform_int_distribution<int> delay_distribution(
        500, 1500);

    std::unique_ptr<ClientWriter<Point> > writer(
        stub_->RecordRoute(&context, &stats));
    for (int i = 0; i < kPoints; i++) {
      const Feature& f = feature_list_[feature_distribution(generator)];
      std::cout << "Visiting point "
                << f.location().latitude()/kCoordFactor_ << ", "
                << f.location().longitude()/kCoordFactor_ << std::endl;
      if (!writer->Write(f.location())) {
        // Broken stream.
        break;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(
          delay_distribution(generator)));
    }
    writer->WritesDone();
    Status status = writer->Finish();
    if (status.ok()) {
      std::cout << "Finished trip with " << stats.point_count() << " points\n"
                << "Passed " << stats.feature_count() << " features\n"
                << "Travelled " << stats.distance() << " meters\n"
                << "It took " << stats.elapsed_time() << " seconds"
                << std::endl;
    } else {
      std::cout << "RecordRoute rpc failed." << std::endl;
    }
  }

client通过writer循环写;

server实现:

  Status RecordRoute(ServerContext* context, ServerReader<Point>* reader,
                     RouteSummary* summary) override {
    Point point;
    int point_count = 0;
    int feature_count = 0;
    float distance = 0.0;
    Point previous;

    system_clock::time_point start_time = system_clock::now();
    while (reader->Read(&point)) {
      point_count++;
      if (!GetFeatureName(point, feature_list_).empty()) {
        feature_count++;
      }
      if (point_count != 1) {
        distance += GetDistance(previous, point);
      }
      previous = point;
    }
    system_clock::time_point end_time = system_clock::now();
    summary->set_point_count(point_count);
    summary->set_feature_count(feature_count);
    summary->set_distance(static_cast<long>(distance));
    auto secs = std::chrono::duration_cast<std::chrono::seconds>(
        end_time - start_time);
    summary->set_elapsed_time(secs.count());

    return Status::OK;
  }

server端通过reader循环读;
注意reader与writer在调用时,server写client端调用方式不同:
ClientReader在调用读时,先循环读next,完成时要调用Finish;而ServerReader只用调用next;
ClientWriter在写时,先循环调用Write,完成时调用WritesDone,最后还要调用Finish;而ServerWriter只调用Write;
这是原为对于server来说,每个生成的rpc调用都会有一个status作为返回值,在函数返回时,就可以去关闭相应的writer与reader了,而client则不同;grpc不知道什么时候才是正确的关闭时机,所以需要手动关闭;

Bidirectional stream

客户端调用:

void RouteChat() {
    ClientContext context;

    std::shared_ptr<ClientReaderWriter<RouteNote, RouteNote> > stream(
        stub_->RouteChat(&context));

    std::thread writer([stream]() {
      std::vector<RouteNote> notes{
        MakeRouteNote("First message", 0, 0),
        MakeRouteNote("Second message", 0, 1),
        MakeRouteNote("Third message", 1, 0),
        MakeRouteNote("Fourth message", 0, 0)};
      for (const RouteNote& note : notes) {
        std::cout << "Sending message " << note.message()
                  << " at " << note.location().latitude() << ", "
                  << note.location().longitude() << std::endl;
        stream->Write(note);
      }
      stream->WritesDone();
    });

    RouteNote server_note;
    while (stream->Read(&server_note)) {
      std::cout << "Got message " << server_note.message()
                << " at " << server_note.location().latitude() << ", "
                << server_note.location().longitude() << std::endl;
    }
    writer.join();
    Status status = stream->Finish();
    if (!status.ok()) {
      std::cout << "RouteChat rpc failed." << std::endl;
    }
  }

这里创建了一个线程,向server循环发送消息;然后在主线程主循环读消息;它用的是ClientReaderWriter

服务端实现:

  Status RouteChat(ServerContext* context,
                   ServerReaderWriter<RouteNote, RouteNote>* stream) override {
    std::vector<RouteNote> received_notes;
    RouteNote note;
    while (stream->Read(&note)) {
      for (const RouteNote& n : received_notes) {
        if (n.location().latitude() == note.location().latitude() &&
            n.location().longitude() == note.location().longitude()) {
          stream->Write(n);
        }
      }
      received_notes.push_back(note);
    }

    return Status::OK;
  }

它使用的是ServerReaderWriter来同时执行reader和writer;

应用场景

股票,聊天,推送,上报;
client stream to server:消息上报类应用;
server stream to client: 推送类应用;
Bidirectional stream: 聊天类,游戏通信;

原文链接:,转发请注明来源!

发表评论