Scanner C++ API
grpc.h
1 /* Licensed under the Apache License, Version 2.0 (the "License");
2  * you may not use this file except in compliance with the License.
3  * You may obtain a copy of the License at
4  *
5  * http://www.apache.org/licenses/LICENSE-2.0
6  *
7  * Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS,
9  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10  * See the License for the specific language governing permissions and
11  * limitations under the License.
12  */
13 
14 #pragma once
15 
16 #include <cstdlib>
17 #include <unistd.h>
18 
19 namespace scanner {
20 
21 #define GRPC_BACKOFF(expression__, status__) \
22  GRPC_BACKOFF_TIMEOUT(expression__, status__, 64, 0)
23 
24 #define GRPC_BACKOFF_D(expression__, status__, deadline__) \
25  GRPC_BACKOFF_TIMEOUT(expression__, status__, 64, deadline__)
26 
27 #define GRPC_BACKOFF_TIMEOUT(expression__, status__, timeout__, deadline__) \
28  do { \
29  int sleep_debt__ = 1; \
30  while (true) { \
31  grpc::ClientContext ctx; \
32  if (deadline__ > 0) { \
33  std::chrono::system_clock::time_point deadline = \
34  std::chrono::system_clock::now() + \
35  std::chrono::seconds(deadline__); \
36  ctx.set_deadline(deadline); \
37  } \
38  const grpc::Status result__ = (expression__); \
39  if (result__.error_code() == grpc::StatusCode::UNAVAILABLE) { \
40  double sleep_time__ = \
41  (sleep_debt__ + (static_cast<double>(rand()) / RAND_MAX)); \
42  if (sleep_debt__ < (timeout__)) { \
43  sleep_debt__ *= 2; \
44  } else { \
45  LOG(WARNING) << "GRPC_BACKOFF: reached max backoff."; \
46  status__ = result__; \
47  break; \
48  } \
49  LOG(WARNING) << "GRPC_BACKOFF: transient failure, sleeping for " \
50  << sleep_time__ << " seconds."; \
51  usleep(sleep_time__ * 1000000); \
52  continue; \
53  } \
54  status__ = result__; \
55  break; \
56  } \
57  } while (0);
58 
59 template <class ServiceImpl>
60 struct BaseCall {
61  virtual ~BaseCall() {}
62 
63  virtual void Handle(ServiceImpl* service) = 0;
64 
65  class Tag {
66  public:
67  enum class State { Received, Sent, Cancelled };
68 
69  Tag(BaseCall* call, State state) : call_(call), state_(state) {}
70 
71  BaseCall* get_call() {
72  return call_;
73  }
74 
75  const State& get_state() {
76  return state_;
77  }
78 
79  void Advance(ServiceImpl* service) {
80  switch (state_) {
81  case State::Received: {
82  call_->Handle(service);
83  break;
84  }
85  case State::Sent: {
86  delete call_;
87  break;
88  }
89  case State::Cancelled: {
90  delete call_;
91  break;
92  }
93  }
94  }
95 
96  private:
97  BaseCall* call_;
98  State state_;
99  };
100 
101  std::string name;
102 };
103 
104 template <class ServiceImpl, class Request, class Reply>
105 struct Call : BaseCall<ServiceImpl> {
106  using HandleFunction =
107  void (ServiceImpl::*)(Call<ServiceImpl, Request, Reply>*);
108 
109  Call(const std::string& _name, HandleFunction _handler)
110  : handler(_handler), responder(&ctx) {
111  this->name = _name;
112  }
113 
114  void Handle(ServiceImpl* service) override {
115  (service->*handler)(this);
116  }
117 
118  void Respond(grpc::Status status) {
119  responder.Finish(reply, status, &sent_tag);
120  }
121 
122  HandleFunction handler;
123  grpc::ServerContext ctx;
124  Request request;
125  Reply reply;
126  grpc::ServerAsyncResponseWriter<Reply> responder;
127 
128  // Tags
129  using Tag = typename BaseCall<ServiceImpl>::Tag;
130  Tag received_tag{this, Tag::State::Received};
131  Tag sent_tag{this, Tag::State::Sent};
132  Tag cancelled_tag{this, Tag::State::Cancelled};
133 };
134 }
Definition: grpc.h:105
Definition: database.cpp:36
Definition: grpc.h:65
Definition: grpc.h:60