Scanner C++ API
evaluate_worker.h
1 /* Copyright 2016 Carnegie Mellon University
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include "scanner/engine/kernel_factory.h"
19 #include "scanner/engine/runtime.h"
20 #include "scanner/engine/sampler.h"
21 #include "scanner/util/common.h"
22 #include "scanner/util/queue.h"
23 #include "scanner/video/decoder_automata.h"
24 #include "scanner/video/video_encoder.h"
25 
26 #ifdef HAVE_HWANG
27 #include "hwang/decoder_automata.h"
28 #endif
29 
30 namespace scanner {
31 namespace internal {
32 
33 void move_if_different_address_space(Profiler& profiler,
34  DeviceHandle current_handle,
35  DeviceHandle target_handle,
36  BatchedElements& columns);
37 
41  // Uniform arguments
42  i32 node_id;
43  i32 num_cpus;
44  i32 decoder_cpus;
45  i32 work_packet_size;
46 
47  // Per worker arguments
48  i32 worker_id;
49  DeviceHandle device_handle;
50  Profiler& profiler;
51  proto::Result& result;
52 };
53 
55  public:
57 
59 
60  void feed(EvalWorkEntry& entry, bool is_first_in_task);
61 
62  bool yield(i32 item_size, EvalWorkEntry& output);
63 
64  private:
65  const i32 node_id_;
66  const i32 worker_id_;
67  const DeviceHandle device_handle_;
68  const i32 num_cpus_;
69  const i32 decoder_cpus_;
70 
71  Profiler& profiler_;
72  Result& result_;
73 
74  i32 last_job_idx_ = -1;
75 
76  DeviceHandle decoder_output_handle_;
77  std::vector<std::unique_ptr<DecoderAutomata>> decoders_;
78 #ifdef HAVE_HWANG
79  std::vector<std::unique_ptr<hwang::DecoderAutomata>> inplace_decoders_;
80 #endif
81 
82  // Continuation state
83  bool first_item_;
84  bool needs_configure_;
85  bool needs_reset_;
86  EvalWorkEntry entry_;
87  i64 current_row_;
88  i64 total_rows_;
89 
90  std::vector<std::vector<proto::DecodeArgs>> decode_args_;
91 };
92 
93 struct OpArgGroup {
94  std::vector<std::string> op_names;
95  std::vector<bool> is_source;
96  std::vector<bool> is_sink;
98  // Op -> Job -> slice
99  std::map<i64, std::vector<std::vector<proto::SamplingArgs>>> sampling_args;
101  // Op -> Job -> slice
102  std::map<i64, std::vector<i64>> slice_input_rows;
103  std::map<i64, std::vector<std::vector<i64>>> slice_output_rows;
105  // Op -> Job -> slice
106  std::map<i64, std::vector<std::vector<i64>>> unslice_input_rows;
108  std::vector<std::tuple<KernelFactory*, KernelConfig>> kernel_factories;
109  // Number of rows in the input domain for this op
110  // Op -> Job -> slice -> rows
111  std::map<i64, std::vector<std::vector<i64>>> op_input_domain_size;
112  // Op -> Job -> slice -> args
113  std::map<i64, std::vector<std::vector<std::vector<u8>>>> op_args;
114  std::vector<std::vector<std::tuple<i32, std::string>>> live_columns;
115  // Discarded after kernel use
116  std::vector<std::vector<i32>> dead_columns;
117  // Discarded immediately after kernel execute
118  std::vector<std::vector<i32>> unused_outputs;
119  // Index in columns for inputs
120  std::vector<std::vector<i32>> column_mapping;
121  // Stencil needed by kernels
122  // Op -> elemented need in stencil (e.g. [-1, 0, 1])
123  std::vector<std::vector<i32>> kernel_stencils;
124  // Batch size needed by kernels
125  std::vector<i32> kernel_batch_sizes;
126 };
127 
129  // Uniform arguments
130  i32 node_id;
131  std::mutex& startup_lock;
132  std::condition_variable& startup_cv;
133  i32& startup_count;
134  int& resources_fetched_count;
135  std::mutex& resources_fetched_lock;
136  std::condition_variable& resources_fetched_cv;
137  int num_kernel_groups;
138 
139  // Per worker arguments
140  i32 ki;
141  i32 kg;
142  OpArgGroup arg_group;
143  proto::BulkJobParameters::BoundaryCondition boundary_condition;
144 
145  Profiler& profiler;
146  proto::Result& result;
147 };
148 
149 
151  public:
152  EvaluateWorker(const EvaluateWorkerArgs& args);
153 
154  ~EvaluateWorker();
155 
156  void new_task(i64 job_idx, i64 task_idx,
157  const std::vector<TaskStream>& task_streams);
158 
159  void feed(EvalWorkEntry& entry);
160 
161  bool yield(i32 item_size, EvalWorkEntry& output);
162 
163  private:
164  void clear_stencil_cache();
165 
166  const i32 node_id_;
167  const i32 worker_id_;
168 
169  Profiler& profiler_;
170 
171  OpArgGroup arg_group_;
172  std::vector<DeviceHandle> kernel_devices_;
173  std::vector<std::vector<DeviceHandle>> kernel_input_devices_;
174  std::vector<std::vector<DeviceHandle>> kernel_output_devices_;
175  std::vector<i32> kernel_num_outputs_;
176  std::vector<std::unique_ptr<BaseKernel>> kernels_;
177 
178  // Used for computing complement of column mapping
179  std::vector<std::set<i32>> column_mapping_set_;
180 
182  i64 job_idx_;
183  i64 task_idx_;
184  i64 slice_group_;
185  std::map<i64, std::unique_ptr<Partitioner>> partitioners_;
186  std::map<i64, std::unique_ptr<DomainSampler>> domain_samplers_;
187 
188  // Inputs
189  std::vector<std::set<i64>> valid_input_rows_set_;
190  std::vector<std::vector<i64>> valid_input_rows_;
191  // Tracks which input we should expect next for which column
192  std::vector<std::vector<i64>> current_valid_input_idx_;
193 
194  // List of row ids of the outputs to compute
195  std::vector<std::set<i64>> compute_rows_set_;
196  std::vector<std::vector<i64>> compute_rows_;
197  // Tracks which index in compute_rows_ we should expect next
198  std::vector<i64> current_compute_idx_;
199 
200  // Outputs to keep
201  std::vector<std::set<i64>> valid_output_rows_set_;
202  std::vector<std::vector<i64>> valid_output_rows_;
203  // Tracks which output we should expect next
204  std::vector<i64> current_valid_output_idx_;
205 
206  // Per kernel -> per input column -> deque of elements
207  std::vector<i64> current_element_cache_input_idx_;
208  std::vector<std::vector<std::deque<Element>>> element_cache_;
209  // Per kernel -> per input column -> device handle
210  std::vector<std::vector<DeviceHandle>> element_cache_devices_;
211  // Per kernel -> per input column -> deque of row ids
212  std::vector<std::vector<std::deque<i64>>> element_cache_row_ids_;
213 
214  // Continuation state
215  EvalWorkEntry entry_;
216  i32 current_input_;
217  i32 total_inputs_;
218 
219  std::vector<DeviceHandle> final_output_handles_;
220  std::vector<std::deque<Element>> final_output_columns_;
221  std::vector<std::vector<i64>> final_row_ids_;
222 };
223 
225  std::string codec;
226  std::map<std::string, std::string> options;
227 };
228 
230  // Uniform arguments
231  i32 node_id;
232 
233  // Per worker arguments
234  i32 id;
235  Profiler& profiler;
236  // Index in columns for inputs
237  std::vector<i32> column_mapping;
238  std::vector<Column> columns;
239  std::vector<ColumnCompressionOptions> column_compression;
240 };
241 
243  public:
245 
246  void feed(EvalWorkEntry& entry);
247 
248  bool yield(EvalWorkEntry& output);
249 
250  private:
251  Profiler& profiler_;
252  std::vector<i32> column_mapping_;
253  std::vector<Column> columns_;
254  std::set<i32> column_set_;
255 
256  DeviceHandle encoder_handle_;
257  VideoEncoderType encoder_type_;
258  std::vector<std::unique_ptr<VideoEncoder>> encoders_;
259  std::vector<bool> frame_size_initialized_;
260  std::vector<bool> encoder_configured_;
261  std::vector<EncodeOptions> encode_options_;
262  std::vector<bool> compression_enabled_;
263 
264  // Generator state
265  EvalWorkEntry buffered_entry_;
266  i64 current_offset_;
267  std::deque<EvalWorkEntry> buffered_entries_;
268 };
269 }
270 }
Definition: evaluate_worker.h:242
std::map< i64, std::vector< std::vector< proto::SamplingArgs > > > sampling_args
For sampling ops.
Definition: evaluate_worker.h:99
Definition: evaluate_worker.h:224
Definition: evaluate_worker.h:54
std::map< i64, std::vector< std::vector< i64 > > > unslice_input_rows
For unslice ops.
Definition: evaluate_worker.h:106
Definition: profiler.h:40
Definition: evaluate_worker.h:150
std::vector< std::tuple< KernelFactory *, KernelConfig > > kernel_factories
For regular kernels.
Definition: evaluate_worker.h:108
Definition: evaluate_worker.h:128
Definition: common.h:53
Definition: database.cpp:36
Worker thread arguments.
Definition: evaluate_worker.h:40
Definition: evaluate_worker.h:93
Definition: evaluate_worker.h:229
Definition: runtime.h:44
std::map< i64, std::vector< i64 > > slice_input_rows
For slice ops.
Definition: evaluate_worker.h:102