Source code for scannerpy.source

import grpc
import copy

from scannerpy.common import *
from scannerpy.op import OpColumn, collect_per_stream_args, check_modules
from scannerpy.protobufs import python_to_proto, protobufs


[docs]class Source: def __init__(self, sc, name, enumerator_args, source_args={}): self._sc = sc self._name = name self._args = source_args self._job_args = enumerator_args source_info = self._sc._get_source_info(self._name) cols = source_info.output_columns outputs = [OpColumn(self._sc, self, c.name, c.type) for c in cols] self._outputs = outputs self._inputs = [OpColumn(self._sc, None, c.name, c.type) for c in cols] # For the ColumnSource, we insert the storage config to allow the source # to read from the database if name == 'FrameColumn' or name == 'Column': sc = self._sc.config.config['storage'] def check_and_add(key): if key in sc: self._args[key] = sc[key] self._args['storage_type'] = sc['type'] check_and_add('bucket') check_and_add('region') check_and_add('endpoint') if 'load_sparsity_threshold' not in self._args: self._args['load_sparsity_threshold'] = 8
[docs] def outputs(self): if len(self._outputs) == 1: return self._outputs[0] else: return tuple(self._outputs)
[docs] def to_proto(self, indices): e = protobufs.Op() e.name = self._name e.is_source = True inp = e.inputs.add() inp.column = self._inputs[0]._col inp.op_index = -1 if isinstance(self._args, dict): # To convert an arguments dict, we search for a protobuf with the # name {Name}SourceArgs (e.g. ColumnSourceArgs) and the name # {Name}EnumeratorArgs (e.g. ColumnEnumeratorArgs) in the # args.proto module, and fill that in with keys from the args dict. if len(self._args) > 0: source_info = self._sc._get_source_info(self._name) if len(source_info.protobuf_name) > 0: proto_name = source_info.protobuf_name e.kernel_args = python_to_proto(proto_name, self._args) else: e.kernel_args = self._args else: # If arguments are a protobuf object, serialize it directly e.kernel_args = self._args.SerializeToString() return e
[docs]class SourceGenerator: """ Creates Source instances to define a computation. When a particular Source is requested from the generator, e.g. `sc.source.Sequence`, the generator does a dynamic lookup for the Source in the servers registry. """ def __init__(self, sc): self._sc = sc def __getattr__(self, name): check_modules(self._sc) # Use Sequence as alias of Column if name == 'Sequence' or name == 'FrameSequence': name = name.replace('Sequence', 'Column') # This will raise an exception if the source does not exist. source_info = self._sc._get_source_info(name) enumerator_info = self._sc._get_enumerator_info(name) def make_source(*args, **kwargs): enumerator_args = collect_per_stream_args(name, enumerator_info.protobuf_name, kwargs) source = Source(self._sc, name, enumerator_args, kwargs) return source.outputs() return make_source