Scanner C++ API
master.h
1 /* Copyright 2016 Carnegie Mellon University
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include <grpc/support/log.h>
19 #include <grpc++/alarm.h>
20 #include "scanner/engine/rpc.grpc.pb.h"
21 #include "scanner/engine/runtime.h"
22 #include "scanner/engine/sampler.h"
23 #include "scanner/engine/dag_analysis.h"
24 #include "scanner/util/util.h"
25 #include "scanner/util/grpc.h"
26 #include "scanner/util/thread_pool.h"
27 #include "scanner/util/profiler.h"
28 
29 #include <mutex>
30 #include <thread>
31 
32 namespace scanner {
33 namespace internal {
34 
35 using JobID = i64;
36 using WorkerID = i64;
37 
38 template <class Request, class Reply>
39 using MCall = Call<MasterServerImpl, Request, Reply>;
40 
41 class MasterServerImpl final : public proto::Master::Service { // Master gRPC service. NOTE(review): Doxygen source-browser extract; gaps in the numbering mark source lines missing from it.
42  public:
43  MasterServerImpl(DatabaseParameters& params, const std::string& port); // NOTE(review): the next source line (45) is missing from this extract
44 
46
47  void run();
48 
49  void handle_rpcs(i32 watchdog_timeout_ms = 50000); // watchdog timeout defaults to 50000 (milliseconds per the parameter name)
50 
51  private: // RPC handlers below each receive one MCall<Request, Reply>* call
52  void ShutdownHandler(MCall<proto::Empty, proto::Result>* call);
53 
54  // Database query methods
55  void ListTablesHandler(MCall<proto::Empty, proto::ListTablesResult>* call);
56 
57  void GetTablesHandler( // NOTE(review): rest of declaration (source line 58) missing from this extract
59
60  void DeleteTablesHandler( // NOTE(review): rest of declaration (source line 61) missing from this extract
62
63  void NewTableHandler( // NOTE(review): rest of declaration (source line 64) missing from this extract
65
66  void GetVideoMetadataHandler(MCall<proto::GetVideoMetadataParams,
67  proto::GetVideoMetadataResult>* call);
68 
69  void IngestVideosHandler( // NOTE(review): rest of declaration (source line 70) missing from this extract
71
72  // Worker methods
73  void RegisterWorkerHandler( // NOTE(review): rest of declaration (source line 74) missing from this extract
75
76  void UnregisterWorkerHandler( // NOTE(review): rest of declaration (source line 77) missing from this extract
78
79  void ActiveWorkersHandler( // NOTE(review): rest of declaration (source line 80) missing from this extract
81
82  // Op and Kernel methods
83 
84  void GetOpInfoHandler(MCall<proto::OpInfoArgs, proto::OpInfo>* call);
85 
86  void GetSourceInfoHandler( // NOTE(review): rest of declaration (source line 87) missing from this extract
88
89  void GetEnumeratorInfoHandler( // NOTE(review): rest of declaration (source line 90) missing from this extract
91
92  void GetSinkInfoHandler(MCall<proto::SinkInfoArgs, proto::SinkInfo>* call);
93 
94  void LoadOpHandler(MCall<proto::OpPath, Result>* call); // NOTE(review): unqualified Result, while sibling handlers use proto::Result; confirm Result aliases proto::Result
95 
96  void RegisterOpHandler(MCall<proto::OpRegistration, proto::Result>* call);
97 
98  void RegisterPythonKernelHandler( // NOTE(review): rest of declaration (source line 99) missing from this extract
100
101  void ListLoadedOpsHandler(MCall<proto::Empty, proto::ListLoadedOpsReply>* call);
102 
103  void ListRegisteredOpsHandler(MCall<proto::Empty, proto::ListRegisteredOpsReply>* call);
104 
105  void ListRegisteredPythonKernelsHandler( // NOTE(review): rest of declaration (source line 106) missing from this extract
107
108  void NextWorkHandler( // NOTE(review): rest of declaration (source line 109) missing from this extract
110
111  void FinishedWorkHandler( // NOTE(review): rest of declaration (source line 112) missing from this extract
113
114  void FinishedJobHandler(MCall<proto::FinishedJobRequest, proto::Empty>* call);
115 
116  void NewJobHandler(MCall<proto::BulkJobParameters, proto::NewJobReply>* call);
117 
118  void GetJobsHandler(MCall<proto::GetJobsRequest, proto::GetJobsReply>* call);
119 
120  void GetJobStatusHandler( // NOTE(review): rest of declaration (source line 121) missing from this extract
122
123  // Misc methods
124  void PingHandler(MCall<proto::Empty, proto::Empty>* call);
125 
126  void PokeWatchdogHandler(MCall<proto::Empty, proto::Empty>* call);
127 
128  // Expects context->peer() to return a string in the format
129  // ipv4:<peer_address>:<random_port>
130  // Returns the <peer_address> from the above format.
131  static std::string get_worker_address_from_grpc_context(
132  grpc::ServerContext* context);
133 
134  void recover_and_init_database();
135 
136  void start_job_processor();
137 
138  void stop_job_processor();
139 
140  bool process_job(const proto::BulkJobParameters* job_params,
141  proto::Result* job_result);
142 
143  void start_worker_pinger();
144 
145  void stop_worker_pinger();
146 
147  void start_job_on_workers(const std::vector<i32>& worker_ids); // NOTE(review): worker ids are i32 here but WorkerID is i64; possible narrowing
148 
149  void stop_job_on_worker(i32 node_id); // NOTE(review): i32 id vs i64 WorkerID
150 
151  void remove_worker(i32 node_id); // NOTE(review): i32 id vs i64 WorkerID
152 
153  void blacklist_job(i64 job_id);
154 
155  void start_shutdown();
156 
157  void write_profiler(int bulk_job_id, timepoint_t job_start, timepoint_t job_end); // NOTE(review): bulk_job_id is int while JobID is i64
158 
159  DatabaseParameters db_params_;
160  const std::string port_;
161 
162  std::unique_ptr<ThreadPool> pool_;
163  std::thread pinger_thread_;
164  std::atomic<bool> pinger_active_; // NOTE(review): no initializer; confirm the constructor sets it before the pinger starts
165  std::condition_variable pinger_wake_cv_;
166  std::mutex pinger_wake_mutex_;
167 
168  std::atomic<std::chrono::high_resolution_clock::duration> last_watchdog_poke_; // presumably the time-since-epoch of the last watchdog poke; confirm
169  Flag trigger_shutdown_;
170  grpc::Alarm* shutdown_alarm_ = nullptr; // NOTE(review): raw pointer; ownership/deletion is not visible in this header
171  storehouse::StorageBackend* storage_; // NOTE(review): raw pointer with no initializer; confirm the constructor sets it and who frees it
172  DatabaseMetadata meta_;
173  std::unique_ptr<TableMetaCache> table_metas_;
174  std::vector<std::string> so_paths_;
175  std::vector<proto::OpRegistration> op_registrations_;
176  std::vector<proto::PythonKernelRegistration> py_kernel_registrations_;
177 
178  // Worker state
179  std::atomic<WorkerID> next_worker_id_{0};
180 
181  struct WorkerState {
182  enum State {
183  IDLE, // Waiting for a new job
184  RUNNING_JOB, // Executing a job
185  UNREGISTERED // Unregistered and can be deleted
186  };
187 
188  WorkerState(WorkerID _id, std::unique_ptr<proto::Worker::Stub> _stub,
189  const std::string& _address)
190  : id(_id), stub(std::move(_stub)), address(_address) {}
191 
193  const WorkerID id; // NOTE(review): source lines 192, 194, 196, 198, 200 are missing from this extract
195  std::atomic<State> state; // NOTE(review): not set by the constructor above; before C++20 a default-constructed std::atomic holds an indeterminate value. Consider state{IDLE}
197  const std::unique_ptr<proto::Worker::Stub> stub;
199  const std::string address;
201  std::atomic<i64> failed_pings{0};
202  };
203 
205  std::map<WorkerID, std::shared_ptr<WorkerState>> workers_; // WorkerID -> WorkerState (source line 204 missing from this extract)
206 
207  // True if the master is executing a job
208  std::mutex active_mutex_;
209  std::condition_variable active_cv_;
210  bool active_bulk_job_ = false;
211  i32 active_bulk_job_id_ = 0; // NOTE(review): i32 while JobID is i64; source line 212 missing from this extract
213  proto::BulkJobParameters job_params_;
214 
215  // True if all work for job is done
216  std::mutex finished_mutex_;
217  std::condition_variable finished_cv_;
218  std::atomic<bool> finished_{true};
219 
220  std::thread job_processor_thread_;
221  // Manages modification of all of the below structures
222  std::mutex work_mutex_;
223 
224  enum struct BulkJobState {
225  RUNNING,
226  FINISHED
227  };
228 
229  struct BulkJob {
230  proto::BulkJobParameters job_params;
231  BulkJobState state; // NOTE(review): no default member initializer; confirm it is always assigned after construction
232 
233  //============================================================================
234  // Preprocessed metadata about the supplied DAG
235  //============================================================================
236  DAGAnalysisInfo dag_info;
237  // Mapping from jobs to table ids
238  std::map<i64, std::vector<i64>> job_to_table_ids;
239  // Slice input rows for each job at each slice op
240  std::vector<std::map<i64, i64>> slice_input_rows_per_job;
241  // Output rows for each job
242  std::vector<i64> total_output_rows_per_job;
243 
244  //============================================================================
245  // Management of outstanding and completed jobs and tasks
246  //============================================================================
247  // The next job to use to generate tasks
248  i64 next_job = 0;
249  // Total number of jobs
250  i64 num_jobs = -1;
251  // Next sample index in the current task (NOTE(review): the name suggests the next task index within the current job; wording may be stale)
252  i64 next_task = 0;
253  // Total samples in the current task (NOTE(review): the name suggests the task count of the current job; wording may be stale)
254  i64 num_tasks = -1;
255  // All job task output rows
256  // Job -> Task -> task output rows
257  std::vector<std::vector<std::vector<i64>>> job_tasks;
258  // Outstanding set of generated tasks that are waiting to or are being
259  // processed
260  std::set<std::tuple<i64, i64>> active_job_tasks;
261  // Queue of tasks that need to be assigned to a worker
262  std::deque<std::tuple<i64, i64>> to_assign_job_tasks;
263  // The total number of tasks that have been completed
264  std::atomic<i64> total_tasks_used{0};
265  // The total number of tasks for this bulk job
266  i64 total_tasks = 0;
267  // The total number of tasks that have been completed for each job
268  std::vector<i64> tasks_used_per_job;
269 
270  Result task_result;
271 
272  //============================================================================
273  // Assignment of tasks to workers
274  //============================================================================
275  // Tracks tasks assigned to worker so they can be reassigned if the worker
276  // fails
277  // Worker id -> (job_id, task_id)
278  std::map<i64, std::set<std::tuple<i64, i64>>> worker_job_tasks;
279  // (Worker id, job_id, task_id) -> start_time
280  std::map<std::tuple<i64, i64, i64>, double> worker_job_tasks_starts;
281  // Tracks the number of times a task has failed so that a job can be
282  // removed if it is causing consistent failures: job_id -> task_id ->
283  // num_failures
284  std::map<i64, std::map<i64, i64>> job_tasks_num_failures;
285  // Tracks the jobs that have failed too many times and should be ignored
286  std::set<i64> blacklisted_jobs;
287 
288  struct WorkerHistory {
289  timepoint_t start_time;
290  timepoint_t end_time;
291  i64 tasks_assigned; // NOTE(review): no initializer; zero only if the entry is value-initialized (e.g. via map operator[])
292  i64 tasks_retired; // NOTE(review): same as tasks_assigned
293  };
294  std::map<i64, WorkerHistory> worker_histories;
295  std::map<i32, bool> unfinished_workers; // NOTE(review): keyed by i32 while worker_job_tasks/worker_histories use i64 worker ids
296  std::vector<i32> unstarted_workers; // NOTE(review): i32 worker ids vs i64 WorkerID
297  std::atomic<i64> num_failed_workers{0};
298  std::vector<std::vector<i32>> job_uncommitted_tables;
299 
300  Result job_result;
301  };
302 
303  std::map<JobID, std::shared_ptr<BulkJob>> bulk_jobs_state_; // JobID -> BulkJob
304 
305  std::unique_ptr<grpc::ServerCompletionQueue> cq_;
306  proto::Master::AsyncService service_;
307  std::unique_ptr<grpc::Server> server_;
308 
309  Profiler profiler_;
310  std::unordered_map<BaseCall<MasterServerImpl>*, timepoint_t> tag_start_times_; // presumably the start time of each in-flight call, for profiling; confirm
311 };
312 
313 }
314 }
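Cross-reference tooltips left over from the Doxygen source browser (the symbol each one describes was lost during extraction):
Definition: dag_analysis.h:43
Definition: grpc.h:105
Definition: util.h:181
Definition: profiler.h:40
Definition: database.cpp:36
Definition: metadata.h:106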