Source code for scannerpy.streams

import scannerpy.op

from scannerpy.common import *
from scannerpy.protobufs import protobufs
from scannerpy.op import SliceList
from typing import Sequence, Union, Tuple, Optional, List

def _make_sampling_args(sampling_function, sampler_args=None):
    r"""Wraps sampler-specific arguments in a ``SamplingArgs`` message.

    Parameters
    ----------
    sampling_function
      The name stored in the ``sampling_function`` field.

    sampler_args
      An optional protobuf message. When given, its serialized bytes are
      stored in the ``sampling_args`` field; when ``None`` that field is left
      untouched.

    Returns
    -------
    protobufs.SamplingArgs
      The populated message.
    """
    sampling_args = protobufs.SamplingArgs()
    sampling_args.sampling_function = sampling_function
    if sampler_args is not None:
        sampling_args.sampling_args = sampler_args.SerializeToString()
    return sampling_args


def _make_strided_ranges_args(intervals, stride):
    r"""Builds ``SamplingArgs`` for the "StridedRanges" sampling function.

    Parameters
    ----------
    intervals
      An iterable of (start, end) pairs; starts and ends are appended to the
      ``starts`` and ``ends`` fields in iteration order.

    stride
      The value stored in the ``stride`` field.

    Returns
    -------
    protobufs.SamplingArgs
      The populated message.
    """
    args = protobufs.StridedRangeSamplerArgs()
    args.stride = stride
    for start, end in intervals:
        args.starts.append(start)
        args.ends.append(end)
    return _make_sampling_args("StridedRanges", args)


class StreamsGenerator:
    r"""Provides Ops for sampling elements from streams.

    The methods of this class construct Scanner Ops that enable selecting
    subsets of the elements in a stream to produce new streams.

    This class should not be constructed directly, but accessed via a Database
    object like:

      sc.streams.Range(input)
    """

    def __init__(self, sc):
        # Every method builds its Op through ``self._sc.ops``.
        self._sc = sc

    def Slice(self, input: scannerpy.op.OpColumn,
              partitions) -> scannerpy.op.OpColumn:
        r"""Partitions a stream into independent substreams.

        Parameters
        ----------
        input
          The stream to partition.

        partitions
          The partitioner(s) that should be used to split the stream into
          substreams. Forwarded unchanged as the Op's ``job_args``; the
          argument builder returns each partitioner as-is.
          NOTE(review): presumably one partitioner per input stream --
          confirm against the job-building code.

        Returns
        -------
        scannerpy.op.OpColumn
          A new stream which represents multiple substreams.
        """

        def arg_builder(partitioner):
            # Partitioners need no translation; pass them through.
            return partitioner

        return self._sc.ops.Slice(
            col=input,
            extra={'type': 'Slice',
                   'arg_builder': arg_builder,
                   'job_args': partitions})

    def Unslice(self, input: scannerpy.op.OpColumn) -> scannerpy.op.OpColumn:
        r"""Joins substreams back together.

        Parameters
        ----------
        input
          The stream which contains substreams to join back together.

        Returns
        -------
        scannerpy.op.OpColumn
          A new stream which is the concatenation of the input substreams.
        """
        return self._sc.ops.Unslice(col=input)

    def All(self, input: scannerpy.op.OpColumn) -> scannerpy.op.OpColumn:
        r"""Samples all elements from the stream.

        Serves as an identity sampling function.

        Parameters
        ----------
        input
          The stream to sample.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        def arg_builder():
            # "All" takes no sampler-specific arguments.
            return _make_sampling_args("All")

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'All',
                   'arg_builder': arg_builder,
                   'default': {}})

    def Stride(self, input: scannerpy.op.OpColumn,
               strides) -> scannerpy.op.OpColumn:
        r"""Samples every n'th element from the stream, where n is the stride.

        Parameters
        ----------
        input
          The stream to sample.

        strides
          The list of strides for each input stream.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        def arg_builder(stride):
            args = protobufs.StridedSamplerArgs()
            args.stride = stride
            return _make_sampling_args("Strided", args)

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'Stride',
                   'arg_builder': arg_builder,
                   'job_args': strides})

    def Range(self, input: scannerpy.op.OpColumn,
              ranges: Sequence[Tuple[int, int]]) -> scannerpy.op.OpColumn:
        r"""Samples a range of elements from the input stream.

        Parameters
        ----------
        input
          The stream to sample.

        ranges
          Pairs of (start, end) for each stream being processed.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.

        Examples
        --------
        For example, to select frames 0-10 for one stream, you would write:

          sc.streams.Range(input=input, ranges=[(0, 11)])
        """

        def arg_builder(start, end):
            # A plain range is a single strided range with a stride of 1.
            return _make_strided_ranges_args([(start, end)], 1)

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'Range',
                   'arg_builder': arg_builder,
                   'job_args': ranges})

    def Ranges(self, input: scannerpy.op.OpColumn,
               intervals: List[Sequence[Tuple[int, int]]]
               ) -> scannerpy.op.OpColumn:
        r"""Samples multiple ranges of elements from the input stream.

        Parameters
        ----------
        input
          The stream to sample.

        intervals
          The intervals to sample from. This should be a list with one entry
          per stream, where each entry is a sequence of (start, end) tuples.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.

        Examples
        --------
        For example, to select frames 0-10 and 100-200 for one stream, you
        would write:

          sc.streams.Ranges(input=input, intervals=[[(0, 11), (100, 201)]])
        """

        def arg_builder(intervals):
            return _make_strided_ranges_args(intervals, 1)

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'Ranges',
                   'arg_builder': arg_builder,
                   'job_args': intervals})

    def StridedRange(self, input: scannerpy.op.OpColumn,
                     ranges) -> scannerpy.op.OpColumn:
        r"""Samples a strided range of elements from the input stream.

        Parameters
        ----------
        input
          The stream to sample.

        ranges
          The sampling arguments, forwarded as the Op's ``job_args``. Each
          entry supplies the ``start`` index to begin sampling from, the
          ``end`` index to stop sampling at, and the ``stride`` to step by.
          NOTE(review): presumably one (start, end, stride) entry per input
          stream -- confirm against the job-building code.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        def arg_builder(start, end, stride):
            return _make_strided_ranges_args([(start, end)], stride)

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'StridedRange',
                   'arg_builder': arg_builder,
                   'job_args': ranges})

    def StridedRanges(self, input: scannerpy.op.OpColumn,
                      intervals: Optional[Sequence[Tuple[int, int]]] = None,
                      stride: Optional[int] = None) -> scannerpy.op.OpColumn:
        r"""Samples strided ranges of elements from the input stream.

        Parameters
        ----------
        input
          The stream to sample.

        intervals
          The default intervals to sample from. This should be a list of
          tuples representing start and end ranges.

        stride
          The default value to stride by.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        # The values passed to this method become the builder's defaults, so
        # the builder can be invoked with no arguments.
        def arg_builder(intervals=intervals, stride=stride):
            return _make_strided_ranges_args(intervals, stride)

        # The 'default' entry is only populated when both values were given.
        if intervals is not None and stride is not None:
            default = (intervals, stride)
        else:
            default = None

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'StridedRanges',
                   'arg_builder': arg_builder,
                   'default': default})

    def Gather(self, input: scannerpy.op.OpColumn,
               indices: Sequence[Sequence[int]]) -> scannerpy.op.OpColumn:
        r"""Samples a list of elements from the input stream.

        Parameters
        ----------
        input
          The stream to sample.

        indices
          Lists of the indices (rows) to sample, forwarded as the Op's
          ``job_args``.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        def arg_builder(rows):
            args = protobufs.GatherSamplerArgs()
            # Slice assignment replaces the repeated field's contents.
            args.rows[:] = rows
            return _make_sampling_args('Gather', args)

        return self._sc.ops.Sample(
            col=input,
            extra={'type': 'Gather',
                   'arg_builder': arg_builder,
                   'job_args': indices})

    def RepeatNull(self, input: scannerpy.op.OpColumn,
                   spacings) -> scannerpy.op.OpColumn:
        r"""Expands a sequence by inserting nulls.

        Parameters
        ----------
        input
          The stream to expand.

        spacings
          The spacing values, forwarded as the Op's ``job_args``. Each value
          handed to the argument builder is stored in the ``spacing`` field of
          a ``SpaceNullSamplerArgs`` message.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        def arg_builder(spacing):
            args = protobufs.SpaceNullSamplerArgs()
            args.spacing = spacing
            return _make_sampling_args("SpaceNull", args)

        return self._sc.ops.Space(
            col=input,
            extra={'type': 'RepeatNull',
                   'arg_builder': arg_builder,
                   'job_args': spacings})

    def Repeat(self, input: scannerpy.op.OpColumn,
               spacings) -> scannerpy.op.OpColumn:
        r"""Expands a sequence by repeating elements.

        Parameters
        ----------
        input
          The stream to expand.

        spacings
          The spacing values, forwarded as the Op's ``job_args``. Each value
          handed to the argument builder is stored in the ``spacing`` field of
          a ``SpaceRepeatSamplerArgs`` message.

        Returns
        -------
        scannerpy.op.OpColumn
          The sampled stream.
        """

        def arg_builder(spacing):
            args = protobufs.SpaceRepeatSamplerArgs()
            args.spacing = spacing
            return _make_sampling_args("SpaceRepeat", args)

        return self._sc.ops.Space(
            col=input,
            extra={'type': 'Repeat',
                   'arg_builder': arg_builder,
                   'job_args': spacings})