18 #include "scanner/api/kernel.h" 19 #include "scanner/util/common.h" 20 #include "scanner/util/profiler.h" 21 #include "scanner/util/storehouse.h" 29 std::vector<std::string> input_columns;
30 std::vector<proto::ColumnType> input_column_types;
33 storehouse::StorageConfig* storage_config;
/// Checks whether this Sink's arguments are valid and reports the outcome
/// through `result`. The default implementation accepts unconditionally by
/// marking `result` as successful; being virtual, it can be overridden by a
/// concrete sink that needs real checks.
/// @param result output proto that receives the validation status.
53 virtual void validate(proto::Result* result) { result->set_success(
true); }
/// Writes a batch of input elements to the sink. Pure virtual: every concrete
/// Sink must provide an implementation.
/// @param input_columns the batched elements to write. BatchedElements is
///        declared elsewhere; presumably it holds one entry per input
///        column -- TODO confirm against its definition.
71 virtual void write(
const BatchedElements& input_columns) = 0;
/// Factory signature used to instantiate a Sink from its SinkConfig.
/// The callable returns a raw Sink pointer.
/// NOTE(review): ownership of the returned pointer is presumably taken by
/// the caller -- confirm at the call site.
97 using SinkConstructor =
98 std::function<Sink*(const SinkConfig& config)>;
109 SinkBuilder(
const std::string& name, SinkConstructor constructor)
111 constructor_(constructor),
112 variadic_inputs_(
false),
113 per_element_output_(
false),
114 entire_stream_output_(
false) {}
117 if (input_columns_.size() > 0) {
118 LOG(FATAL) <<
"Sink " << name_ <<
" cannot have both fixed and variadic " 121 variadic_inputs_ =
true;
126 ColumnType type = ColumnType::Bytes) {
127 if (variadic_inputs_) {
128 LOG(FATAL) <<
"Sink " << name_ <<
" cannot have both fixed and variadic " 131 input_columns_.push_back(std::make_tuple(name, type));
/// Convenience wrapper around input(): declares an input column called `name`
/// with type ColumnType::Video and forwards input()'s return value, so the
/// call can be chained on the builder.
/// @param name name of the video input column.
135 SinkBuilder& frame_input(
const std::string& name) {
136 return input(name, ColumnType::Video);
140 if (entire_stream_output_) {
141 LOG(FATAL) <<
"Sink " << name_
142 <<
" cannot specify both per element and entire stream output";
144 per_element_output_ =
true;
149 LOG(FATAL) <<
"Entire stream output is not implemented yet.";
151 if (per_element_output_) {
152 LOG(FATAL) <<
"Sink " << name_
153 <<
" cannot specify both per element and entire stream output";
155 entire_stream_output_ =
true;
/// Stores `name` in protobuf_name_ for the sink being registered.
/// NOTE(review): presumably this is the name of the protobuf message that
/// carries the sink's arguments -- confirm against the registration code.
/// @param name protobuf name to record.
159 SinkBuilder& protobuf_name(
const std::string& name) {
160 protobuf_name_ = name;
/// Stores `name` in stream_protobuf_name_ for the sink being registered.
/// NOTE(review): presumably this is the name of the protobuf message that
/// carries per-stream arguments -- confirm against the registration code.
/// @param name stream protobuf name to record.
164 SinkBuilder& stream_protobuf_name(
const std::string& name) {
165 stream_protobuf_name_ = name;
// Factory callable passed to the SinkBuilder constructor.
171 SinkConstructor constructor_;
// True once variadic inputs have been requested; starts out false. Mutually
// exclusive with fixed input columns (mixing the two is a fatal error).
172 bool variadic_inputs_;
// Fixed input columns as (name, type) tuples, in declaration order.
173 std::vector<std::tuple<std::string, ColumnType>> input_columns_;
// Output-mode flags; both start out false and setting both is a fatal error.
174 bool per_element_output_;
175 bool entire_stream_output_;
// Names recorded by protobuf_name() and stream_protobuf_name() respectively.
176 std::string protobuf_name_;
177 std::string stream_protobuf_name_;
181 #define REGISTER_SINK(name__, sink__) \ 182 REGISTER_SINK_HELPER(__COUNTER__, name__, sink__) 184 #define REGISTER_SINK_HELPER(uid__, name__, sink__) \ 185 REGISTER_SINK_UID(uid__, name__, sink__) 187 #define REGISTER_SINK_UID(uid__, name__, sink__) \ 188 static ::scanner::internal::SinkRegistration sink_registration_##uid__ \ 189 __attribute__((unused)) = ::scanner::internal::SinkBuilder( \ 190 #name__, [](const ::scanner::SinkConfig& config) { \ 191 return new sink__(config); \ virtual void finished()
When this function returns, the data for all previous 'write' calls MUST BE durably written...
Definition: sink.h:77
i32 node_id
Byte-string of proto args if given.
Definition: sink.h:32
virtual void set_profiler(Profiler *profiler)
For internal use.
Definition: sink.h:82
virtual void new_stream(const std::vector< u8 > &args)
Called when the Sink is about to process a new stream.
Definition: sink.h:62
Interface for writing data in a computation graph.
Definition: sink.h:42
Definition: profiler.h:40
Definition: database.cpp:36
virtual void validate(proto::Result *result)
Checks if Sink arguments are valid.
Definition: sink.h:53
Parameters provided at instantiation of Sink node.
Definition: sink.h:28