18 #include <grpc/support/log.h> 19 #include <grpc++/alarm.h> 20 #include "scanner/engine/rpc.grpc.pb.h" 21 #include "scanner/engine/runtime.h" 22 #include "scanner/engine/sampler.h" 23 #include "scanner/engine/dag_analysis.h" 24 #include "scanner/util/util.h" 25 #include "scanner/util/grpc.h" 26 #include "scanner/util/thread_pool.h" 27 #include "scanner/util/profiler.h" 38 template <
class Request,
class Reply>
// MCall<Request, Reply> is shorthand for Call with its first template
// argument fixed to MasterServerImpl; Request and Reply are passed through
// unchanged.
39 using MCall = Call<MasterServerImpl, Request, Reply>;
// Presumably services incoming RPCs (definition not visible here).
// watchdog_timeout_ms defaults to 50000; going by the parameter name this is
// a duration in milliseconds -- TODO confirm against the definition.
49 void handle_rpcs(i32 watchdog_timeout_ms = 50000);
57 void GetTablesHandler(
60 void DeleteTablesHandler(
// Handler taking a call object whose request type is
// proto::GetVideoMetadataParams and whose reply type is
// proto::GetVideoMetadataResult. `call` is a raw pointer; who owns it is not
// visible from this declaration.
66 void GetVideoMetadataHandler(
MCall<proto::GetVideoMetadataParams,
67 proto::GetVideoMetadataResult>* call);
69 void IngestVideosHandler(
73 void RegisterWorkerHandler(
76 void UnregisterWorkerHandler(
79 void ActiveWorkersHandler(
86 void GetSourceInfoHandler(
89 void GetEnumeratorInfoHandler(
98 void RegisterPythonKernelHandler(
105 void ListRegisteredPythonKernelsHandler(
108 void NextWorkHandler(
111 void FinishedWorkHandler(
120 void GetJobStatusHandler(
// Static helper (needs no instance) that returns a std::string computed from
// the given gRPC server context; per its name, the address of the calling
// worker. The exact format of the returned string is not visible here.
131 static std::string get_worker_address_from_grpc_context(
132 grpc::ServerContext* context);
// Database setup; presumably run at startup -- behavior defined elsewhere.
134 void recover_and_init_database();
// Start / stop counterparts for the job processor (presumably managing
// job_processor_thread_ -- confirm in the definition).
136 void start_job_processor();
138 void stop_job_processor();
// Processes one bulk job described by `job_params` and reports through the
// `job_result` out-parameter. The meaning of the bool return value is not
// visible from this declaration.
140 bool process_job(
const proto::BulkJobParameters* job_params,
141 proto::Result* job_result);
// Start / stop counterparts for the worker pinger (presumably managing
// pinger_thread_ and pinger_active_ -- confirm in the definition).
143 void start_worker_pinger();
145 void stop_worker_pinger();
// Per-worker job control. Workers are identified here by i32 ids
// (`worker_ids`, `node_id`); how these relate to the WorkerID type used
// elsewhere in this class is not visible from these declarations.
147 void start_job_on_workers(
const std::vector<i32>& worker_ids);
149 void stop_job_on_worker(i32 node_id);
151 void remove_worker(i32 node_id);
// Takes an i64 job id; presumably records it in a blacklist set -- confirm.
153 void blacklist_job(i64 job_id);
// Begins shutdown (presumably via trigger_shutdown_ / shutdown_alarm_ --
// confirm in the definition).
155 void start_shutdown();
// Writes profiling output for a bulk job given its id and its start/end
// timepoints. NOTE(review): bulk_job_id is a plain `int` here while the
// neighbouring declarations use i32/i64 for ids.
157 void write_profiler(
int bulk_job_id, timepoint_t job_start, timepoint_t job_end);
// Port, stored as a string; const, so it can only be set at construction.
160 const std::string port_;
// Thread pool owned by this object.
162 std::unique_ptr<ThreadPool> pool_;
// Worker-pinger state: the thread, an atomic flag, and a condition
// variable/mutex pair (presumably used to wake the pinger early).
// NOTE(review): pinger_active_ has no in-class initializer; it must be set by
// the constructor, which is not visible here.
163 std::thread pinger_thread_;
164 std::atomic<bool> pinger_active_;
165 std::condition_variable pinger_wake_cv_;
166 std::mutex pinger_wake_mutex_;
// Atomically stored clock duration; per the name, the time of the most
// recent watchdog poke. NOTE(review): no in-class initializer -- confirm it
// is stored before it is first loaded.
168 std::atomic<std::chrono::high_resolution_clock::duration> last_watchdog_poke_;
169 Flag trigger_shutdown_;
// NOTE(review): raw pointer, null by default; who allocates and frees the
// alarm is not visible from this declaration.
170 grpc::Alarm* shutdown_alarm_ =
nullptr;
// Raw pointer to the storage backend; ownership not visible here.
171 storehouse::StorageBackend* storage_;
173 std::unique_ptr<TableMetaCache> table_metas_;
// Registration data held by the master (presumably replayed to workers that
// join later -- confirm).
174 std::vector<std::string> so_paths_;
175 std::vector<proto::OpRegistration> op_registrations_;
176 std::vector<proto::PythonKernelRegistration> py_kernel_registrations_;
// Atomic WorkerID counter starting at 0.
179 std::atomic<WorkerID> next_worker_id_{0};
// Constructs a worker record: takes ownership of the RPC stub (moved in) and
// copies the address. Only id, stub and address are initialized here.
188 WorkerState(WorkerID _id, std::unique_ptr<proto::Worker::Stub> _stub,
189 const std::string& _address)
190 : id(_id), stub(std::move(_stub)), address(_address) {}
// NOTE(review): `state` is not in the constructor's initializer list and has
// no default member initializer. Before C++20 a default-constructed
// std::atomic holds an indeterminate value, so callers must store a State
// before any load -- confirm every construction site does so.
195 std::atomic<State> state;
// Stub and address are const: fixed for the lifetime of the record.
197 const std::unique_ptr<proto::Worker::Stub> stub;
199 const std::string address;
// Atomic counter starting at 0; per the name, the number of failed pings.
201 std::atomic<i64> failed_pings{0};
// Worker records keyed by WorkerID, held through shared_ptr.
205 std::map<WorkerID, std::shared_ptr<WorkerState>> workers_;
// Mutex / condition variable pair, presumably guarding the active-bulk-job
// fields below -- confirm which fields each mutex protects.
208 std::mutex active_mutex_;
209 std::condition_variable active_cv_;
// Defaults: no active bulk job, id 0.
210 bool active_bulk_job_ =
false;
211 i32 active_bulk_job_id_ = 0;
213 proto::BulkJobParameters job_params_;
// Mutex / condition variable pair accompanying the atomic finished_ flag,
// which starts out true.
216 std::mutex finished_mutex_;
217 std::condition_variable finished_cv_;
218 std::atomic<bool> finished_{
true};
220 std::thread job_processor_thread_;
// What work_mutex_ protects is not visible from this declaration.
222 std::mutex work_mutex_;
224 enum struct BulkJobState {
// Parameters this bulk job was created with.
230 proto::BulkJobParameters job_params;
// i64 key -> list of i64 values; per the name, job id -> table ids.
238 std::map<i64, std::vector<i64>> job_to_table_ids;
// One map / one value per vector index; per the names, indexed by job.
240 std::vector<std::map<i64, i64>> slice_input_rows_per_job;
242 std::vector<i64> total_output_rows_per_job;
// Triple-nested i64 lists; presumably job -> task -> rows (TODO confirm the
// meaning of the innermost level).
257 std::vector<std::vector<std::vector<i64>>> job_tasks;
// Pairs of i64; presumably (job, task) -- confirm element order. The set
// holds active pairs, the deque pairs still waiting to be assigned.
260 std::set<std::tuple<i64, i64>> active_job_tasks;
262 std::deque<std::tuple<i64, i64>> to_assign_job_tasks;
// Atomic counter starting at 0.
264 std::atomic<i64> total_tasks_used{0};
268 std::vector<i64> tasks_used_per_job;
// i64 key -> set of i64 pairs; per the name, worker id -> its (job, task)
// pairs.
278 std::map<i64, std::set<std::tuple<i64, i64>>> worker_job_tasks;
// i64 triple -> double; presumably (worker, job, task) -> start time, units
// not visible here.
280 std::map<std::tuple<i64, i64, i64>,
double> worker_job_tasks_starts;
// Nested i64 maps; presumably job -> task -> failure count.
284 std::map<i64, std::map<i64, i64>> job_tasks_num_failures;
286 std::set<i64> blacklisted_jobs;
// Start and end timepoints for this bulk job.
289 timepoint_t start_time;
290 timepoint_t end_time;
// Per-worker bookkeeping. NOTE(review): keys are i64 in worker_histories but
// i32 in unfinished_workers / unstarted_workers -- confirm these all refer to
// the same worker id space.
294 std::map<i64, WorkerHistory> worker_histories;
295 std::map<i32, bool> unfinished_workers;
296 std::vector<i32> unstarted_workers;
// Atomic counter starting at 0.
297 std::atomic<i64> num_failed_workers{0};
298 std::vector<std::vector<i32>> job_uncommitted_tables;
// Bulk job records keyed by JobID, held through shared_ptr.
303 std::map<JobID, std::shared_ptr<BulkJob>> bulk_jobs_state_;
// gRPC server-side objects: completion queue, async service, and server.
305 std::unique_ptr<grpc::ServerCompletionQueue> cq_;
306 proto::Master::AsyncService service_;
307 std::unique_ptr<grpc::Server> server_;
// Call-object pointer -> timepoint; per the name, when each tag started
// (presumably used with the watchdog/profiling -- confirm).
310 std::unordered_map<BaseCall<MasterServerImpl>*, timepoint_t> tag_start_times_;
Definition: dag_analysis.h:43
Definition: profiler.h:40
Definition: database.cpp:36