Source code for scannerpy.types

from attr import attrs, attrib
from scannerpy.common import ScannerException
from scannerpy.protobufs import protobufs
import pickle
from typing import NewType, Generic, TypeVar, Any
import numpy as np
import struct

PYTHON_TYPE_REGISTRY = {}

# This is a special built-in type for video frame streams.
# Class purely for type annotation.
[docs]class FrameType(object): pass
BlobType = bytes
[docs]@attrs(frozen=True) class ScannerTypeInfo: type = attrib() cpp_name = attrib() serialize = attrib() deserialize = attrib()
def _register_type(ty, cpp_name, serialize, deserialize): global PYTHON_TYPE_REGISTRY PYTHON_TYPE_REGISTRY[ty] = ScannerTypeInfo( type=ty, cpp_name=cpp_name, serialize=serialize, deserialize=deserialize)
[docs]def get_type_info(ty): global PYTHON_TYPE_REGISTRY if not ty in PYTHON_TYPE_REGISTRY: raise ScannerException("Type `{}` has not been registered with Scanner".format(ty.__name__)) return PYTHON_TYPE_REGISTRY[ty]
[docs]def get_type_info_cpp(cpp_name): global PYTHON_TYPE_REGISTRY for ty in PYTHON_TYPE_REGISTRY.values(): if ty.cpp_name == cpp_name: return ty raise ScannerException("Type `{}` has not been registered with Scanner".format(cpp_name))
_register_type(bytes, "Bytes", lambda x: x, lambda x: x) _register_type(Any, "Any", pickle.dumps, pickle.loads) _register_type(FrameType, "FrameType", lambda x: x, lambda x: x) # TODO: document this
[docs]def register_type(cls): name = cls.__name__ _register_type(cls, name, cls.serialize, cls.deserialize) return cls
[docs]def ProtobufType(name, proto): def serialize(proto_obj): return proto_obj.SerializeToString() def deserialize(buf): p = proto() p.ParseFromString(buf) return p return register_type(type(name, (), dict(serialize=serialize, deserialize=deserialize)))
[docs]def VariableList(name, typ): def serialize(variable_list): s = struct.pack('=Q', len(variable_list)) for element in variable_list: serialized = typ.serialize(element) s += struct.pack('=Q', len(serialized)) s += serialized return s def deserialize(buf): (N, ) = struct.unpack("=Q", buf[:8]) buf = buf[8:] elements = [] for i in range(N): (serialized_size, ) = struct.unpack("=Q", buf[:8]) buf = buf[8:] element = typ.deserialize(buf[:serialized_size]) buf = buf[serialized_size:] elements.append(element) return elements return register_type(type(name, (), dict(serialize=serialize, deserialize=deserialize)))
[docs]def UniformList(name, typ, size=None, parts=None): assert (size is not None) ^ (parts is not None) def serialize(uniform_list): return b''.join([typ.serialize(obj) for obj in uniform_list]) def deserialize(buf): nonlocal size nonlocal parts # HACK(will): need a placeholder for an empty list if len(buf) <= 4: return [] if parts is not None: size = len(buf) // parts assert len(buf) % size == 0 return [typ.deserialize(buf[i:i+size]) for i in range(0, len(buf), size)] return register_type(type(name, (), dict(serialize=serialize, deserialize=deserialize)))
Bbox = ProtobufType('Bbox', protobufs.BoundingBox) BboxList = VariableList('BboxList', Bbox)
[docs]@register_type class NumpyArrayFloat32:
[docs] def serialize(array): return array.tobytes()
[docs] def deserialize(data_buffer): return np.frombuffer(data_buffer, dtype=np.float32)
[docs]@register_type class NumpyArrayInt32:
[docs] def serialize(array): return array.tobytes()
[docs] def deserialize(data_buffer): return np.frombuffer(data_buffer, dtype=np.int32)
Histogram = UniformList('Histogram', NumpyArrayInt32, parts=3)
[docs]@register_type class Image:
[docs] def serialize(image): import cv2 return cv2.imencode('.png', image)
[docs] def deserialize(encoded_image): import cv2 return cv2.imdecode(np.frombuffer(encoded_image, dtype=np.dtype(np.uint8)), cv2.IMREAD_COLOR)