# Source code for scannerpy.partitioner

from scannerpy.common import *
from scannerpy.protobufs import protobufs

# Default for the ``group_size`` argument of ``TaskPartitioner.all`` and
# ``TaskPartitioner.strided``.
DEFAULT_GROUP_SIZE = 250

class TaskPartitioner:
    """
    Utility for specifying how to partition the output domain of a job into
    tasks.

    Every public method returns a ``protobufs.SamplingArgs`` message whose
    ``sampling_function`` field names the partitioning strategy
    (``'Strided'``, ``'StridedRange'`` or ``'Gather'``) and whose
    ``sampling_args`` field holds the serialized strategy-specific message.
    """

    def __init__(self, sc):
        """Create a partitioner.

        Args:
            sc: Object stored on ``self._sc``. None of the methods defined in
                this class read it.
        """
        self._sc = sc

    @staticmethod
    def _make_sampling_args(function_name, args):
        """Wrap a strategy-specific message in a ``SamplingArgs`` message.

        Args:
            function_name: Value for the ``sampling_function`` field.
            args: Protobuf message; its serialized bytes are stored in the
                ``sampling_args`` field.

        Returns:
            The populated ``protobufs.SamplingArgs`` message.
        """
        sampling_args = protobufs.SamplingArgs()
        sampling_args.sampling_function = function_name
        sampling_args.sampling_args = args.SerializeToString()
        return sampling_args

    def all(self, group_size=DEFAULT_GROUP_SIZE):
        """Shorthand for ``strided(1, group_size=group_size)``.

        Args:
            group_size: Forwarded unchanged to :meth:`strided`.

        Returns:
            The ``SamplingArgs`` message produced by :meth:`strided`.
        """
        return self.strided(1, group_size=group_size)

    def strided(self, stride, group_size=DEFAULT_GROUP_SIZE):
        """Build ``'Strided'`` partitioning arguments.

        Args:
            stride: Stored in the ``stride`` field of a
                ``StridedPartitionerArgs`` message.
            group_size: Stored in the ``group_size`` field of the same
                message.

        Returns:
            A ``SamplingArgs`` message with ``sampling_function`` set to
            ``'Strided'``.
        """
        args = protobufs.StridedPartitionerArgs()
        args.stride = stride
        args.group_size = group_size
        return self._make_sampling_args('Strided', args)

    def range(self, start, end):
        """Shorthand for ``ranges([(start, end)])``.

        Args:
            start: First element of the single interval.
            end: Second element of the single interval.

        Returns:
            The ``SamplingArgs`` message produced by :meth:`ranges`.
        """
        return self.ranges([(start, end)])

    def ranges(self, intervals):
        """Shorthand for ``strided_ranges(intervals, 1)``.

        Args:
            intervals: Iterable of ``(start, end)`` pairs.

        Returns:
            The ``SamplingArgs`` message produced by :meth:`strided_ranges`.
        """
        return self.strided_ranges(intervals, 1)

    def gather(self, groups):
        """Build ``'Gather'`` partitioning arguments.

        Args:
            groups: Iterable of row collections. Each collection is copied
                into the ``rows`` field of a new entry of the message's
                repeated ``groups`` field.

        Returns:
            A ``SamplingArgs`` message with ``sampling_function`` set to
            ``'Gather'``.
        """
        # NOTE(review): the message type is named "Sampler" rather than
        # "Partitioner", unlike the other methods here -- confirm it is the
        # intended message and that its repeated field is called ``groups``.
        args = protobufs.GatherSamplerArgs()
        for rows in groups:
            # Python protobuf appends to a repeated message field with
            # ``.add()``, which returns the new element. The previous
            # ``args.groups_add()`` is not a protobuf method and raised
            # AttributeError.
            gather_group = args.groups.add()
            gather_group.rows[:] = rows
        return self._make_sampling_args('Gather', args)

    def strided_range(self, start, end, stride):
        """Shorthand for ``strided_ranges([(start, end)], stride)``.

        Args:
            start: First element of the single interval.
            end: Second element of the single interval.
            stride: Forwarded unchanged to :meth:`strided_ranges`.

        Returns:
            The ``SamplingArgs`` message produced by :meth:`strided_ranges`.
        """
        return self.strided_ranges([(start, end)], stride)

    def strided_ranges(self, intervals, stride):
        """Build ``'StridedRange'`` partitioning arguments.

        Args:
            intervals: Iterable of ``(start, end)`` pairs. The starts and
                ends are appended, in order, to the ``starts`` and ``ends``
                fields of a ``StridedRangePartitionerArgs`` message.
            stride: Stored in the ``stride`` field of the same message.

        Returns:
            A ``SamplingArgs`` message with ``sampling_function`` set to
            ``'StridedRange'``.
        """
        args = protobufs.StridedRangePartitionerArgs()
        args.stride = stride
        for start, end in intervals:
            args.starts.append(start)
            args.ends.append(end)
        return self._make_sampling_args('StridedRange', args)