Source code for pythonlab.pythonlab_reader

"""
Module with all necessary utilities to parse a PythonLab process into a networkx graph.
TODO: use pydantic to parse it into defined dataclasses
"""

from __future__ import annotations

import ast
import inspect
import logging
import sys
import traceback
import types
from abc import ABC
from contextlib import contextmanager
from copy import deepcopy
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from typing import Any, Dict, Iterator, List, Type
from uuid import uuid1

import graphviz
import networkx as nx

from pythonlab.process import PLProcess
from pythonlab.resource import DynamicLabwareResource, LabwareResource, ServiceResource


@contextmanager
def get_source_code(src: str, node: ast.AST) -> Iterator[str | None]:
    """
    Context manager yielding the source text of ``node`` and logging it on failure.

    The text is looked up with :func:`ast.get_source_segment`. Any exception raised
    while looking it up, or inside the managed ``with`` block, is logged together
    with the offending code fragment and then re-raised unchanged.

    :param src: complete source code that ``node`` was parsed from
    :param node: AST node whose source text is requested
    :return: yields the source segment of ``node`` (``None`` if the node carries no
        location information)
    """
    # Bind the name up front: if the lookup itself raises, the log message below must
    # still be able to reference ``code`` without causing an UnboundLocalError that
    # would hide the real problem.
    code = None
    try:
        code = ast.get_source_segment(src, node)
        yield code
    except Exception as ex:
        logging.error(f"Caught Exception {ex} while parsing \n\n{code}\n")
        # bare raise keeps the original traceback intact
        raise
def find_pl_process_class(module: types.ModuleType) -> type[PLProcess]:
    """
    Return the first PLProcess subclass that is defined directly in ``module``.

    :param module: module object whose namespace is searched
    :return: the first matching class in namespace order
    :raises RuntimeError: if the module defines no subclass of PLProcess
    """
    matches = []
    for candidate in vars(module).values():
        if not inspect.isclass(candidate):
            continue
        if not issubclass(candidate, PLProcess):
            continue
        if candidate is PLProcess:
            continue
        # ensure defined in this file, not imported
        if candidate.__module__ != module.__name__:
            continue
        matches.append(candidate)

    if matches:
        return matches[0]
    raise RuntimeError(f"No subclass of PLProcess found in module {module.__name__}")
class PLProcessReader:
    """
    Static helpers that turn a PythonLab process description into a workflow graph.

    The ``process`` method of a PLProcess subclass is parsed into an abstract syntax
    tree. The tree is walked statement by statement; supported statements are
    expressions, assignments, ``if``, ``for`` and ``break``. Statements are executed
    against a :class:`PLProcessSimulator`, which records the workflow graph.

    NOTE: the process description is run through ``exec``/``eval``. Only parse process
    descriptions from trusted sources.
    """

    @staticmethod
    def parse_process_from_file_path(file_path: str | Path):
        """
        Import the python file at ``file_path`` and parse the PLProcess it defines.

        :param file_path: path to a python file defining a subclass of PLProcess
        :return: the simulator produced by :meth:`parse_process_from_instance`
        :raises ImportError: if no import spec/loader can be created for the file
        """
        spec = spec_from_file_location("importing_process", file_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Can not import a process description from {file_path}")
        # make modules next to the process file importable, without piling up
        # duplicate entries when several files of one directory are parsed
        parent_dir = Path(file_path).parent.as_posix()
        if parent_dir not in sys.path:
            sys.path.append(parent_dir)
        module = module_from_spec(spec)
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
        # find the class implementing PLProcess
        process_class = find_pl_process_class(module)
        process = process_class()
        # call the parsing from instance method
        return PLProcessReader.parse_process_from_instance(process)

    @staticmethod
    def parse_process_from_source_code(src: str):
        """
        Execute ``src`` in a temporary module and parse the PLProcess it defines.

        :param src: complete source code of a module defining a subclass of PLProcess
        :return: the simulator produced by :meth:`parse_process`
        """
        module_name = "tmp_module"
        module = types.ModuleType(module_name)
        # Execute the source code in the module's namespace.
        exec(src, module.__dict__)
        # Now, create an instance of the class implementing PLProcess.
        process_class = find_pl_process_class(module)
        process = process_class()
        # retrieve all imports and add them to scope
        scope = {}
        tree = ast.parse(src)
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    stmt = f"import {alias.name}" + (f" as {alias.asname}" if alias.asname else "")
                    exec(stmt, scope)
            elif isinstance(node, ast.ImportFrom):
                imports = ", ".join(
                    f"{alias.name}" + (f" as {alias.asname}" if alias.asname else "")
                    for alias in node.names
                )
                # node.module is None for "from . import x"; rebuild the dotted prefix
                # instead of rendering the literal text "None"
                origin = "." * (node.level or 0) + (node.module or "")
                stmt = f"from {origin} import {imports}"
                exec(stmt, scope)
        return PLProcessReader.parse_process(process, src, scope)

    @staticmethod
    def parse_process_from_instance(plp: PLProcess):
        """
        Parse an already instantiated process.

        The source is taken from the class of ``plp`` and the namespace of its defining
        module (if found in ``sys.modules``) is used as the initial scope.

        :param plp: instance of a subclass of PLProcess
        :return: the simulator produced by :meth:`parse_process`
        """
        src = inspect.getsource(type(plp))
        scope = {}
        module_name = plp.__class__.__module__
        module = sys.modules.get(module_name)
        if module:
            scope.update(module.__dict__)
        return PLProcessReader.parse_process(plp, src, scope)

    @staticmethod
    def parse_process(plp: PLProcess, src=None, scope: dict | None = None):
        """
        The main function. It takes a PythonLab process and builds its workflow graph.

        :param plp: instance of a subclass of PLProcess
        :param src: source code containing the class of ``plp``; defaults to the source
            of ``type(plp)``
        :param scope: names available while executing the process description. The
            dictionary is updated in place with the entries ``self`` and ``_break_nodes``.
        :return: the simulator instance; its ``workflow`` attribute holds the graph
        """
        if scope is None:
            scope = {}
        if src is None:
            src = inspect.getsource(type(plp))
        p = PLProcessReader.ast_get_process(type(plp), src)

        class Simulator(PLProcessSimulator, type(plp)):
            """This class is necessary to have the simulator inherit the methods of plp dynamically"""

            def __init__(self):
                PLProcessSimulator.__init__(self, type(plp))

        simulator = Simulator()
        # '_break_nodes' is one list object shared by all (shallow) copies of the scope
        scope.update({'self': simulator, '_break_nodes': []})
        devices = [key for key, var in vars(simulator).items() if isinstance(var, ServiceResource)]
        PLProcessReader.execute_scope(p.body, scope, [], src, devices, simulator)
        # contract the dummy_nodes used to link the if-conditions and break-nodes correctly
        PLProcessReader.contract_dummys(simulator.workflow)
        return simulator

    @staticmethod
    def ast_get_process(process_type, src=None):
        """
        Return the AST node of the ``process`` method of a process class.

        The class named like ``process_type`` is preferred. This matters when ``src`` is
        a whole module that defines other classes before the process class. The first
        class of the source is used as fallback.

        :param process_type: the process class; only its name (and, if ``src`` is
            omitted, its source) is used
        :param src: source code to search; defaults to the source of ``process_type``
        :return: the ``ast.FunctionDef`` of the ``process`` method
        :raises IndexError: if no class or no ``process`` method is found (exception
            type kept for backward compatibility)
        """
        if src is None:
            src = inspect.getsource(process_type)
        module = ast.parse(src)
        class_defs = [cls for cls in module.body if isinstance(cls, ast.ClassDef)]
        if not class_defs:
            raise IndexError("No class definition found in the given source code")
        wanted = getattr(process_type, "__name__", None)
        candidates = [cls for cls in class_defs if cls.name == wanted]
        # historic behaviour as fallback: the first class of the source
        candidates.append(class_defs[0])
        for cl in candidates:
            for elem in cl.body:
                if isinstance(elem, ast.FunctionDef) and elem.name == 'process':
                    return elem
        raise IndexError(f"No method named 'process' found in class {candidates[0].name}")

    @staticmethod
    def contract_dummys(g: nx.DiGraph):
        """
        This function contracts the dummy nodes in the workflow graph.

        To keep track of which labware participated in which process step throughout
        exploring if-clauses, break commands, etc. the reader includes dummy nodes into
        the workflow graph. In this function (after the whole process was read) these
        dummies are removed by contraction, i.e., remove them and connect all sources of
        incoming edges to all targets of outgoing edges.

        :param g: the workflow graph; it is modified in place
        """
        dummys = [n for n, data in g.nodes(data=True) if data['type'] == 'dummy']
        for dummy in dummys:
            # snapshot both edge lists, because edges are added while combining them
            incoming = list(g.in_edges(dummy, data=True))
            outgoing = list(g.out_edges(dummy, data=True))
            for prior, _, data_in in incoming:
                for _, posterior, data_out in outgoing:
                    label = f"{data_in['label']} {data_out['label']}"
                    # attributes of the outgoing edge win over those of the incoming edge
                    kwargs = data_in.copy()
                    kwargs.update(data_out)
                    kwargs['label'] = label
                    g.add_edge(prior, posterior, **kwargs)
            g.remove_node(dummy)

    @staticmethod
    def execute_scope(body: List[ast.AST], scope: Dict[str, Any], runtime_vars: List[str], src: str,
                      devices: List[str], plp: PLProcessSimulator):
        """
        Goes through the list of code lines and constructs the workflow graph by
        manipulating the process state and executing certain lines of process
        description code. Meant to be called recursively.

        Errors raised while handling an if-statement are logged and the remaining
        statements are still processed.

        :param plp: the simulator recording the workflow graph
        :param devices: List of variables that correspond to ServiceResources
        :param src: source code of the process description
        :param body: list of parsed code fragments
        :param scope: variables assigned so far
        :param runtime_vars: list of variable names in current scope that are evaluated
            at runtime; this list is shared with the caller and modified in place
        :return: Nothing
        """
        # Work on a shallow copy of scope, so names bound here do not leak to the caller.
        # runtime_vars is deliberately passed on as the same list object.
        scope = scope.copy()
        for node in body:
            if isinstance(node, ast.Expr):
                PLProcessReader.handle_expr(node, scope, runtime_vars, src)
            elif isinstance(node, ast.Assign):
                PLProcessReader.handle_assign(node, scope, runtime_vars, src, devices, plp)
            elif isinstance(node, ast.If):
                try:
                    PLProcessReader.handle_if(node, scope, runtime_vars, src, devices, plp)
                except Exception as ex:
                    # logging.exception appends the traceback of the active exception
                    logging.exception("Error while handling the if-statement in line %s: %s",
                                      getattr(node, 'lineno', '?'), ex)
            elif isinstance(node, ast.For):
                PLProcessReader.handle_for(node, scope, runtime_vars, src, devices, plp)
            elif isinstance(node, ast.Break):
                PLProcessReader.handle_break(scope, plp)

    @staticmethod
    def handle_expr(expr: ast.Expr, scope: Dict[str, Any], runtime_vars: List[str], src: str):
        """
        Execute an expression statement of the process description.

        :param expr: the expression statement
        :param scope: names available to the executed code
        :param runtime_vars: unused; kept for a uniform handler signature
        :param src: source code of the process description
        """
        with get_source_code(src, expr) as code:
            exec(code, scope)

    @staticmethod
    def handle_assign(asg: ast.Assign, scope: Dict[str, Any], runtime_vars: List[str], src: str,
                      devices: List[str], plp: PLProcessSimulator):
        """
        Handle an assignment. Three cases are distinguished by the assigned value:

        * it accesses an attribute named like a device: the value expression is
          executed and the targets become variable nodes (runtime variables),
        * it uses a runtime variable: a computation node is added, which is evaluated
          at runtime,
        * otherwise: the assignment is executed right away.

        :param asg: the assignment; every target is expected to be a plain name
        :param scope: names available to the executed code
        :param runtime_vars: names of runtime variables; updated in place
        :param src: source code of the process description
        :param devices: attribute names of the simulator that hold ServiceResources
        :param plp: the simulator recording the workflow graph
        """
        with get_source_code(src, asg) as code:
            # dump the value once instead of once per device/variable
            value_dump = ast.dump(asg.value)
            is_runtime_var = any(f"attr='{d}'" in value_dump for d in devices)
            used_vars = [v for v in runtime_vars if f"id='{v}'" in value_dump]
            var_names = [t.id for t in asg.targets]
            if is_runtime_var:
                # only the right hand side is executed; its result is known at runtime only
                exec(ast.get_source_segment(src, asg.value), scope)
                plp.add_var_nodes(*var_names)
                runtime_vars.extend([v for v in var_names if v not in runtime_vars])
            elif used_vars:
                # every scope entry whose name occurs in the code is handed to the computation
                needed = {key: val for key, val in scope.items() if key in code}
                plp.add_computation(name=asg.targets[0].id, var_names=used_vars,
                                    fct_code=ast.get_source_segment(src, asg.value),
                                    needed_scope=needed.copy())
                runtime_vars.extend([v for v in var_names if v not in runtime_vars])
            else:
                exec(ast.get_source_segment(src, asg), scope)
                # the names now hold values known at parse time
                for name in var_names:
                    if name in runtime_vars:
                        runtime_vars.remove(name)

    @staticmethod
    def handle_if(node: ast.If, scope: Dict[str, Any], runtime_vars: List[str], src: str,
                  devices: List[str], plp: PLProcessSimulator):
        """
        Handle an if-statement.

        If the condition uses a runtime variable, an if-node is added to the workflow
        and both branches are explored. Otherwise the condition is evaluated now and
        only the matching branch is executed.

        :param node: the if-statement
        :param scope: names available to the executed code
        :param runtime_vars: names of runtime variables
        :param src: source code of the process description
        :param devices: attribute names of the simulator that hold ServiceResources
        :param plp: the simulator recording the workflow graph
        """
        test_dump = ast.dump(node.test)
        used_vars = [v for v in runtime_vars if f"id='{v}'" in test_dump]
        if used_vars:
            with get_source_code(src, node.test) as code:
                needed = {key: val for key, val in scope.items() if key in code}
                if_node = plp.add_if_node(used_vars, code, needed)
            origin_state = plp.get_state()
            plp.prepare_true_execution(if_node)
            PLProcessReader.execute_scope(node.body, scope, runtime_vars, src, devices, plp)
            after_true_state = plp.get_state()
            plp.prepare_false_execution(if_node)
            PLProcessReader.execute_scope(node.orelse, scope, runtime_vars, src, devices, plp)
            plp.join_state(after_true_state)
            # the first pending break node is handed over (-1 if there is none)
            if scope['_break_nodes']:
                last_break = scope['_break_nodes'][0]
            else:
                last_break = -1
            plp.finalize_if_construction(if_node, origin_state, last_break=last_break)
        else:
            # if this is no runtime decision, we evaluate it and execute accordingly
            with get_source_code(src, node.test) as code:
                decision = eval(code, scope)
            if decision:
                PLProcessReader.execute_scope(node.body, scope, runtime_vars, src, devices, plp)
            else:
                PLProcessReader.execute_scope(node.orelse, scope, runtime_vars, src, devices, plp)

    @staticmethod
    def handle_for(node: ast.For, scope: Dict[str, Any], runtime_vars: List[str], src: str,
                   devices: List[str], plp: PLProcessSimulator):
        """
        Handle a for-loop by unrolling it: the loop body is processed once per element.

        :param node: the for-loop; its target is expected to be a plain name
        :param scope: names available to the executed code; the loop variable is
            written into it
        :param runtime_vars: names of runtime variables
        :param src: source code of the process description
        :param devices: attribute names of the simulator that hold ServiceResources
        :param plp: the simulator recording the workflow graph
        """
        iter_var = node.target.id
        with get_source_code(src, node.iter) as iter_over:
            # evaluate the iterable under a unique temporary name and take it out of
            # the scope again, so no helper names are left behind
            tmp = f"v{uuid1().int}"
            exec(f"{tmp} = {iter_over}", scope)
            iterable = scope.pop(tmp)
        for buff in iterable:
            # update the iteration variable in scope
            scope[iter_var] = buff
            PLProcessReader.execute_scope(node.body, scope, runtime_vars, src, devices, plp)
        # after finishing everything in for-loop finalize the break-nodes, that occurred in it
        while scope['_break_nodes']:
            # pop() takes the most recently added break node first
            # NOTE(review): the previous comment asked for first-in-first-out order,
            # which is not what pop() does - confirm which order is intended
            break_node = scope['_break_nodes'].pop()
            plp.finalize_break_node(break_node)

    @staticmethod
    def handle_break(scope: Dict[str, Any], plp: PLProcessSimulator):
        """
        Handle a break statement by adding a break node to the workflow.

        :param scope: current scope; the new node is appended to its ``_break_nodes`` list
        :param plp: the simulator recording the workflow graph
        """
        # create a new break-node
        break_node = plp.add_break_node()
        # add it to the hidden list in scope (append adds in the end of the list)
        scope["_break_nodes"].append(break_node)
class PLProcessSimulator(PLProcess, ABC):
    """
    A utility class to help going through actions in a PythonLabProcess while linking
    them correctly.

    This is done by systematically recursively exploring the abstract syntax tree and
    executing all functions of ServiceResources while keeping track of the labware.
    Each call of a function of a ServiceResource adds a new process step node to the
    workflow graph. The edges are determined by consecutive participation of labware
    in two steps.

    Node ids are consecutive integers in creation order. Several methods compare node
    ids to decide which of two nodes was created first.
    """

    def __init__(self, process_type: Type[PLProcess]):
        """
        :param process_type: the process class whose ``__init__`` is run on this instance
        """
        self.workflow = nx.DiGraph()  # this graph will represent the whole experiment
        self.last_job = {}  # dictionary providing a list of current nodes of each sample in the workflow graph
        self.last_action = {}  # dictionary providing the last action added id of each labware in the workflow graph
        self.label_to_node = {}  # maps the label of a process step to its node id
        process_type.__init__(self)

    def visualize_workflow_graph(self, show=True):
        """
        Render the workflow graph with graphviz into the file "workflow" (png format).

        :param show: whether the rendered image is opened in a viewer
        :return: the graphviz Digraph
        """
        dot = graphviz.Digraph(comment="Workflow")
        dot.attr(rankdir='LR')
        node_col = dict(labware='grey', operation='red', if_node="yellow", variable='blue', computation='cyan')
        for n, data in self.workflow.nodes(data=True):
            # node types without a dedicated colour (e.g. not yet contracted dummy
            # nodes) are drawn white instead of raising a KeyError
            colour = node_col.get(data['type'], 'white')
            dot.node(str(n), data['name'], color=colour, style='filled')
        for u, v in self.workflow.edges():
            dot.edge(str(u), str(v), '')
        dot.format = "png"
        dot.render("workflow", view=show)
        return dot

    def set_starting_position(self, resource: LabwareResource, device: ServiceResource, position: int):
        """
        Store where a labware starts in the start node of that labware.

        :param resource: the labware
        :param device: the device the labware is located at in the beginning
        :param position: the position within the device
        """
        # the list of last_job is supposed to have only one entry at this point
        start_position = self.last_job[resource.name][0]
        cur_job = self.workflow.nodes[start_position]
        cur_job['origin_pos'] = position
        cur_job['origin'] = device.name
        cur_job['origin_type'] = type(device)
        cur_job['lidded'] = resource.lidded
        if isinstance(resource, DynamicLabwareResource):
            cur_job['is_reagent'] = True
        # copy all keyword arguments into the workflow node
        for key, val in resource.kwargs.items():
            cur_job[key] = val

    def register_labware_resource(self, resource):
        """
        Register a labware and add the workflow node marking the start of its journey.

        :param resource: the labware to register
        """
        super().register_labware_resource(resource)
        # add a workflow node marking the start of the resources journey
        new_node = self.add_node(dict(type='labware', name=resource.name))
        # these are used in the construction of the wfg
        self.last_job[resource.name] = [new_node]
        self.last_action[resource.name] = -1
        self.labware_nodes[resource.name] = new_node

    def handle_labels(self, t, **kwargs):
        """
        Very preliminary handling of relations between process steps independent of labware

        :param t: id of the node of the current process step
        :param kwargs: keyword arguments of the process step. ``label`` registers the
            step under that label. ``relations`` is an iterable of
            (relation, label, args) triples referring to previously labelled steps.
        :return: Nothing
        """
        if "label" in kwargs:
            self.label_to_node[kwargs['label']] = t
            logging.debug("label_to_node: %s", self.label_to_node)
        edges = {}
        if "relations" in kwargs:
            for relation, label, args in kwargs['relations']:
                node = self.label_to_node[label]
                logging.debug("%s %s %s %s", relation, label, node, args)
                if node not in edges:
                    # the label of the edge is the first relation given for this node
                    edges[node] = dict(wait_cost=1, label=relation, max_wait=float('inf'))
                if relation == "direct_after":
                    edges[node]['wait_cost'] = 200
                if relation == "min_wait":
                    edges[node]['min_wait'] = args[0]
                if relation == "max_wait":
                    edges[node]['max_wait'] = args[0]
        for node, data in edges.items():
            self.add_edge(node, t, **data)

    def add_process_step(self, resource: ServiceResource, labware: List[LabwareResource],
                         is_movement: bool = False, **kwargs):
        """
        Add an operation node and link it to the last nodes of all participating labware.

        :param resource: the device executing the step
        :param labware: the labware participating in the step
        :param is_movement: not used by this implementation
        :param kwargs: stored as attributes of the new node. ``fct`` is required, it is
            part of the node name. ``executor``, ``reagents``, ``label`` and
            ``relations`` are evaluated as well.
        """
        # work on a copy, so the list handed in by the caller is not extended
        kwargs['executor'] = list(kwargs.get('executor', []))
        # if a certain device shall execute this operation, we add its name to the kwargs
        if resource.name in [r.name for r in self._service_resources]:
            kwargs['executor'].append(resource)
        cont_names = [c.name for c in labware]
        node_attr = dict(cont_names=cont_names, type='operation',
                         name=f"{kwargs['fct']} {', '.join(cont_names)}",
                         device_type=type(resource))
        node_attr.update(kwargs)
        t = self.add_node(node_attr)
        self.handle_labels(t, **kwargs)
        if "reagents" in kwargs:
            labware = labware + kwargs["reagents"]
        for labware_piece in labware:
            max_wait = labware_piece.consume_max_wait()
            min_wait = labware_piece.consume_min_wait()
            wait_cost = labware_piece.consume_wait_cost()
            last = self.last_job[labware_piece.name]
            edge_attr = dict(wait_cost=wait_cost, cont_name=labware_piece.name, label='',
                             max_wait=max_wait if max_wait is not None else float('inf'))
            # if this is a starting step, these are wait_to_start_costs
            if any(self.workflow.nodes[prev]['type'] == "labware" for prev in last):
                self.workflow.nodes[t]["wait_to_start_costs"] = wait_cost
            if min_wait:
                edge_attr['min_wait'] = min_wait
            for s in last:
                self.add_edge(s, t, **edge_attr)
            self.last_job[labware_piece.name] = [t]
            self.last_action[labware_piece.name] = t

    def add_var_nodes(self, *args):
        """
        Add one variable node per name and connect it to the most recently added node.

        :param args: names of the variables
        """
        last_node = list(self.workflow.nodes)[-1]
        for var_name in args:
            new_node = self.add_node(dict(name=var_name, type='variable', var_name=var_name))
            self.add_edge(last_node, new_node, label='out')

    def add_computation(self, name, var_names, fct_code, needed_scope):
        """
        Add a computation node, which is evaluated at runtime.

        :param name: name of the variable the result is assigned to
        :param var_names: names of the runtime variables the computation uses
        :param fct_code: source code of the expression to evaluate
        :param needed_scope: names available to the expression besides the runtime variables
        """
        # this function will be called at runtime to execute the computation
        def fct(**kwargs):
            my_scope = needed_scope.copy()
            my_scope.update(kwargs)
            return eval(fct_code, my_scope)

        new_node = self.add_node(dict(type='computation', function=fct, name=name, var_name=name))
        # link the node to its needed variables
        self._link_input_variables(var_names, new_node)

    def _link_input_variables(self, var_names, target):
        """Connect the latest variable/computation node of each given name to ``target``."""
        for var_name in var_names:
            # take the latest of matching variable nodes; the target itself is skipped,
            # so that "x = x + 1" links to the previous x instead of to itself
            var_nodes = [n for n, data in self.workflow.nodes(data=True)
                         if n != target
                         and data['type'] in ['variable', 'computation']
                         and data['name'] == var_name]
            var_node = var_nodes[-1]
            self.add_edge(var_node, target, label='in')

    def add_node(self, attr):
        """
        Add a node with the given attributes.

        NOTE: the id is the current number of nodes. Ids are only unique as long as no
        node has been removed from the graph.

        :param attr: dictionary of node attributes
        :return: id of the new node
        """
        n = self.workflow.number_of_nodes()
        self.workflow.add_nodes_from([(n, attr)])
        return n

    def add_edge(self, s, t, **kwargs):
        """
        Add an edge from ``s`` to ``t``; the keyword arguments become its attributes.
        """
        self.workflow.add_edges_from([(s, t, kwargs)])

    def prepare_true_execution(self, if_node):
        """
        Let every labware continue from the true-dummy of the given if-node.
        """
        true_dummy = self.workflow.nodes[if_node]["true_dummy"]
        for name in self.last_job:
            self.last_job[name] = [true_dummy]

    def prepare_false_execution(self, if_node):
        """
        Let every labware continue from the false-dummy of the given if-node.
        """
        false_dummy = self.workflow.nodes[if_node]["false_dummy"]
        for name in self.last_job:
            self.last_job[name] = [false_dummy]

    # these utility functions are used for example in parsing if-statements
    def get_state(self):
        """
        Return a shallow copy of the mapping from labware name to its current nodes.
        """
        return self.last_job.copy()

    def set_state(self, status):
        """
        Replace the current state by a shallow copy of ``status``.
        """
        self.last_job = status.copy()

    def join_state(self, status):
        """
        Merge ``status`` into the current state: afterwards each labware has the union
        of its current nodes in both states (without duplicates).

        :param status: a state as returned by :meth:`get_state`
        """
        for name, current in self.last_job.items():
            # build a new list instead of extending the old one, which might still be
            # referenced by a state obtained from get_state()
            self.last_job[name] = list(set([*current, *status[name]]))

    def add_if_node(self, var_names, decision_code, needed_scope):
        """
        Add an if-node together with one dummy node for each of its two branches.

        :param var_names: names of the runtime variables the decision uses
        :param decision_code: source code of the condition
        :param needed_scope: names available to the condition besides the runtime variables
        :return: id of the if-node
        """
        # this function will be called at runtime to evaluate the decision
        def fct(**kwargs):
            my_scope = needed_scope.copy()
            my_scope.update(kwargs)
            return eval(decision_code, my_scope)

        true_dummy = self.add_node(dict(type="dummy", name="true dummy"))
        false_dummy = self.add_node(dict(type="dummy", name="false dummy"))
        new_node = self.add_node(dict(type="if_node", name=decision_code, function=fct,
                                      true_dummy=true_dummy, false_dummy=false_dummy))
        self.add_edge(new_node, true_dummy, sub_tree=True, label="True")
        self.add_edge(new_node, false_dummy, sub_tree=False, label="False")
        # link the node to its needed variables
        self._link_input_variables(var_names, new_node)
        return new_node

    def finalize_if_construction(self, if_node, orig_state, last_break=-1):
        """
        Connect an if-node to the state before it, once both branches have been explored.

        :param if_node: id of the if-node
        :param orig_state: the state right before the branches were explored
        :param last_break: id of a pending break node, -1 if there is none
        """
        for cont in self.last_job:
            # check whether some actions have been added to this labware since the if_node creation
            if self.last_action[cont] < if_node:
                # check for break nodes in the scope of this if clause
                if last_break > if_node:
                    # the decision for this labware is postponed until the break node is finalized
                    self.workflow.nodes[last_break]['if_nodes'].insert(0, (orig_state[cont], if_node, cont))
                else:
                    # the labware is not affected by the if clause: restore its nodes
                    self.last_job[cont] = orig_state[cont].copy()
            else:
                for old in orig_state[cont]:
                    self.add_edge(old, if_node, label='', cont_name=cont)

    def add_break_node(self):
        """
        Add a break node (a dummy node) and clear the current nodes of all labware.

        :return: id of the break node
        """
        # the only thing a break node needs is a copy of the current state
        new_node = self.add_node(dict(type="dummy", name='break', cur_state=deepcopy(self.last_job), if_nodes=[]))
        for cont in self.last_job:
            self.last_job[cont] = []
        return new_node

    def finalize_break_node(self, break_node):
        """
        Link a break node into the workflow after the enclosing loop has been processed.

        :param break_node: id of the break node
        """
        # check for every labware whether something has changed since creating the break node
        before_break = self.workflow.nodes[break_node]['cur_state']
        for cont, last_action in self.last_action.items():
            if last_action > break_node:
                for old in before_break[cont]:
                    self.add_edge(old, break_node, label='', cont_name=cont)
                self.last_job[cont].append(break_node)
            else:
                self.last_job[cont] = before_break[cont].copy()
        # now handle the if-nodes whose finalization was postponed because of this break
        for orig_state, if_node, cont in self.workflow.nodes[break_node]['if_nodes']:
            if self.last_action[cont] > if_node:
                for old in orig_state:
                    self.add_edge(old, if_node, label='', cont_name=cont)
            else:
                self.last_job[cont] = orig_state.copy()