from attr import attrs, attrib
from scannerpy.common import ScannerException
from scannerpy.protobufs import protobufs
import pickle
from typing import NewType, Generic, TypeVar, Any
import numpy as np
import struct
PYTHON_TYPE_REGISTRY = {}
# This is a special built-in type for video frame streams.
# Class purely for type annotation.
[docs]class FrameType(object):
pass
BlobType = bytes
[docs]@attrs(frozen=True)
class ScannerTypeInfo:
type = attrib()
cpp_name = attrib()
serialize = attrib()
deserialize = attrib()
def _register_type(ty, cpp_name, serialize, deserialize):
global PYTHON_TYPE_REGISTRY
PYTHON_TYPE_REGISTRY[ty] = ScannerTypeInfo(
type=ty,
cpp_name=cpp_name,
serialize=serialize,
deserialize=deserialize)
[docs]def get_type_info(ty):
global PYTHON_TYPE_REGISTRY
if not ty in PYTHON_TYPE_REGISTRY:
raise ScannerException("Type `{}` has not been registered with Scanner".format(ty.__name__))
return PYTHON_TYPE_REGISTRY[ty]
[docs]def get_type_info_cpp(cpp_name):
global PYTHON_TYPE_REGISTRY
for ty in PYTHON_TYPE_REGISTRY.values():
if ty.cpp_name == cpp_name:
return ty
raise ScannerException("Type `{}` has not been registered with Scanner".format(cpp_name))
_register_type(bytes, "Bytes", lambda x: x, lambda x: x)
_register_type(Any, "Any", pickle.dumps, pickle.loads)
_register_type(FrameType, "FrameType", lambda x: x, lambda x: x)
# TODO: document this
[docs]def register_type(cls):
name = cls.__name__
_register_type(cls, name, cls.serialize, cls.deserialize)
return cls
[docs]def ProtobufType(name, proto):
def serialize(proto_obj):
return proto_obj.SerializeToString()
def deserialize(buf):
p = proto()
p.ParseFromString(buf)
return p
return register_type(type(name, (), dict(serialize=serialize, deserialize=deserialize)))
[docs]def VariableList(name, typ):
def serialize(variable_list):
s = struct.pack('=Q', len(variable_list))
for element in variable_list:
serialized = typ.serialize(element)
s += struct.pack('=Q', len(serialized))
s += serialized
return s
def deserialize(buf):
(N, ) = struct.unpack("=Q", buf[:8])
buf = buf[8:]
elements = []
for i in range(N):
(serialized_size, ) = struct.unpack("=Q", buf[:8])
buf = buf[8:]
element = typ.deserialize(buf[:serialized_size])
buf = buf[serialized_size:]
elements.append(element)
return elements
return register_type(type(name, (), dict(serialize=serialize, deserialize=deserialize)))
Bbox = ProtobufType('Bbox', protobufs.BoundingBox)
BboxList = VariableList('BboxList', Bbox)
[docs]@register_type
class NumpyArrayFloat32:
[docs] def serialize(array):
return array.tobytes()
[docs] def deserialize(data_buffer):
return np.frombuffer(data_buffer, dtype=np.float32)
[docs]@register_type
class NumpyArrayInt32:
[docs] def serialize(array):
return array.tobytes()
[docs] def deserialize(data_buffer):
return np.frombuffer(data_buffer, dtype=np.int32)
Histogram = UniformList('Histogram', NumpyArrayInt32, parts=3)
[docs]@register_type
class Image:
[docs] def serialize(image):
import cv2
return cv2.imencode('.png', image)
[docs] def deserialize(encoded_image):
import cv2
return cv2.imdecode(np.frombuffer(encoded_image, dtype=np.dtype(np.uint8)), cv2.IMREAD_COLOR)