18 #include <grpc/support/log.h>    19 #include <grpc++/alarm.h>    20 #include "scanner/engine/rpc.grpc.pb.h"    21 #include "scanner/engine/runtime.h"    22 #include "scanner/engine/sampler.h"    23 #include "scanner/engine/dag_analysis.h"    24 #include "scanner/util/util.h"    25 #include "scanner/util/grpc.h"    26 #include "scanner/util/thread_pool.h"    27 #include "scanner/util/profiler.h"    38 template <
class Request, 
class Reply>
    39 using MCall = Call<MasterServerImpl, Request, Reply>;
    49   void handle_rpcs(i32 watchdog_timeout_ms = 50000);
    57   void GetTablesHandler(
    60   void DeleteTablesHandler(
    66   void GetVideoMetadataHandler(
MCall<proto::GetVideoMetadataParams,
    67                                      proto::GetVideoMetadataResult>* call);
    69   void IngestVideosHandler(
    73   void RegisterWorkerHandler(
    76   void UnregisterWorkerHandler(
    79   void ActiveWorkersHandler(
    86   void GetSourceInfoHandler(
    89   void GetEnumeratorInfoHandler(
    98   void RegisterPythonKernelHandler(
   105   void ListRegisteredPythonKernelsHandler(
   108   void NextWorkHandler(
   111   void FinishedWorkHandler(
   120   void GetJobStatusHandler(
   131   static std::string get_worker_address_from_grpc_context(
   132       grpc::ServerContext* context);
   134   void recover_and_init_database();
   136   void start_job_processor();
   138   void stop_job_processor();
   140   bool process_job(
const proto::BulkJobParameters* job_params,
   141                    proto::Result* job_result);
   143   void start_worker_pinger();
   145   void stop_worker_pinger();
   147   void start_job_on_workers(
const std::vector<i32>& worker_ids);
   149   void stop_job_on_worker(i32 node_id);
   151   void remove_worker(i32 node_id);
   153   void blacklist_job(i64 job_id);
   155   void start_shutdown();
   157   void write_profiler(
int bulk_job_id, timepoint_t job_start, timepoint_t job_end);
   160   const std::string port_;
   162   std::unique_ptr<ThreadPool> pool_;
   163   std::thread pinger_thread_;
   164   std::atomic<bool> pinger_active_;
   165   std::condition_variable pinger_wake_cv_;
   166   std::mutex pinger_wake_mutex_;
   168   std::atomic<std::chrono::high_resolution_clock::duration> last_watchdog_poke_;
   169   Flag trigger_shutdown_;
   170   grpc::Alarm* shutdown_alarm_ = 
nullptr;
   171   storehouse::StorageBackend* storage_;
   173   std::unique_ptr<TableMetaCache> table_metas_;
   174   std::vector<std::string> so_paths_;
   175   std::vector<proto::OpRegistration> op_registrations_;
   176   std::vector<proto::PythonKernelRegistration> py_kernel_registrations_;
   179   std::atomic<WorkerID> next_worker_id_{0};
   188     WorkerState(WorkerID _id, std::unique_ptr<proto::Worker::Stub> _stub,
   189                 const std::string& _address)
   190         : id(_id), stub(std::move(_stub)), address(_address) {}
   195     std::atomic<State> state;
   197     const std::unique_ptr<proto::Worker::Stub> stub;
   199     const std::string address;
   201     std::atomic<i64> failed_pings{0};
   205   std::map<WorkerID, std::shared_ptr<WorkerState>> workers_;
   208   std::mutex active_mutex_;
   209   std::condition_variable active_cv_;
   210   bool active_bulk_job_ = 
false;
   211   i32 active_bulk_job_id_ = 0;
   213   proto::BulkJobParameters job_params_;
   216   std::mutex finished_mutex_;
   217   std::condition_variable finished_cv_;
   218   std::atomic<bool> finished_{
true};
   220   std::thread job_processor_thread_;
   222   std::mutex work_mutex_;
   224   enum struct BulkJobState {
   230     proto::BulkJobParameters job_params;
   238     std::map<i64, std::vector<i64>> job_to_table_ids;
   240     std::vector<std::map<i64, i64>> slice_input_rows_per_job;
   242     std::vector<i64> total_output_rows_per_job;
   257     std::vector<std::vector<std::vector<i64>>> job_tasks;
   260     std::set<std::tuple<i64, i64>> active_job_tasks;
   262     std::deque<std::tuple<i64, i64>> to_assign_job_tasks;
   264     std::atomic<i64> total_tasks_used{0};
   268     std::vector<i64> tasks_used_per_job;
   278     std::map<i64, std::set<std::tuple<i64, i64>>> worker_job_tasks;
   280     std::map<std::tuple<i64, i64, i64>, 
double> worker_job_tasks_starts;
   284     std::map<i64, std::map<i64, i64>> job_tasks_num_failures;
   286     std::set<i64> blacklisted_jobs;
   289       timepoint_t start_time;
   290       timepoint_t end_time;
   294     std::map<i64, WorkerHistory> worker_histories;
   295     std::map<i32, bool> unfinished_workers;
   296     std::vector<i32> unstarted_workers;
   297     std::atomic<i64> num_failed_workers{0};
   298     std::vector<std::vector<i32>> job_uncommitted_tables;
   303   std::map<JobID, std::shared_ptr<BulkJob>> bulk_jobs_state_;
   305   std::unique_ptr<grpc::ServerCompletionQueue> cq_;
   306   proto::Master::AsyncService service_;
   307   std::unique_ptr<grpc::Server> server_;
   310   std::unordered_map<BaseCall<MasterServerImpl>*, timepoint_t> tag_start_times_;
 Definition: dag_analysis.h:43
Definition: profiler.h:40
Definition: database.cpp:36