Source code for scannerpy.kernel
import pickle
import traceback
class KernelConfig(object):
    r"""Python-side copy of the configuration handed to a kernel instance.

    The device, column and node fields are copied from ``config`` as-is, and
    the serialized kernel arguments are unpickled into :attr:`args`.

    Parameters
    ----------
    config
      Object exposing ``devices``, ``input_columns``, ``input_column_types``,
      ``output_columns``, ``output_column_types`` and ``node_id`` attributes,
      plus an ``args()`` callable returning the pickled kernel arguments as
      bytes (``b''`` when there are none).
    """

    def __init__(self, config):
        self.devices = config.devices
        self.input_columns = config.input_columns
        self.input_column_types = config.input_column_types
        self.output_columns = config.output_columns
        self.output_column_types = config.output_column_types
        # Fetch the serialized arguments once instead of calling ``args()``
        # separately for the emptiness check and for deserialization.
        raw_args = config.args()
        # NOTE(review): pickle.loads can execute arbitrary code; this assumes
        # the payload was produced by trusted code -- confirm against callers.
        # ``args`` is None (not an empty dict) when nothing was serialized.
        self.args = pickle.loads(raw_args) if raw_args != b'' else None
        self.node_id = config.node_id
class Kernel:
    r"""Base class for Python kernels.

    Every lifecycle hook defined here is a no-op except :meth:`execute`,
    which raises ``NotImplementedError``.
    """

    def __init__(self, config: KernelConfig, init_parameter = None):
        r"""Store the configuration for this kernel instance.

        Parameters
        ----------
        config
          Contains the configuration settings for this instance of the kernel.

        init_parameter
          An example init parameter; it is not used by this base class. Any
          parameters defined in the __init__ method of a Kernel can be set
          when creating an instance of the corresponding operation. For
          example, an operation for this Kernel could be initialized like
          this: :code:`cl.ops.Kernel(init_parameter='test', ...)`
        """
        self.config = config

    def close(self):
        r"""Called when this Kernel instance will no longer be used.

        Does nothing by default.
        """

    def new_stream(self):
        r"""Runs after fetch_resources for each instance of this operation.

        Parameters added for this method by operations are considered
        `stream config parameters` (see :ref:`stream-config-parameters`).
        Does nothing by default.
        """

    def reset(self):
        r"""Called for stateful operations when the operation should reset
        its logical state.

        Does nothing by default.
        """

    def fetch_resources(self):
        r"""Runs once per Scanner worker to download resources for running
        this operation.

        Does nothing by default.
        """

    def setup_with_resources(self):
        r"""Runs after fetch_resources for each instance of this operation.

        This method is responsible for handling any setup that requires
        resources to first be downloaded. Does nothing by default.
        """

    def execute(self, stream_parameter: bytes) -> bytes:
        r"""Runs the kernel on input elements and returns new output elements.

        Parameters
        ----------
        stream_parameter
          An example stream parameter. Must be annotated with a stream
          parameter type. See :ref:`stream-parameters`.

        Returns
        -------
        bytes
          The outputs for the operation.

        Raises
        ------
        NotImplementedError
          Always, in this base class.
        """
        raise NotImplementedError()
def python_kernel_fn(n, recv_conn, send_conn, p_conn1, p_conn2):
    r"""Instantiate a Python kernel and service requests until the peer hangs up.

    Each request read from ``recv_conn`` is a cloudpickled
    ``(msg_type, data)`` tuple. Recognized message types:

    - ``'reset'``: calls ``kernel.reset()``.
    - ``'new_stream'``: calls ``kernel.new_stream(**data)``; ``data`` may be
      ``None``, meaning no keyword arguments.
    - ``'execute'``: calls ``kernel.execute(data)`` and sends the cloudpickled
      result on ``send_conn``.
    - ``'fetch_resources'``: calls ``kernel.fetch_resources()`` and sends an
      empty bytes message on ``send_conn``.
    - ``'setup_with_resources'``: calls ``kernel.setup_with_resources()``; no
      reply is sent.

    Any other message type is ignored. The loop ends when reading a request
    raises ``EOFError``.

    Parameters
    ----------
    n
      Mapping with a ``'config'`` entry (cloudpickled object passed to
      ``KernelConfig``) and a ``'kernel_code'`` entry (cloudpickled kernel
      class or factory).

    recv_conn
      Connection requests are read from via ``recv_bytes()``. Presumably a
      ``multiprocessing`` connection -- TODO confirm against the caller.

    send_conn
      Connection replies are written to via ``send_bytes()``.

    p_conn1, p_conn2
      Parent connections; closed immediately on entry.

    Raises
    ------
    Exception
      Any exception raised during setup or message handling is printed with
      its traceback and re-raised. Both connections are closed in every case.
    """
    import cloudpickle
    import traceback
    from scannerpy import Config, DeviceType, DeviceHandle, KernelConfig

    # Close parent connections
    p_conn1.close()
    p_conn2.close()
    try:
        kernel_config = KernelConfig(cloudpickle.loads(n['config']))
        # The KernelConfig defined in this module leaves ``args`` as None when
        # no init arguments were serialized; ``**None`` would raise TypeError,
        # so fall back to no keyword arguments (same guard as 'new_stream').
        init_args = kernel_config.args if kernel_config.args is not None else {}
        kernel = cloudpickle.loads(n['kernel_code'])(kernel_config, **init_args)
        while True:
            try:
                data = recv_conn.recv_bytes()
                msg_type, data = cloudpickle.loads(data)
            except EOFError:
                # Nothing more to read: leave the service loop.
                break
            if msg_type == 'reset':
                kernel.reset()
            elif msg_type == 'new_stream':
                kernel.new_stream(**(data if data is not None else {}))
            elif msg_type == 'execute':
                result = kernel.execute(data)
                send_conn.send_bytes(cloudpickle.dumps(result))
            elif msg_type == 'fetch_resources':
                kernel.fetch_resources()
                # Empty reply signals that fetch_resources has returned.
                send_conn.send_bytes(b'')
            elif msg_type == 'setup_with_resources':
                kernel.setup_with_resources()
    except Exception:
        # Print the traceback here before propagating the error.
        traceback.print_exc()
        raise
    finally:
        send_conn.close()
        recv_conn.close()