import grpc
import copy
import pickle
import types as pytypes
import uuid
import collections
from scannerpy.common import *
from scannerpy.protobufs import python_to_proto, protobufs, analyze_proto
from scannerpy import types as scannertypes
from typing import Dict, List, Union, Tuple, Optional, Sequence
from inspect import signature
from itertools import islice
from collections import OrderedDict
from functools import wraps
class SliceList(list):
    """Marker list type holding one stream's arguments, one entry per slice.

    It adds no behavior on top of ``list``; other code only checks for it
    with ``isinstance`` to tell per-slice arguments apart from plain values.
    """
def collect_per_stream_args(name, protobuf_name, kwargs):
    """Build one stream-argument protobuf per stream from an op's kwargs.

    The field names of the protobuf type ``protobuf_name`` (looked up on
    ``protobufs``) are treated as the per-stream argument names. Every such
    name present in ``kwargs`` is popped from it, so ``kwargs`` is mutated.

    Parameters
    ----------
    name
      Name of the op, used only in error messages.

    protobuf_name
      Name of the protobuf type describing the per-stream arguments.

    kwargs
      Keyword arguments passed to the op. List values supply one entry per
      stream; non-list values are reused for every stream; ``None`` values
      are left out of the generated protobufs.

    Returns
    -------
    list
      One ``python_to_proto`` result per stream. The number of streams is the
      length of the first list-valued argument.

    Raises
    ------
    ScannerException
      If no per-stream argument was supplied, or if none of the supplied
      arguments is a list (so the number of streams cannot be determined).
    """
    stream_arg_names = list(
        analyze_proto(getattr(protobufs, protobuf_name)).keys())
    stream_args = {k: kwargs.pop(k) for k in stream_arg_names if k in kwargs}
    if not stream_args:
        # List the names the caller could have used; `stream_args` is empty
        # on this path, so it cannot be the source of the options.
        raise ScannerException(
            "Op `{}` received no per-stream arguments. Options: {}"
            .format(name, ', '.join(stream_arg_names)))

    list_lengths = [
        len(v) for v in stream_args.values() if isinstance(v, list)
    ]
    if not list_lengths:
        raise ScannerException(
            "Op `{}` requires at least one per-stream argument to be a list. "
            "Received: {}".format(name, ', '.join(stream_args)))
    N = list_lengths[0]

    job_args = [
        python_to_proto(protobuf_name, {
            k: v[i] if isinstance(v, list) else v
            for k, v in stream_args.items()
            if v is not None
        })
        for i in range(N)
    ]
    return job_args
class OpColumn:
    """A column produced by an op, together with its encoding options.

    Parameters
    ----------
    sc
      Client object the column belongs to (stored, not used here).

    op
      The op that produces this column.

    col
      Name of the column.

    typ
      Column type; compared against ``protobufs.Video`` to decide whether
      compression options apply.
    """

    def __init__(self, sc, op, col, typ):
        self._sc = sc
        self._op = op
        self._col = col
        self._type = typ
        # Only video columns carry encode options; others keep None.
        self._encode_options = None
        if self._type == protobufs.Video:
            self._encode_options = {'codec': 'default'}

    def compress(self, codec='video', **kwargs):
        """Return a copy of this column using the named compression codec.

        Parameters
        ----------
        codec
          One of ``'video'``, ``'default'`` or ``'raw'``.

        kwargs
          Forwarded to the codec-specific method (`compress_video`,
          `compress_default` or `lossless`).

        Raises
        ------
        ScannerException
          If the column is not a video column or the codec is unknown.
        """
        self._assert_is_video()
        codecs = {
            'video': self.compress_video,
            'default': self.compress_default,
            'raw': self.lossless,
        }
        if codec not in codecs:
            raise ScannerException('Compression codec {} not currently '
                                   'supported. Available codecs are: {}.'
                                   .format(codec, ' '.join(codecs)))
        # The dict values are bound methods, so `self` must not be passed
        # a second time.
        return codecs[codec](**kwargs)

    def compress_video(self, quality=-1, bitrate=-1, keyframe_distance=-1):
        """Return a copy of this column encoded with the ``h264`` codec.

        The three parameters are stored verbatim in the encode options.
        """
        self._assert_is_video()
        encode_options = {
            'codec': 'h264',
            'quality': quality,
            'bitrate': bitrate,
            'keyframe_distance': keyframe_distance
        }
        return self._new_compressed_column(encode_options)

    def lossless(self):
        """Return a copy of this column using the ``raw`` codec."""
        self._assert_is_video()
        encode_options = {'codec': 'raw'}
        return self._new_compressed_column(encode_options)

    def compress_default(self):
        """Return a copy of this column using the ``default`` codec."""
        self._assert_is_video()
        encode_options = {'codec': 'default'}
        return self._new_compressed_column(encode_options)

    def _assert_is_video(self):
        """Raise ScannerException unless this column's type is video."""
        if self._type != protobufs.Video:
            raise ScannerException(
                'Compression only supported for sequences of '
                'type "video". Sequence {} type is {}.'.format(
                    self._col, protobufs.ColumnType.Name(self._type)))

    def _new_compressed_column(self, encode_options):
        """Clone this column and attach `encode_options` to the clone."""
        new_col = OpColumn(self._sc, self._op, self._col, self._type)
        new_col._encode_options = encode_options
        return new_col
# Set of (so_path, proto_path) pairs recorded by `register_module`.
MODULE_REGISTRY = set()


def register_module(so_path: str, proto_path: str = None):
    """Record a module so it can be loaded into a client later.

    Parameters
    ----------
    so_path
      Path identifying the module's shared library.

    proto_path
      Optional path to the module's proto description.
    """
    entry = (so_path, proto_path)
    MODULE_REGISTRY.add(entry)
def check_modules(sc):
    """Load every registered module that `sc` does not list yet.

    Entries of ``MODULE_REGISTRY`` missing from ``sc._modules`` are passed,
    unpacked, to ``sc.load_op``.
    """
    missing = MODULE_REGISTRY - sc._modules
    for entry in missing:
        sc.load_op(*entry)
# Maps an op name to the registration record of a Python op.
PYTHON_OP_REGISTRY = {}


class OpGenerator:
    """
    Creates Op instances to define a computation.

    When a particular op is requested from the generator, e.g.
    `sc.ops.Histogram`, the generator does a dynamic lookup for the
    op in a C++ registry. Ops listed in `PYTHON_OP_REGISTRY` are registered
    with the client the first time they are requested.
    """

    def __init__(self, sc):
        self._sc = sc

    def __getattr__(self, name):
        """Return a factory function that builds the op called `name`.

        The returned function accepts the op's input columns (positionally
        for variadic ops, by column name otherwise) plus the optional
        keywords ``device``, ``batch``, ``bounded_state``, ``stencil``,
        ``extra`` and ``args``. Remaining keywords become the op arguments
        unless ``args`` is given. It returns the new op's output column(s).
        """
        check_modules(self._sc)

        stream_params = []
        orig_name = name
        is_python_op = orig_name in PYTHON_OP_REGISTRY
        # Check python registry for Op
        if is_python_op:
            py_op_info = PYTHON_OP_REGISTRY[name]
            stream_params = py_op_info['stream_params']
            # Python ops are registered under a name made unique by their
            # registration id.
            pseudo_name = name + ':' + py_op_info['registration_id']
            name = pseudo_name
            # If Op has not been registered yet, register it
            if name not in self._sc._python_ops:
                devices = []
                if py_op_info['device_type']:
                    devices.append(py_op_info['device_type'])
                if py_op_info['device_sets']:
                    devices.extend(d[0] for d in py_op_info['device_sets'])

                self._sc.register_op(
                    pseudo_name, py_op_info['input_columns'],
                    py_op_info['output_columns'],
                    py_op_info['variadic_inputs'], py_op_info['stencil'],
                    py_op_info['unbounded_state'],
                    py_op_info['bounded_state'], py_op_info['proto_path'])
                for device in devices:
                    self._sc.register_python_kernel(pseudo_name, device,
                                                    py_op_info['kernel'],
                                                    py_op_info['batch'])

        # This will raise an exception if the op does not exist.
        op_info = self._sc._get_op_info(name)

        # Non-Python ops take their stream parameter names from the fields
        # of their stream protobuf.
        if not is_python_op and len(op_info.stream_protobuf_name) > 0:
            stream_proto = getattr(protobufs, op_info.stream_protobuf_name)
            stream_params.extend(analyze_proto(stream_proto).keys())

        def make_op(*args, **kwargs):
            inputs = []
            if op_info.variadic_inputs:
                inputs.extend(args)
            else:
                for c in op_info.input_columns:
                    val = kwargs.pop(c.name, None)
                    if val is None:
                        raise ScannerException(
                            'Op {} required sequence {} as input'.format(
                                orig_name, c.name))
                    inputs.append(val)

            device = kwargs.pop('device', DeviceType.CPU)
            batch = kwargs.pop('batch', -1)
            bounded_state = kwargs.pop('bounded_state', -1)
            stencil = kwargs.pop('stencil', [])
            extra = kwargs.pop('extra', None)
            args = kwargs.pop('args', None)

            if len(stream_params) > 0:
                stream_args = [(k, kwargs.pop(k, None))
                               for k in stream_params if k in kwargs]
                if len(stream_args) == 0:
                    raise ScannerException(
                        "No arguments provided to op `{}` for stream parameters."
                        .format(orig_name))
                for e in stream_args:
                    if not isinstance(e[1], list):
                        raise ScannerException(
                            "The argument `{}` to op `{}` is a stream config argument and must be a list."
                            .format(e[0], orig_name))
                example_list = stream_args[0][1]
                N = len(example_list)
                if N == 0:
                    # Without this guard the element lookup below fails
                    # with an uninformative IndexError.
                    raise ScannerException(
                        "The argument `{}` to op `{}` is a stream config "
                        "argument and must be a non-empty list."
                        .format(stream_args[0][0], orig_name))
                # Plain per-stream values are wrapped so that every stream
                # entry is a SliceList (here with a single slice).
                if not isinstance(example_list[0], SliceList):
                    stream_args = [(k, [SliceList([x]) for x in arg])
                                   for (k, arg) in stream_args]
                M = len(stream_args[0][1][0])
                if is_python_op:
                    # Python kernels receive their stream arguments pickled.
                    stream_args = [
                        SliceList([
                            pickle.dumps({k: v[i][j]
                                          for k, v in stream_args})
                            for j in range(M)
                        ])
                        for i in range(N)
                    ]
                else:
                    stream_args = [
                        SliceList([
                            python_to_proto(op_info.stream_protobuf_name,
                                            {k: v[i][j]
                                             for k, v in stream_args})
                            for j in range(M)
                        ])
                        for i in range(N)
                    ]
            else:
                stream_args = None

            # `bounded_state` is passed positionally into Op's `warmup` slot.
            op = Op(self._sc, name, inputs, device, batch, bounded_state,
                    stencil, kwargs if args is None else args, extra,
                    stream_args)
            return op.outputs()

        return make_op
class Op:
    """A node of the computation graph: a named op applied to input columns.

    Parameters
    ----------
    sc
      Client object used to look up op metadata.

    name
      Registered name of the op.

    inputs
      Input columns; each is read through its ``_op``, ``_col`` and
      ``_type`` attributes.

    device
      Device the op should run on; converted with ``DeviceType.to_proto``.

    batch
      Batch size written to the op protobuf.

    warmup
      Warmup value written to the op protobuf.

    stencil
      Stencil offsets written to the op protobuf. Defaults to ``[0]``.

    args
      Kernel arguments: either a dict or an object exposing
      ``SerializeToString``. Defaults to an empty dict.

    extra
      Opaque extra data stored on the op.

    stream_args
      Per-stream arguments, stored as ``_job_args``.
    """

    def __init__(self,
                 sc,
                 name,
                 inputs,
                 device,
                 batch=-1,
                 warmup=-1,
                 stencil=None,
                 args=None,
                 extra=None,
                 stream_args=None):
        self._sc = sc
        self._name = name
        self._inputs = inputs
        self._device = device
        self._batch = batch
        self._warmup = warmup
        # Build fresh defaults per instance; mutable default arguments would
        # be shared by every Op constructed without these parameters.
        self._stencil = [0] if stencil is None else stencil
        self._args = {} if args is None else args
        self._extra = extra
        self._job_args = stream_args

        if name in ('Space', 'Sample', 'Slice', 'Unslice'):
            # These ops emit one output per input, with the same column
            # name and type.
            outputs = [OpColumn(sc, self, c._col, c._type) for c in inputs]
        else:
            cols = self._sc._get_output_columns(self._name)
            outputs = [OpColumn(self._sc, self, c.name, c.type) for c in cols]
        self._outputs = outputs

    def outputs(self):
        """Return the single output column, or a tuple of all of them."""
        if len(self._outputs) == 1:
            return self._outputs[0]
        return tuple(self._outputs)

    def to_proto(self, indices):
        """Serialize this op into a ``protobufs.Op`` message.

        Parameters
        ----------
        indices
          Mapping from an Op to its index; used to reference the ops that
          produce this op's inputs.

        Returns
        -------
        The populated ``protobufs.Op`` message.
        """
        e = protobufs.Op()
        e.name = self._name
        e.device_type = DeviceType.to_proto(protobufs, self._device)
        e.stencil.extend(self._stencil)
        e.batch = self._batch
        e.warmup = self._warmup

        if e.name == "Input":
            inp = e.inputs.add()
            inp.column = self._inputs[0]._col
            inp.op_index = -1
        else:
            for i in self._inputs:
                inp = e.inputs.add()
                # Columns without a producing op are referenced as -1.
                idx = indices[i._op] if i._op is not None else -1
                inp.op_index = idx
                inp.column = i._col

        if isinstance(self._args, dict):
            if self._name in self._sc._python_ops:
                e.kernel_args = pickle.dumps(self._args)
            elif len(self._args) > 0:
                # To convert an arguments dict, we search for a protobuf with
                # the name {Op}Args (e.g. BlurArgs, HistogramArgs) in the
                # args.proto module, and fill that in with keys from the args
                # dict.
                op_info = self._sc._get_op_info(self._name)
                if len(op_info.protobuf_name) > 0:
                    proto_name = op_info.protobuf_name
                    e.kernel_args = python_to_proto(proto_name, self._args)
                else:
                    # NOTE(review): this assigns the raw dict to kernel_args;
                    # confirm the field accepts it when the op declares no
                    # argument protobuf.
                    e.kernel_args = self._args
        else:
            # If arguments are a protobuf object, serialize it directly
            e.kernel_args = self._args.SerializeToString()

        return e
def register_python_op(name: str = None,
                       stencil: List[int] = None,
                       unbounded_state: bool = False,
                       bounded_state: int = None,
                       device_type: DeviceType = None,
                       device_sets: List[Tuple[DeviceType, int]] = None,
                       batch: int = 1,
                       proto_path: str = None):
    r"""Class or function decorator which registers a new Op and Kernel with the
    Scanner master.

    The decorated function or class is returned unchanged; a wrapped kernel
    that converts inputs and outputs is stored in `PYTHON_OP_REGISTRY`.

    Parameters
    ----------
    name
      Optional name for the Op. By default, it will be inferred as the name of
      the decorated class/kernel.

    stencil
      Specifies the default stencil to use for the Op. If none, indicates
      that the Op does not have the ability to stencil. A stencil of
      [0] should be specified if the Op can stencil but should not by
      default.

    unbounded_state
      If true, indicates that the Op needs to see all previous elements
      of its input sequences before it can compute a given element. For
      example, to compute output element at index 100, the Op must have
      already produced elements 0-99. This option is mutually exclusive
      with `bounded_state`.

    bounded_state
      NOTE(review): the previous description was a copy of `unbounded_state`.
      Presumably the number of preceding elements the Op must process before
      it can compute a given element; confirm against the Scanner
      documentation. The value is stored in the registry record unchanged.

    device_type
      Device the kernel runs on. Defaults to `DeviceType.CPU` when neither
      `device_type` nor `device_sets` is given. Mutually exclusive with
      `device_sets`.

    device_sets
      List of `(DeviceType, int)` tuples. Mutually exclusive with
      `device_type`.

    batch
      Batch size for the Op. A value greater than 1 marks the Op as batched,
      in which case every input and output must be annotated as
      `Sequence[T]`.

    proto_path
      Optional path to the proto file that describes the configuration
      arguments to this Op.

    Raises
    ------
    ScannerException
      Raised when the decorator is applied, if the kernel's signature or
      annotations are unsupported, if the name is already registered, or if
      both `device_type` and `device_sets` are given.
    """

    def dec(fn_or_class):
        is_fn = isinstance(
            fn_or_class,
            (pytypes.FunctionType, pytypes.BuiltinFunctionType))

        # Infer name from fn_or_class name unless one was given
        kname = fn_or_class.__name__ if name is None else name

        can_stencil = stencil is not None
        can_batch = batch > 1

        # Get execute function to determine input and output types
        if is_fn:
            exec_fn = fn_or_class
        else:
            exec_fn = getattr(fn_or_class, "execute", None)
            if not callable(exec_fn):
                raise ScannerException(
                    ('Attempted to register Python Op with name {:s}, but that '
                     'provided class has no "execute" method.').format(kname))

        input_columns = []
        has_variadic_inputs = False
        variadic_inputs_type = None

        sig = signature(exec_fn)
        # Skip the first parameter: `config` for a function kernel, `self`
        # for a class kernel.
        fn_params = OrderedDict(islice(sig.parameters.items(), 1, None))

        def parse_tuple(typ):
            """Split a possibly-Tuple annotation into its element types."""
            is_tuple = True
            origin_type = getattr(typ, '__origin__', None)
            if (origin_type == Tuple or origin_type == tuple
                    or getattr(typ, '__tuple_params__', None)):
                if getattr(typ, '__tuple_params__', None):
                    # Python 3.5
                    use_ellipsis = typ.__tuple_use_ellipsis__
                    tuple_params = typ.__tuple_params__
                elif getattr(typ, '__args__', None):
                    # Python 3.6+
                    use_ellipsis = typ.__args__[-1] is Ellipsis
                    tuple_params = typ.__args__[:-1 if use_ellipsis else None]
                else:
                    raise ScannerException('This should not happen...')
            else:
                use_ellipsis = False
                is_tuple = False
                tuple_params = [typ]
            return is_tuple, use_ellipsis, tuple_params

        def is_sequence_annotation(typ):
            """True if `typ` is a parameterized Sequence annotation."""
            origin = getattr(typ, '__origin__', None)
            if not origin:
                return False
            return origin == Sequence or origin == collections.abc.Sequence

        def parse_annotation_to_column_type(typ, is_input=False):
            """Map an annotation to (column type, element type)."""
            if can_batch:
                # If the op can batch, then we expect the types to be
                # Sequence[T], where T = {bytes, FrameType}
                if not is_sequence_annotation(typ):
                    raise ScannerException(
                        ('A batched Op must specify a "Sequence" type '
                         'annotation for each input and output.'))
                typ = typ.__args__[0]

            if is_input and can_stencil:
                # If the op can stencil, then we expect the input types to be
                # Sequence[T], where T = {bytes, FrameType}
                if not is_sequence_annotation(typ):
                    raise ScannerException(
                        ('A stenciled Op must specify a "Sequence" type '
                         'annotation for each input. If the Op both stencils '
                         'and batches, then it should have the type '
                         '"Sequence[Sequence[T]], where T = {bytes, FrameType}.'
                         ))
                typ = typ.__args__[0]

            if typ == scannertypes.FrameType:
                column_type = ColumnType.Video
            else:
                # For now, all non-FrameType types (including bytes) are
                # stored as blobs.
                column_type = ColumnType.Blob

            return column_type, typ

        # Analyze exec_fn parameters to determine the input types
        for param_name, param in fn_params.items():
            # We only allow keyword arguments and *args.
            # There is no support currently for positional or **kwargs
            kind = param.kind
            if kind == param.POSITIONAL_ONLY or kind == param.VAR_KEYWORD:
                raise ScannerException(
                    ('Positional arguments and **kwargs are currently not '
                     'supported for the "execute" method of kernels'))

            if kind == param.VAR_POSITIONAL:
                # This means we have variadic inputs
                has_variadic_inputs = True
                if len(fn_params) > 1:
                    raise ScannerException(
                        ('Variadic positional inputs (*args) are not supported '
                         'when used with other inputs.'))

                # Get tuple type
                typ = param.annotation
                is_tuple, use_ellipsis, tuple_params = parse_tuple(typ)
                annotation_message = (
                    'Variadic positional inputs (*args) must be annotated as '
                    '"args: Tuple[Type, ...]" or "args: Type"')
                if is_tuple and not use_ellipsis:
                    raise ScannerException(annotation_message)
                variadic_inputs_type, typ = parse_annotation_to_column_type(
                    tuple_params[0], is_input=True)
                type_info = scannertypes.get_type_info(typ)
                input_columns.append(
                    (param_name, variadic_inputs_type, type_info))
                break

            if param.annotation == param.empty:
                raise ScannerException(
                    ('No type annotation specified for input {:s}. Must '
                     'specify an annotation of "bytes" or "FrameType".')
                    .format(param_name))

            typ = param.annotation
            column_type, typ = parse_annotation_to_column_type(
                typ, is_input=True)
            type_info = scannertypes.get_type_info(typ)
            input_columns.append((param_name, column_type, type_info))

        output_columns = []
        # Analyze exec_fn return type to determine output types
        typ = sig.return_annotation
        if typ == sig.empty:
            raise ScannerException(
                ('Return annotation must be specified for "execute" method.'))

        return_is_tuple, use_ellipsis, tuple_params = parse_tuple(typ)
        if use_ellipsis:
            raise ScannerException(
                ('Ellipsis tuples not supported for return type.'))

        # Parse the return types into Scanner column types
        for i, typ in enumerate(tuple_params):
            column_type, typ = parse_annotation_to_column_type(typ)
            type_info = scannertypes.get_type_info(typ)
            output_columns.append(
                ('ret{:d}'.format(i), column_type, type_info))

        if kname in PYTHON_OP_REGISTRY:
            raise ScannerException(
                'Attempted to register Op with name {:s} twice'.format(kname))

        def parse_params(in_cols):
            """Deserialize raw input columns into keyword arguments."""
            args = {}
            for (param_name, _1, type_info), cs in zip(input_columns, in_cols):
                if can_batch ^ can_stencil:
                    # Exactly one level of nesting (batch or stencil).
                    args[param_name] = [type_info.deserialize(c) for c in cs]
                elif can_batch and can_stencil:
                    # Two levels of nesting (batch of stencils).
                    args[param_name] = [[type_info.deserialize(c) for c in c2]
                                        for c2 in cs]
                else:
                    args[param_name] = type_info.deserialize(cs)
            return args

        def parse_ret(r):
            """Serialize the kernel's return value into a tuple of columns."""
            columns = r if return_is_tuple else (r, )
            outputs = []
            for (_1, _2, type_info), column in zip(output_columns, columns):
                if can_batch:
                    outputs.append(
                        [type_info.serialize(element) for element in column])
                else:
                    outputs.append(type_info.serialize(column))
            return tuple(outputs)

        # Wrap exec_fn to destructure input and outputs to proper python
        # inputs. Variadic inputs are forwarded as-is, without going through
        # parse_params.
        if is_fn:
            if has_variadic_inputs:

                @wraps(fn_or_class)
                def wrapper_exec(config, in_cols):
                    return parse_ret(exec_fn(config, *in_cols))
            else:

                @wraps(fn_or_class)
                def wrapper_exec(config, in_cols):
                    args = parse_params(in_cols)
                    return parse_ret(exec_fn(config, **args))

            wrapped_fn_or_class = wrapper_exec
        else:
            # Build a copy of the class so the decorated class itself keeps
            # its original `execute`.
            wrapped_fn_or_class = type(fn_or_class.__name__ + 'Kernel',
                                       fn_or_class.__bases__,
                                       dict(fn_or_class.__dict__))
            if has_variadic_inputs:

                def execute(self, in_cols):
                    return parse_ret(exec_fn(self, *in_cols))
            else:

                def execute(self, in_cols):
                    args = parse_params(in_cols)
                    return parse_ret(exec_fn(self, **args))

            wrapped_fn_or_class.execute = execute

        dtype = device_type
        if device_type is None and device_sets is None:
            dtype = DeviceType.CPU

        if device_type is not None and device_sets is not None:
            raise ScannerException(
                'Must only specify one of "device_type" or "device_sets" for python Op.')

        if not is_fn:
            init_params = list(
                signature(getattr(fn_or_class,
                                  "__init__")).parameters.keys())[1:]
            # An __init__ taking only `self` leaves this list empty; report
            # that with the same error instead of an IndexError.
            if not init_params or init_params[0] != 'config':
                raise ScannerException(
                    "__init__ first argument (after self) must be `config`")
            kernel_params = init_params[1:]
            stream_params = list(
                signature(getattr(fn_or_class,
                                  "new_stream")).parameters.keys())[1:]
        else:
            kernel_params = []
            stream_params = []

        PYTHON_OP_REGISTRY[kname] = {
            'input_columns': input_columns,
            'output_columns': output_columns,
            'variadic_inputs': has_variadic_inputs,
            'stencil': stencil,
            'unbounded_state': unbounded_state,
            'bounded_state': bounded_state,
            'kernel': wrapped_fn_or_class,
            'device_type': dtype,
            'device_sets': device_sets,
            'batch': batch,
            'proto_path': proto_path,
            'registration_id': uuid.uuid4().hex,
            'kernel_params': kernel_params,
            'stream_params': stream_params
        }
        return fn_or_class

    return dec