Scanner C++ API
worker.h
1 /* Copyright 2016 Carnegie Mellon University
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include "scanner/engine/metadata.h"
19 #include "scanner/engine/rpc.grpc.pb.h"
20 #include "scanner/engine/runtime.h"
21 
22 #include <grpc/grpc_posix.h>
23 #include <grpc/support/log.h>
24 #include <atomic>
25 #include <thread>
26 
27 namespace scanner {
28 namespace internal {
29 
30 class WorkerImpl final : public proto::Worker::Service {
31  public:
32  WorkerImpl(DatabaseParameters& db_params, std::string master_address,
33  std::string worker_port);
34 
35  ~WorkerImpl();
36 
37  grpc::Status NewJob(grpc::ServerContext* context,
38  const proto::BulkJobParameters* job_params,
39  proto::Result* job_result);
40 
41  grpc::Status Shutdown(grpc::ServerContext* context, const proto::Empty* empty,
42  Result* result);
43 
44  grpc::Status Ping(grpc::ServerContext* context, const proto::Empty* empty,
45  proto::PingReply* reply);
46 
47  void start_watchdog(grpc::Server* server, bool enable_timeout,
48  i32 timeout_ms = 300000);
49 
50  Result register_with_master();
51 
52  private:
53  void try_unregister();
54 
55  void load_op(const proto::OpPath* op_path);
56 
57  void register_op(const proto::OpRegistration* op_registration);
58 
59  void register_python_kernel(const proto::PythonKernelRegistration* python_kernel);
60 
61  void start_job_processor();
62 
63  void stop_job_processor();
64 
65  bool process_job(const proto::BulkJobParameters* job_params,
66  proto::Result* job_result);
67 
68  enum State {
69  INITIALIZING,
70  IDLE,
71  RUNNING_JOB,
72  SHUTTING_DOWN,
73  };
74 
75  Condition<State> state_;
76  std::atomic_flag unregistered_ = ATOMIC_FLAG_INIT;
77  std::set<std::string> so_paths_;
78 
79  std::thread watchdog_thread_;
80  std::atomic<bool> watchdog_awake_;
81  std::unique_ptr<proto::Master::Stub> master_;
82  storehouse::StorageConfig* storage_config_;
83  DatabaseParameters db_params_;
84  Flag trigger_shutdown_;
85  std::string master_address_;
86  std::string worker_port_;
87  i32 node_id_ = -1;
88  storehouse::StorageBackend* storage_;
89  bool memory_pool_initialized_ = false;
90  MemoryPoolConfig cached_memory_pool_config_;
91 
92  // True if the worker is executing a job
93  std::mutex active_mutex_;
94  std::condition_variable active_cv_;
95  bool active_bulk_job_ = false;
96  i32 active_bulk_job_id_ = -1;
97  proto::BulkJobParameters job_params_;
98 
99  // True if all work for job is done
100  std::mutex finished_mutex_;
101  std::condition_variable finished_cv_;
102  std::atomic<bool> finished_{true};
103  Result job_result_;
104 
105 
106  std::thread job_processor_thread_;
107  // Manages modification of all of the below structures
108  std::mutex work_mutex_;
109 };
110 }
111 }
Definition: util.h:181
Definition: worker.h:30
Definition: database.cpp:36