Scanner C++ API
dag_analysis.h
/* Copyright 2016 Carnegie Mellon University
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "scanner/engine/metadata.h"
#include "scanner/engine/table_meta_cache.h"
#include "scanner/engine/runtime.h"

#include <deque>

namespace scanner {
namespace internal {

const std::string INPUT_OP_NAME = "Input";
const std::string OUTPUT_OP_NAME = "OutputTable";
const std::string SAMPLE_OP_NAME = "Sample";
const std::string SPACE_OP_NAME = "Space";
const std::string SLICE_OP_NAME = "Slice";
const std::string UNSLICE_OP_NAME = "Unslice";

const std::vector<std::string> BUILTIN_OP_NAMES = {
    SAMPLE_OP_NAME,
    SPACE_OP_NAME,
    SLICE_OP_NAME,
    UNSLICE_OP_NAME,
};

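// Checks whether `name` refers to one of the built-in Ops declared above
// (presumably by membership in BUILTIN_OP_NAMES, so the Input and OutputTable
// Ops do not count as built-in).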
bool is_builtin_op(const std::string& name);

struct DAGAnalysisInfo {
  bool has_table_output{false};

  std::vector<i32> op_slice_level;
  std::map<i64, i64> source_ops;
  std::map<i64, i64> sink_ops;
  std::map<i64, i64> slice_ops;
  std::map<i64, i64> unslice_ops;
  std::map<i64, i64> sampling_ops;
  std::map<i64, std::vector<i64>> op_children;
  std::set<i64> column_sink_ops;

  // Job -> Op -> table name
  // We need to remember the column sink table names because
  // they are used by the master to pre-create the tables.
  std::vector<std::map<i64, std::string>> column_sink_table_names;

  // Input rows to slice Ops per Job
  std::vector<std::map<i64, i64>> slice_input_rows;
  // Job -> Op -> Slice
  std::vector<std::map<i64, std::vector<i64>>> slice_output_rows;
  // Input rows to unslice Ops per Job
  // Job -> Op -> Slice
  std::vector<std::map<i64, std::vector<i64>>> unslice_input_rows;
  // Total rows in each Op's domain
  // Job -> Op -> Slice
  std::vector<std::map<i64, std::vector<i64>>> total_rows_per_op;
  // Total output rows per Job
  std::vector<i64> total_output_rows;

  std::map<i64, bool> bounded_state_ops;
  std::map<i64, bool> unbounded_state_ops;
  std::map<i64, i32> warmup_sizes;
  std::map<i64, i32> batch_sizes;
  std::map<i64, std::vector<i32>> stencils;

  // Filled in by remap_input_op_edges
  std::map<i64, i64> input_ops_to_first_op_columns;
  // Filled in by remap_sink_op_edges
  std::map<i64, i64> sink_ops_to_last_op_columns;

  // Op -> Elements
  std::vector<std::vector<std::tuple<i32, std::string>>> live_columns;
  std::vector<std::vector<i32>> dead_columns;
  std::vector<std::vector<i32>> unused_outputs;
  std::vector<std::vector<i32>> column_mapping;
};
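
// Illustration (not part of the original header): the nested containers above
// are indexed Job first, then Op, then Slice. For example, assuming `info` has
// been populated, the number of output rows produced by slice group `s` of
// slice Op `op_idx` in job `j` could be read as:
//
//   i64 rows = info.slice_output_rows[j].at(op_idx)[s];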

Result validate_jobs_and_ops(
    DatabaseMetadata& meta, TableMetaCache& table_metas,
    const std::vector<proto::Job>& jobs,
    const std::vector<proto::Op>& ops,
    DAGAnalysisInfo& info);

Result determine_input_rows_to_slices(
    DatabaseMetadata& meta, TableMetaCache& table_metas,
    const std::vector<proto::Job>& jobs,
    const std::vector<proto::Op>& ops,
    DAGAnalysisInfo& info,
    storehouse::StorageConfig* storage_config);

Result derive_slice_final_output_rows(
    const proto::Job& job,
    const std::vector<proto::Op>& ops,
    i64 slice_op_idx,
    i64 slice_input_rows,
    DAGAnalysisInfo& info,
    std::vector<i64>& slice_output_partition);

void populate_analysis_info(
    const std::vector<proto::Job>& jobs, const std::vector<proto::Op>& ops,
    DAGAnalysisInfo& info);

// Change all edges from input Ops to instead come from the first Op.
// We currently only implement IO at the start and end of a pipeline.
void remap_input_op_edges(std::vector<proto::Op>& ops,
                          DAGAnalysisInfo& info);

// Change all edges to sink Ops to instead go to the last sink Op.
// We currently only implement IO at the start and end of a pipeline.
void remap_sink_op_edges(std::vector<proto::Op>& ops, DAGAnalysisInfo& info);
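
// Illustration (assumed semantics, not stated in this header): if a job has
// multiple input Ops, e.g.
//   Input(0), Input(1) -> Resize(2) -> OutputTable(3)
// remap_input_op_edges rewrites Resize's edge from Input(1) so that it instead
// references a column of the first Op, recording which column each input Op
// became in info.input_ops_to_first_op_columns; remap_sink_op_edges performs
// the symmetric rewrite for edges into sink Ops, filling
// info.sink_ops_to_last_op_columns.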

void perform_liveness_analysis(const std::vector<proto::Op>& ops,
                               DAGAnalysisInfo& info);

Result derive_stencil_requirements(
    const DatabaseMetadata& meta, TableMetaCache& table_meta,
    const proto::Job& job, const std::vector<proto::Op>& ops,
    const DAGAnalysisInfo& analysis_results,
    proto::BulkJobParameters::BoundaryCondition boundary_condition,
    i64 job_idx, i64 task_idx,
    const std::vector<i64>& output_rows, LoadWorkEntry& output_entry,
    std::deque<TaskStream>& task_streams,
    storehouse::StorageConfig* storage_config);
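
// Illustration (assumed semantics, not stated in this header): a stencil such
// as {-1, 0, 1} in DAGAnalysisInfo::stencils means that an Op reads the
// previous, current, and next rows of its input. derive_stencil_requirements
// walks the requested output_rows backwards through these stencils (and
// through warmup for bounded-state Ops) to determine which input rows each
// task must load, writing the result into output_entry and task_streams.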

// Result derive_input_rows_from_output_rows(
//     const std::vector<proto::Job>& jobs,
//     const std::vector<proto::Op>& ops,
//     const std::vector<std::vector<i64>>& output_rows,
//     DAGAnalysisInfo& info,
//     std::vector<std::vector<i64>>& input_rows);
}
}
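
A minimal sketch (not part of the header) of how these entry points might be
chained, assuming the declarations above, an include path mirroring the other
engine headers, and a protobuf-style success() accessor on Result; the function
name and call order here are illustrative assumptions, not Scanner's actual
master logic:

#include "scanner/engine/dag_analysis.h"  // Assumed path, matching the includes above.

namespace scanner {
namespace internal {

// Hypothetical driver; names and ordering are assumptions.
Result analyze_dag(DatabaseMetadata& meta, TableMetaCache& table_metas,
                   std::vector<proto::Job>& jobs, std::vector<proto::Op>& ops,
                   storehouse::StorageConfig* storage_config,
                   DAGAnalysisInfo& info) {
  populate_analysis_info(jobs, ops, info);

  Result result = validate_jobs_and_ops(meta, table_metas, jobs, ops, info);
  if (!result.success()) {
    return result;  // Reject the bulk job if validation fails.
  }

  // Rewrite DAG edges so all IO sits at the ends of the pipeline, then run
  // liveness analysis and work out the rows flowing into each slice.
  remap_input_op_edges(ops, info);
  remap_sink_op_edges(ops, info);
  perform_liveness_analysis(ops, info);
  return determine_input_rows_to_slices(meta, table_metas, jobs, ops, info,
                                        storage_config);
}

}
}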