Source code for scannerpy.profiler

import struct
import json
from concurrent.futures import ThreadPoolExecutor
import random
import tarfile
import os
from collections import defaultdict  # used by _parse_profiler_file (may also be re-exported by scannerpy.common)

from scannerpy.common import *
from scannerpy.protobufs import protobufs

def read_advance(fmt, buf, offset):
    new_offset = offset + struct.calcsize(fmt)
    return struct.unpack_from(fmt, buf, offset), new_offset
def unpack_string(buf, offset):
    s = ''
    while True:
        t, offset = read_advance('B', buf, offset)
        c = t[0]
        if c == 0:
            break
        s += chr(c)
    return s, offset
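# Example (not part of the module): how the helpers above walk a byte buffer.
# Strings are null-terminated and integers are native 64-bit ('q'), matching
# the conventions used by the parsers below. A quick interactive sketch:
#
#   >>> import struct
#   >>> buf = b'eval\x00' + struct.pack('q', 42)
#   >>> unpack_string(buf, 0)
#   ('eval', 5)
#   >>> read_advance('q', buf, 5)
#   ((42,), 13)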
class Profile:
    """
    Contains profiling information about Scanner jobs.
    """

    def __init__(self, sc, job_id, load_threads=8, subsample=None):
        self._storage = sc._storage
        job = sc._load_descriptor(
            protobufs.BulkJobDescriptor,
            'jobs/{}/descriptor.bin'.format(job_id))

        def get_prof(path, worker=True):
            file_info = self._storage.get_file_info(path)
            if file_info.file_exists:
                time, profs = self._parse_profiler_file(path, worker)
                return time, profs
            else:
                return None

        nodes = list(range(job.num_nodes))
        if subsample is not None:
            nodes = random.sample(nodes, subsample)

        with ThreadPoolExecutor(max_workers=load_threads) as executor:
            profilers = executor.map(
                get_prof,
                ['{}/jobs/{}/profile_{}.bin'.format(sc._db_path, job_id, n)
                 for n in nodes])
        self._worker_profilers = {
            n: prof
            for n, prof in enumerate(profilers) if prof is not None
        }

        path = '{}/jobs/{}/profile_master.bin'.format(sc._db_path, job_id)
        self._master_profiler = get_prof(path, worker=False)
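    # Hypothetical usage sketch (not part of the module): `sc` is assumed to
    # be an already-connected Scanner client whose bulk job 0 has finished
    # writing its profile files. Nodes whose profile file is missing are
    # skipped, and `subsample` reads only a random subset of per-node
    # profiles, which is useful for very large jobs:
    #
    #   from scannerpy.profiler import Profile
    #   profile = Profile(sc, job_id=0)
    #   sampled = Profile(sc, job_id=0, load_threads=16, subsample=4)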
    def write_trace(self, path: str):
        """
        Generates a trace file in Chrome format.

        To visualize the trace, visit chrome://tracing in Google Chrome and
        click "Load" in the top left to load the trace.

        Args
        ----
        path
          Output path to write the trace.
        """

        # https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html
        colors = {
            'Wait on Input Queue': 'grey',
            'Wait on Output Queue': 'grey',
        }

        traces = []
        next_tid = 0

        def make_trace_from_interval(interval, cat, proc, tid):
            trace = {
                'name': interval[0],
                'cat': cat,
                'ph': 'X',
                'ts': interval[1] / 1000,  # ns to microseconds
                'dur': (interval[2] - interval[1]) / 1000,
                'pid': proc,
                'tid': tid,
                'args': {}
            }
            if interval[0] in colors:
                trace['cname'] = colors[interval[0]]
            return trace

        if self._master_profiler is not None:
            traces.append({
                'name': 'process_name',
                'ph': 'M',
                'pid': -1,
                'args': {'name': 'Master'}
            })
            traces.append({
                'name': 'thread_name',
                'ph': 'M',
                'pid': -1,
                'tid': 0,
                'args': {'name': 'EventLoop'}
            })
            for interval in self._master_profiler[1]['intervals']:
                traces.append(
                    make_trace_from_interval(interval, 'master', -1, 0))

        for proc, (_, worker_profiler_groups) in self._worker_profilers.items():
            traces.append({
                'name': 'process_name',
                'ph': 'M',
                'pid': proc,
                'args': {'name': 'Worker {:d}'.format(proc)}
            })

            num_load_workers = len(worker_profiler_groups['load'])
            num_eval_workers = sum(
                [len(profs) for profs in worker_profiler_groups['eval']])
            num_stages_per_pipeline = (len(worker_profiler_groups['eval'][0])
                                       if num_eval_workers > 0 else 0)

            for worker_type, profs in [
                    ('process_job', worker_profiler_groups['process_job']),
                    ('load', worker_profiler_groups['load']),
                    ('eval', worker_profiler_groups['eval']),
                    ('save', worker_profiler_groups['save'])]:
                for i, prof in enumerate(profs):
                    tid = next_tid
                    next_tid += 1
                    pipeline_num = prof['worker_num']
                    tag = prof['worker_tag']
                    display_info = {
                        ('process_job', ''): {
                            'name': 'EventLoop',
                            'index': lambda x: 0
                        },
                        ('eval', 'pre'): {
                            'name': 'Pipeline[{:d}]:DecodeVideo',
                            'index': lambda x: 1 + num_load_workers + x * num_stages_per_pipeline + 0
                        },
                        ('eval', 'eval'): {
                            'name': 'Pipeline[{:d}]:Ops[{:d}]',
                            'index': lambda x: 1 + num_load_workers + x * num_stages_per_pipeline + 1
                        },
                        ('eval', 'post'): {
                            'name': 'Pipeline[{:d}]:EncodeVideo',
                            'index': lambda x: 1 + num_load_workers + x * num_stages_per_pipeline + 2
                        },
                        ('load', ''): {
                            'name': 'Reader[{:d}]',
                            'index': lambda x: 1 + x
                        },
                        ('save', ''): {
                            'name': 'Writer[{:d}]',
                            'index': lambda x: 1 + num_load_workers + num_eval_workers + x
                        },
                    }
                    info = display_info[(worker_type, tag)]
                    name = info['name']
                    if tag == 'eval':
                        name = name.format(pipeline_num, prof['kernel_group'])
                    elif worker_type != 'process_job':
                        name = name.format(pipeline_num)

                    traces.append({
                        'name': 'thread_name',
                        'ph': 'M',
                        'pid': proc,
                        'tid': tid,
                        'args': {'name': name}
                    })
                    sort_index = info['index'](tid)
                    traces.append({
                        'name': 'thread_sort_index',
                        'ph': 'M',
                        'pid': proc,
                        'tid': tid,
                        'args': {'sort_index': sort_index}
                    })
                    for interval in prof['intervals']:
                        traces.append(
                            make_trace_from_interval(interval, worker_type,
                                                     proc, tid))

        parts = path.split('.')
        base = parts[0]
        exts = parts[1:]
        with open(base + '.trace', 'w') as f:
            f.write(json.dumps(traces))

        if exts == ['trace']:
            return path
        elif exts == ['tar', 'gz']:
            with tarfile.open(base + '.tar.gz', 'w:gz') as tar:
                tar.add(base + '.trace')
            os.remove(base + '.trace')
            return path
        else:
            raise ScannerException(
                "Invalid trace extension '{}'. Must be .trace or .tar.gz."
                .format(''.join(['.' + e for e in exts])))
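    # Hypothetical usage sketch, continuing the `profile` object from above.
    # Per the extension check at the end of write_trace, the output path must
    # end in .trace or .tar.gz; the result is a Chrome trace (a JSON array of
    # events) that can be loaded at chrome://tracing via the "Load" button:
    #
    #   profile.write_trace('job0.trace')    # plain JSON trace
    #   profile.write_trace('job0.tar.gz')   # same trace, gzipped tarball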
    def _convert_time(self, d):
        def convert(t):
            if isinstance(t, float):
                # Format nanosecond totals as seconds.
                return '{:.2f}'.format(t / 1.0e9)
            return t

        return {
            k: self._convert_time(v) if isinstance(v, dict) else convert(v)
            for (k, v) in d.items()
        }
    def total_time_interval(self):
        intv, _ = list(self._worker_profilers.values())[0]
        return intv
    def statistics(self):
        totals = {}
        for (total_start, total_end), profiler in list(
                self._worker_profilers.values()):
            for kind in profiler:
                if kind not in totals:
                    totals[kind] = {}
                for thread in profiler[kind]:
                    for (key, start, end) in thread['intervals']:
                        if key not in totals[kind]:
                            totals[kind][key] = 0.0
                        totals[kind][key] += end - start
                    for (name, value) in thread['counters'].items():
                        if name not in totals[kind]:
                            totals[kind][name] = 0
                        totals[kind][name] += value

        totals['total_time'] = float(total_end - total_start)
        readable_totals = self._convert_time(totals)
        return readable_totals
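    # Hypothetical usage sketch of the aggregate accessors above. Based on the
    # code, total_time_interval() returns a (start, end) pair in nanoseconds,
    # and statistics() returns nested dicts whose float totals have been
    # formatted as seconds by _convert_time (counter values stay integers):
    #
    #   start_ns, end_ns = profile.total_time_interval()
    #   print('wall clock: {:.2f} s'.format((end_ns - start_ns) / 1e9))
    #   stats = profile.statistics()
    #   print('total time (s):', stats['total_time'])
    #   for kind, entries in stats.items():
    #       if isinstance(entries, dict):
    #           for key, seconds in entries.items():
    #               print(kind, key, seconds)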
    def _parse_profiler_output(self, bytes_buffer, offset):
        # Node
        t, offset = read_advance('q', bytes_buffer, offset)
        node = t[0]
        # Worker type name
        worker_type, offset = unpack_string(bytes_buffer, offset)
        # Worker tag
        worker_tag, offset = unpack_string(bytes_buffer, offset)
        # Worker number
        t, offset = read_advance('q', bytes_buffer, offset)
        worker_num = t[0]
        # Number of keys
        t, offset = read_advance('q', bytes_buffer, offset)
        num_keys = t[0]
        # Key dictionary encoding
        key_dictionary = {}
        for i in range(num_keys):
            key_name, offset = unpack_string(bytes_buffer, offset)
            t, offset = read_advance('B', bytes_buffer, offset)
            key_index = t[0]
            key_dictionary[key_index] = key_name
        # Intervals
        t, offset = read_advance('q', bytes_buffer, offset)
        num_intervals = t[0]
        intervals = []
        for i in range(num_intervals):
            # Key index
            t, offset = read_advance('B', bytes_buffer, offset)
            key_index = t[0]
            t, offset = read_advance('q', bytes_buffer, offset)
            start = t[0]
            t, offset = read_advance('q', bytes_buffer, offset)
            end = t[0]
            intervals.append((key_dictionary[key_index], start, end))
        # Counters
        t, offset = read_advance('q', bytes_buffer, offset)
        num_counters = t[0]
        # wcrichto 12-27-18: seeing a strange issue where # of counters is
        # corrupted in output file. Putting in temporary sanity check until
        # this is fixed.
        if num_counters > 1000000:
            num_counters = 0
        counters = {}
        for i in range(num_counters):
            # Counter name
            counter_name, offset = unpack_string(bytes_buffer, offset)
            # Counter value
            t, offset = read_advance('q', bytes_buffer, offset)
            counter_value = t[0]
            counters[counter_name] = counter_value

        return {
            'node': node,
            'worker_type': worker_type,
            'worker_tag': worker_tag,
            'worker_num': worker_num,
            'intervals': intervals,
            'counters': counters
        }, offset

    def _parse_profiler_file(self, profiler_path, worker=True):
        bytes_buffer = self._storage.read(profiler_path)
        offset = 0
        # Read start and end time intervals
        t, offset = read_advance('q', bytes_buffer, offset)
        start_time = t[0]
        t, offset = read_advance('q', bytes_buffer, offset)
        end_time = t[0]
        if worker:
            # Profilers
            profilers = defaultdict(list)
            # Load process_job profiler
            prof, offset = self._parse_profiler_output(bytes_buffer, offset)
            profilers[prof['worker_type']].append(prof)
            # Load worker profilers
            t, offset = read_advance('B', bytes_buffer, offset)
            num_load_workers = t[0]
            for i in range(num_load_workers):
                prof, offset = self._parse_profiler_output(bytes_buffer, offset)
                profilers[prof['worker_type']].append(prof)
            # Eval worker profilers
            t, offset = read_advance('B', bytes_buffer, offset)
            num_eval_workers = t[0]
            t, offset = read_advance('B', bytes_buffer, offset)
            groups_per_chain = t[0]
            for pu in range(num_eval_workers):
                for fg in range(groups_per_chain):
                    prof, offset = self._parse_profiler_output(
                        bytes_buffer, offset)
                    if fg > 0 and fg < groups_per_chain - 1:
                        prof['kernel_group'] = fg - 1
                    profilers[prof['worker_type']].append(prof)
            # Save worker profilers
            t, offset = read_advance('B', bytes_buffer, offset)
            num_save_workers = t[0]
            for i in range(num_save_workers):
                prof, offset = self._parse_profiler_output(bytes_buffer, offset)
                profilers[prof['worker_type']].append(prof)
            return (start_time, end_time), profilers
        else:
            return (start_time, end_time), self._parse_profiler_output(
                bytes_buffer, offset)[0]
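# Layout sketch (not part of the module): the byte sequence that
# _parse_profiler_output reads for a single profiler record, illustrated with
# made-up values. Field order follows the read_advance/unpack_string calls
# above; cstr() stands for a null-terminated string.
#
#   import struct
#
#   def cstr(s):
#       return s.encode('ascii') + b'\x00'
#
#   record = b''.join([
#       struct.pack('q', 0),                  # node id
#       cstr('eval'),                         # worker type
#       cstr('eval'),                         # worker tag
#       struct.pack('q', 0),                  # worker number
#       struct.pack('q', 1),                  # number of dictionary keys
#       cstr('task'), struct.pack('B', 7),    # key name -> key index
#       struct.pack('q', 1),                  # number of intervals
#       struct.pack('B', 7),                  # interval key index
#       struct.pack('q', 1000),               # interval start (ns)
#       struct.pack('q', 5000),               # interval end (ns)
#       struct.pack('q', 0),                  # number of counters
#   ])
#
# Parsing `record` from offset 0 yields intervals [('task', 1000, 5000)] and
# an empty counters dict.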