from scannerpy.common import ScannerException
from typing import List, Callable, Any, Generator
from scannerpy.types import get_type_info_cpp
import hwang
import storehouse
[docs]class NullElement:
"""Represents a 'null' output in a stream generated through Scanner.
These can show up e.g. when you use a spacing/repeat operator. NullElement is used instead
of the more generic Python 'None', as a Python kernel can have 'None' as a valid output which
isn't the same as a 'null' stream element.
"""
pass
[docs]class StorageBackend:
"""I/O backend for streams fed in/out of Scanner."""
[docs] def source(self, sc, streams):
"""Returns the Scanner source corresponding to this storage backend.
Parameters
----------
sc: Client
Scanner client
streams: List[StoredStream]
List of StoredStream objects of the same storage type
Returns
----------
source: Source
Scanner source
"""
raise ScannerException(
"StorageBackend class `{}` cannot serve as a Scanner input.".format(type(self).__name__))
[docs] def sink(self, sc, op, streams):
"""Returns the Scanner sink corresponding to this storage backend.
Parameters
----------
sc: Client
Scanner client
op: Op
Input op from computation graph
streams: List[StoredStream]
List of StoredStream objects of the same storage type
Returns
----------
sink: Sink
Scanner sink
"""
raise ScannerException(
"StorageBackend class `{}` cannot serve as a Scanner output.".format(type(self).__name__))
[docs] def delete(self, sc, streams):
"""Deletes the streams from storage if they exist.
Parameters
----------
sc: Client
Scanner client
streams: List[StoredStream]
List of StoredStream objects of the same storage type
"""
raise ScannerException(
"StorageBackend class `{}` cannot delete elements.".format(type(self).__name__))
[docs]class StoredStream:
"""Handle to a stream stored in a particular storage backend.
In general, a StoredStream is not guaranteed to exist, but you can always check this using
:func:`exists`.
"""
[docs] def load_bytes(self, rows: List[int] = None) -> Generator[bytes, None, None]:
"""A generator that incrementally loads raw bytes from the stored stream.
This function is not intended to be called directly by the user. See :func:`load`.
Parameters
----------
rows: List[int]
List of indices in the stream to load. Default is all elements.
"""
raise ScannerException(
"Stream `{}` cannot load elements into Python.".format(type(self).__name__))
[docs] def committed(self) -> bool:
"""Check if a stream is completely materialized in storage.
A stream may exist, but not be committed if a Scanner job was run that was supposed to
output the stream, but the job failed for any reason.
"""
raise NotImplementedError
[docs] def exists(self) -> bool:
"""Check if any part of a stream exists in storage."""
raise NotImplementedError
[docs] def storage(self) -> StorageBackend:
"""Get the storage backend corresponding to this stream."""
raise NotImplementedError
[docs] def len(self) -> int:
"""Get the number of elements in this stream."""
raise NotImplementedError
[docs] def name(self) -> str:
"""Gets a human interpretable name for this stored stream."""
raise NotImplementedError
[docs] def type(self) -> type:
"""Get the Scanner type of elements in this stream if it exists, and return None otherwise."""
raise NotImplementedError
[docs] def estimate_size(self) -> int:
"""Estimates the size in bytes of elements in the stream. Not guaranteed to be accurate,
just used for heuristics."""
raise NotImplementedError
[docs] def load(self, ty: type = None, fn: Callable[[bytes], Any] = None, rows: List[int] = None) -> Generator[Any, None, None]:
"""Load elements from the stored stream into Python.
Parameters
----------
ty: type
Scanner type of elements in the stream. If the storage backend recorded the type, that
will be used by default.
fn: Callable[[bytes], Any]
Deserialization function that maps elements as bytes to their actual type.
rows: List[int]
List of indices in the stream to load. Default is all elements.
Returns
----------
generator: Generator[Any, None, None]
A generator that outputs one stream element at a time.
"""
if not self.committed():
raise ScannerException("Tried to load from uncommitted stream")
# Use a deserialize function if provided.
# If not, use a type if provided.
# If not, attempt to determine the type from the column's table descriptor.
# If that doesn't work, then assume no deserialization function, and return bytes.
if fn is None:
if ty is None:
ty = self.type()
if ty is not None:
fn = ty.deserialize
for obj in self.load_bytes(rows=rows):
if fn is not None and type(obj) == bytes:
yield fn(obj)
else:
yield obj
[docs] def delete(self, sc):
"""Deletes the stream from its storage if it exists.
Parameters
----------
sc: Client
Scanner client
"""
self.storage().delete(sc, [self])
[docs]class NamedStorage(StorageBackend):
"""Named storage for byte streams. Useful default output format for non-video-data.
Stores byte streams in a custom packed binary file format. Supports both local filesystems
(Linux/Posix) and cloud file systems (S3, GCS).
"""
def __init__(self, storage_config=None):
if storage_config is None:
storage_config = storehouse.StorageConfig.make_posix_config()
self._storage_config = storage_config
self._storehouse_backend = None
def _storehouse(self):
if self._storehouse_backend is None:
self._storehouse_backend = storehouse.StorageBackend.make_from_config(self._storage_config)
return self._storehouse_backend
[docs] def source(self, sc, streams):
return sc.sources.Column(
table_name=[s._name for s in streams],
column_name=['column' for s in streams])
[docs] def sink(self, sc, op, streams):
return sc.sinks.Column(
columns={'column': op},
table_name=[s._name for s in streams],
column_name=['column' for s in streams])
[docs] def delete(self, sc, streams):
if len(streams) > 0:
sc.delete_tables([e._name for e in streams])
[docs]class NamedVideoStorage(NamedStorage):
"""Named storage for video streams. Special baked-in storage class for keeping videos compressed."""
[docs] def source(self, sc, streams):
return sc.sources.FrameColumn(
table_name=[s._name for s in streams],
column_name=['frame' for s in streams])
[docs] def sink(self, sc, op, streams):
return sc.sinks.FrameColumn(
columns={'frame': op},
table_name=[s._name for s in streams],
column_name=['frame' for s in streams])
[docs] def ingest(self, sc, streams, batch=500):
to_ingest = [(s._name, s._path) for s in streams
if (s._path is not None and
s._inplace == False)]
to_ingest_inplace = [(s._name, s._path) for s in streams
if (s._path is not None and
s._inplace == True)]
if len(to_ingest) > 0:
for i in range(0, len(to_ingest), batch):
sc.ingest_videos(to_ingest[i:i+batch], inplace=False, force=True)
if len(to_ingest_inplace) > 0:
for i in range(0, len(to_ingest_inplace), batch):
sc.ingest_videos(to_ingest_inplace[i:i+batch], inplace=True, force=True)
[docs]class NamedStream(StoredStream):
"""Stream of elements stored on disk with a given name."""
def __init__(self, sc, name: str, storage=None):
"""
Parameters
----------
sc: Client
Scanner client.
name: str
Name of the stream.
storage: NamedStorage
Optional NamedStorage object.
"""
if storage is None:
self._storage = NamedStorage()
else:
self._storage = storage
self._sc = sc
self._name = name
[docs] def type(self):
seq = self._sc.sequence(self._name)
seq._load_meta()
type_name = seq._descriptor.type_name
if type_name != "":
return get_type_info_cpp(type_name)
else:
return None
[docs] def name(self):
return self._name
[docs] def storage(self):
return self._storage
[docs] def committed(self):
return self._sc.sequence(self._name)._table.committed()
[docs] def exists(self):
return self._sc.sequence(self._name)._table.exists()
[docs] def len(self):
return self._sc.sequence(self._name)._table.num_rows()
[docs] def load_bytes(self, rows=None):
seq = self._sc.sequence(self._name)
yield from seq.load(fn=lambda x: x, workers=16, rows=rows)
[docs]class NamedVideoStream(NamedStream):
"""Stream of video frame stored (compressed) in named storage."""
def __init__(self, sc, name, path=None, inplace=False, storage=None):
"""
Parameters
----------
sc: Client
Scanner client
name: str
Name of the stream in Scanner storage.
path: str
Path to corresponding video file. If this parameter is given, the named video stream will be
created from the video at the given path.
storage: NamedFrameStorage
Optional NamedFrameStorage object.
"""
if storage is None:
self._storage = NamedVideoStorage()
else:
self._storage = storage
self._sc = sc
self._name = name
self._path = path
self._inplace = inplace
self._ingested = False
def _ingest(self):
if not self._ingested and self._path is not None:
self._ingested = True
self.storage().ingest(self._sc, [self])
[docs] def committed(self):
self._ingest()
return self._sc.sequence(self._name)._table.committed()
[docs] def exists(self):
self._ingest()
return self._sc.sequence(self._name)._table.exists()
[docs] def len(self):
self._ingest()
return self._sc.sequence(self._name)._table.num_rows()
[docs] def load_bytes(self, rows=None):
self._ingest()
seq = self._sc.sequence(self._name)
yield from seq.load(fn=lambda x: x, workers=16, rows=rows)
[docs] def estimate_size(self):
self._ingest()
col = self._sc.sequence(self._name)
col._load_meta()
vmeta = col._video_descriptor
return vmeta.width * vmeta.height * vmeta.channels
[docs] def save_mp4(self, output_name, fps=None, scale=None):
self._ingest()
return self._sc.sequence(self._name).save_mp4(output_name, fps=fps, scale=scale)
[docs] def as_hwang(self):
try:
video_file = storehouse.RandomReadFile(self._storage._storehouse(), self._path.encode('ascii'))
except UserWarning:
raise Exception('Path to video `{}` does not exist.'.format(self._path))
return hwang.Decoder(video_file)