Operations

Overview

Processing data in Scanner occurs primarily through operations (ops). Operations are built-in or user defined functionality that transform streams of input elements to streams of output elements. For example, this Resize operation transforms input frames by resizing them to a target width and height:

import scannerpy as sp
import scannertools.imgproc

sc = sp.Client()
...
sc.ops.Resize(frame=input_frames, width=[640], height=[480])

Follow the rest of this guide to understand how to write your own operations.

Defining an Operation in Python

Scanner supports defining operations in Python using the following syntax:

import scannerpy as sp

@sp.register_python_op()
class PersonDetectorDNN(sp.Kernel):
    # Init runs once when the class instance is initialized
    def __init__(self, config, model_url):
        self._model_url = model_url
        self._local_model_path = '/tmp/model'

    def fetch_resources(self):
        download_model(self._model_url, self._local_model_path)

    def setup_with_resources(self):
        with open(self._local_model_path, 'rb') as f:
            # load and setup the model
            self.model = ...

    def new_stream(self, threshold):
        self._threshold = threshold

    def execute(self, frame: sp.FrameType) -> BoundingBoxList:
        person_boxes, confidences = self.model.execute(frame)
        return [box
                for box, conf in zip(person_boxes, confidences)
                if conf > self._threshold]

We now have an operation defined named PersonDetectorDNN which can be instantiated using a client object named sc like this: sc.ops.PersonDetectorDNN(frame=..., model_url=..., threshold=[...]). The rest of this guide will explain the above piece of code in more detail. First, let’s start with the way parameters to the operation.

Declaring Parameters

Parameters to operations can be declared at one of three locations:

  • __init__(): the constructor for the operation.

  • new_stream(): called whenever the operation starts processing a new stream.

  • execute(): called with input stream elements to produce transformed output elements.

The frequency at which each of these functions is invoked is called its rate.

Init Parameters

Init parameters are specified once and given to the constructor of an operation, and then never changed. For example, the model_url parameter to the PersonDetectorDNN operation is a init parameter as it is specified in the __init__() method to the class.

Stream Config Parameters

Stream config parameters are provided to an operation before processing elements from a stream in a batch of streams (see Batch processing of stored streams). For example, the threshold parameter specified in the new_stream() method of the PersonDetectorDNN operation is a stream config parameter. For each stream in the batch of streams being processed, Scanner expects a separate threshold parameter to be provided. This is done by passing a list like so: PersonDetectorDNN(threshold=[0.5, 0.6, ..., 0.2], ...).

Stream Parameters

Stream parameters are parameters that bind to streams and are processed element at a time by the operation. For the PersonDetectorDNN operation, the frame parameter specified in the execute() method is a stream parameter. Stream parameters must be annotated with a type so that Scanner understands how to serialize and deserialize the data. The following types are supported:

  • FrameType: a built-in type that represents frames from a video. In Python operations, parameters of this type are represented with numpy arrays.

  • bytes: these parameters represent blobs of binary data.

  • User-defined types: Scanner supports registering custom types for stream parameters. See the next section to find out how to do that!

Stream Parameter Types

Stream parameter types tell Scanner how to serialize and deserialize the elements in streams and the inputs/outputs to operations. One can define their own custom stream parameter type in Python by calling scannerpy.types.register_type(). For example, to register a new type for numpy float32 arrays, we can write the following code:

import scannerpy as sp
import scannerpy.types
import numpy as np

@sp.types.register_type
class NumpyArrayFloat32:
    def serialize(array):
        return array.tobytes()

    def deserialize(data_buffer):
        return np.fromstring(data_buffer, dtype=np.float32)

A custom type implements a serialize method, which takes an instance of the type and converts it to a byte buffer, and a deserialize method, which takes a byte buffer produced by calling serialize and converts it back into an instance of the type.

Fetching Resources

Some operations require external resources to be download or fetched before they can start processing data. In the case of the PersonDetectorDNN operation, it requires the model weights for its deep neural network. To download external resources, operations should implement the following two methods:

Operation Properties

Scanner operations can be annotated with several different properties to change their functionality.

Device Sets

By default, Scanner will assume operations only use the CPU when processing data. If an operation utilizes the GPU when processing elements, it can declare that it requires that device type during op declaration:

@sp.register_python_op(device_sets=[(DeviceType.GPU, 1)])
class GpuOp():
    def __init__(self, config):
        pass

    ...

Batch

Many operations benefit from being able to process a batch of elements all at once, especially when using the GPU. Operations can declare they are able to process batches of elements at once using the batch property:

from typing import Sequence

@sp.register_python_op(batch=8)
class BatchOp():
    def __init__(self, config):
        pass

    def execute(self, frame: Sequence[sp.FrameType]) -> Sequence[sp.FrameType]:
        # process a batch of frames
        ...

Notice how the signature of the execute method changed. Since we are processing a batch of input, the frame parameter and the output are now lists of frames instead of a single frame.

Stencil

Some operations require looking at a window of data over time. For example, computing optical flow requires both the current and next frame in time. These operations can indicate they require a stencil of frames:

from typing import Sequence

@sp.register_python_op(stencil=[0, 1])
class OpticalFlow():
    def __init__(self, config):
        pass

    def execute(self, frame: Sequence[sp.FrameType]) -> sp.FrameType:
        # process a window of frames
        ...

Like with the batch property, the signature of the execute method changed. However, instead of both the input and output becoming lists of frames, only the input did. This is because the operation needs a list of frames as input, but still produces a single output element for each invocation of execute.

Defining an Operation in C++

For performance critical operations, Scanner also supports defining operations in C++. Check out the tutorial 08_defining_cpp_ops.py to find out how to write your own C++ operation.