Source code for scannerpy.io

from scannerpy.common import ScannerException
from scannerpy.storage import StoredStream

class IOGenerator:
    """Builds source and sink objects from lists of stored streams.

    The object passed to the constructor is kept as ``self._sc`` and is
    forwarded unchanged to the storage backend's ``source`` / ``sink``
    factory methods (presumably the Scanner client -- confirm with caller).
    """

    def __init__(self, sc):
        self._sc = sc

    @staticmethod
    def _validate_streams(streams, op_name):
        """Raise ``ScannerException`` unless ``streams`` is a non-empty list
        whose elements are all ``StoredStream`` instances.

        Args:
            streams: The value supplied by the caller of ``Input``/``Output``.
            op_name: ``"Input"`` or ``"Output"``; only used in the message.

        Raises:
            ScannerException: If ``streams`` is not a list, is empty, or
                contains anything that is not a ``StoredStream``.
        """
        # The emptiness check must precede any element access: indexing an
        # empty list would surface as an unrelated IndexError to the caller.
        if (not isinstance(streams, list)
                or not streams
                or not all(isinstance(s, StoredStream) for s in streams)):
            raise ScannerException(
                "io.{} must take a list of streams as input".format(op_name))

    def Input(self, streams):
        """Create a source for ``streams``.

        The storage object of the first stream builds the source for the
        whole list; the list is then attached to the result as ``_streams``.

        Args:
            streams: Non-empty list of ``StoredStream`` objects.

        Returns:
            Whatever ``streams[0].storage().source(...)`` returns, with its
            ``_streams`` attribute set to ``streams``.

        Raises:
            ScannerException: If ``streams`` is not a non-empty list of
                ``StoredStream`` objects.
        """
        self._validate_streams(streams, "Input")
        # The first stream's storage is used for all streams in the list.
        example = streams[0]
        source = example.storage().source(self._sc, streams)
        source._streams = streams
        return source

    def Output(self, op, streams):
        """Create a sink that receives ``op`` for ``streams``.

        The storage object of the first stream builds the sink for the
        whole list; the list is then attached to the result as ``_streams``.

        Args:
            op: Passed through unchanged to the storage's ``sink`` factory.
            streams: Non-empty list of ``StoredStream`` objects.

        Returns:
            Whatever ``streams[0].storage().sink(...)`` returns, with its
            ``_streams`` attribute set to ``streams``.

        Raises:
            ScannerException: If ``streams`` is not a non-empty list of
                ``StoredStream`` objects.
        """
        self._validate_streams(streams, "Output")
        # The first stream's storage is used for all streams in the list.
        example = streams[0]
        sink = example.storage().sink(self._sc, op, streams)
        sink._streams = streams
        return sink