Scanner C++ API
sink.h
1 /* Copyright 2017 Carnegie Mellon University
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #pragma once
17 
18 #include "scanner/api/kernel.h"
19 #include "scanner/util/common.h"
20 #include "scanner/util/profiler.h"
21 #include "scanner/util/storehouse.h"
22 
23 #include <vector>
24 
25 namespace scanner {
26 
28 struct SinkConfig {
29  std::vector<std::string> input_columns;
30  std::vector<proto::ColumnType> input_column_types;
31  std::vector<u8> args;
32  i32 node_id;
33  storehouse::StorageConfig* storage_config;
34 };
35 
42 class Sink {
43  public:
44  Sink(const SinkConfig& config) {}
45 
46  virtual ~Sink(){};
47 
53  virtual void validate(proto::Result* result) { result->set_success(true); }
54 
62  virtual void new_stream(const std::vector<u8>& args) {};
63 
71  virtual void write(const BatchedElements& input_columns) = 0;
72 
77  virtual void finished() {};
78 
82  virtual void set_profiler(Profiler* profiler) { profiler_ = profiler; }
83 
88  Profiler* profiler_ = nullptr;
89 };
90 
93 namespace internal {
94 
95 class SinkBuilder;
96 
97 using SinkConstructor =
98  std::function<Sink*(const SinkConfig& config)>;
99 
101  public:
102  SinkRegistration(const SinkBuilder& builder);
103 };
104 
105 class SinkBuilder {
106  public:
107  friend class SinkRegistration;
108 
109  SinkBuilder(const std::string& name, SinkConstructor constructor)
110  : name_(name),
111  constructor_(constructor),
112  variadic_inputs_(false),
113  per_element_output_(false),
114  entire_stream_output_(false) {}
115 
116  SinkBuilder& variadic_inputs() {
117  if (input_columns_.size() > 0) {
118  LOG(FATAL) << "Sink " << name_ << " cannot have both fixed and variadic "
119  << "inputs";
120  }
121  variadic_inputs_ = true;
122  return *this;
123  }
124 
125  SinkBuilder& input(const std::string& name,
126  ColumnType type = ColumnType::Bytes) {
127  if (variadic_inputs_) {
128  LOG(FATAL) << "Sink " << name_ << " cannot have both fixed and variadic "
129  << "inputs";
130  }
131  input_columns_.push_back(std::make_tuple(name, type));
132  return *this;
133  }
134 
135  SinkBuilder& frame_input(const std::string& name) {
136  return input(name, ColumnType::Video);
137  }
138 
139  SinkBuilder& per_element_output() {
140  if (entire_stream_output_) {
141  LOG(FATAL) << "Sink " << name_
142  << " cannot specify both per element and entire stream output";
143  }
144  per_element_output_ = true;
145  return *this;
146  }
147 
148  SinkBuilder& entire_stream_output() {
149  LOG(FATAL) << "Entire stream output is not implemented yet.";
150 
151  if (per_element_output_) {
152  LOG(FATAL) << "Sink " << name_
153  << " cannot specify both per element and entire stream output";
154  }
155  entire_stream_output_ = true;
156  return *this;
157  }
158 
159  SinkBuilder& protobuf_name(const std::string& name) {
160  protobuf_name_ = name;
161  return *this;
162  }
163 
164  SinkBuilder& stream_protobuf_name(const std::string& name) {
165  stream_protobuf_name_ = name;
166  return *this;
167  }
168 
169  private:
170  std::string name_;
171  SinkConstructor constructor_;
172  bool variadic_inputs_;
173  std::vector<std::tuple<std::string, ColumnType>> input_columns_;
174  bool per_element_output_;
175  bool entire_stream_output_;
176  std::string protobuf_name_;
177  std::string stream_protobuf_name_;
178 };
179 }
180 
181 #define REGISTER_SINK(name__, sink__) \
182  REGISTER_SINK_HELPER(__COUNTER__, name__, sink__)
183 
184 #define REGISTER_SINK_HELPER(uid__, name__, sink__) \
185  REGISTER_SINK_UID(uid__, name__, sink__)
186 
187 #define REGISTER_SINK_UID(uid__, name__, sink__) \
188  static ::scanner::internal::SinkRegistration sink_registration_##uid__ \
189  __attribute__((unused)) = ::scanner::internal::SinkBuilder( \
190  #name__, [](const ::scanner::SinkConfig& config) { \
191  return new sink__(config); \
192  })
193 }
virtual void finished()
When this function returns, the data for all previous &#39;write&#39; calls MUST BE durably written...
Definition: sink.h:77
i32 node_id
Byte-string of proto args if given.
Definition: sink.h:32
virtual void set_profiler(Profiler *profiler)
For internal use.
Definition: sink.h:82
virtual void new_stream(const std::vector< u8 > &args)
Called when the Sink is about to process a new stream.
Definition: sink.h:62
Interface for reading data in a computation graph.
Definition: sink.h:42
Definition: profiler.h:40
Definition: sink.h:105
Definition: database.cpp:36
virtual void validate(proto::Result *result)
Checks if Sink arguments are valid.
Definition: sink.h:53
Parameters provided at instantiation of Sink node.
Definition: sink.h:28