"""
Module with all necessary utility to parse a PythonLab process into a networkx graph.
todo: use pydantic to parse it into defined dataclasses
"""
from __future__ import annotations
import traceback
import logging
from abc import ABC
import graphviz
import inspect
import ast
from uuid import uuid1
from pathlib import Path
import sys
from importlib.util import spec_from_file_location, module_from_spec
import types
import networkx as nx
from pythonlab.process import PLProcess
from pythonlab.resource import ServiceResource, DynamicLabwareResource, LabwareResource
from typing import List, Dict, Any, Type
from copy import deepcopy
from contextlib import contextmanager
[docs]
@contextmanager
def get_source_code(src: str, node: ast.AST) -> str:
    """
    Context manager yielding the source text that belongs to an AST node.

    Every exception raised while extracting the text or inside the ``with`` body is logged together with
    the extracted code and then re-raised unchanged.

    :param src: source code the node was parsed from
    :param node: AST node whose source segment is requested
    :return: (yielded value) the result of ``ast.get_source_segment`` for the node
    """
    # bound before the try-block so the error handler can reference it even if the extraction itself fails
    code = None
    try:
        code = ast.get_source_segment(src, node)
        yield code
    except Exception as ex:
        logging.error(f"Caught Exception {ex} while parsing \n\n{code}\n")
        # bare raise re-raises the active exception without adding another traceback entry
        raise
[docs]
def find_pl_process_class(module: types.ModuleType) -> type[PLProcess]:
    """
    Look up the process class of a module.

    :param module: module object whose namespace is searched
    :return: the first class found in the module namespace that is a strict subclass of PLProcess and whose
        ``__module__`` equals the module name
    :raises RuntimeError: if no such class exists
    """
    for candidate in module.__dict__.values():
        if not inspect.isclass(candidate):
            continue
        if candidate is PLProcess or not issubclass(candidate, PLProcess):
            continue
        # classes that were only imported into the module carry a different __module__
        if candidate.__module__ != module.__name__:
            continue
        return candidate
    raise RuntimeError(f"No subclass of PLProcess found in module {module.__name__}")
[docs]
class PLProcessReader:
[docs]
@staticmethod
def parse_process_from_file_path(file_path: str | Path):
    """
    Import a process description from a python file and parse it.

    The file is loaded as module "importing_process" (registered in ``sys.modules`` under that name), its
    directory is made importable, the PLProcess subclass defined in it is instantiated without arguments and
    handed to :meth:`parse_process_from_instance`.

    :param file_path: path to the python file containing the process class
    :return: whatever :meth:`parse_process_from_instance` returns
    :raises ImportError: if no import spec/loader can be created for the file
    """
    spec = spec_from_file_location("importing_process", file_path)
    # spec_from_file_location returns None when no loader handles the file (e.g. unknown suffix)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot create a module loader for {file_path}")
    # make sibling modules of the process file importable; avoid piling up duplicates on repeated calls
    parent_dir = Path(file_path).parent.as_posix()
    if parent_dir not in sys.path:
        sys.path.append(parent_dir)
    module = module_from_spec(spec)
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    # find the class implementing PLProcess
    process_class = find_pl_process_class(module)
    process = process_class()
    # call the parsing from instance method
    return PLProcessReader.parse_process_from_instance(process)
[docs]
@staticmethod
def parse_process_from_source_code(src: str):
    """
    Parse a process description that is given as source code string.

    The source is executed in a fresh module named "tmp_module", the PLProcess subclass found there is
    instantiated, and every import statement occurring anywhere in the source is executed once more into a
    separate dictionary that serves as scope for the parsing.

    NOTE: the source is executed via exec; only pass trusted code.

    :param src: python source code defining the process class
    :return: whatever :meth:`parse_process` returns
    """
    def _render(alias: ast.alias) -> str:
        # "name" or "name as asname"
        return f"{alias.name} as {alias.asname}" if alias.asname else f"{alias.name}"

    # run the source inside its own module namespace and instantiate the process class
    module = types.ModuleType("tmp_module")
    exec(src, module.__dict__)
    process = find_pl_process_class(module)()

    # replay all imports of the source into the parsing scope
    scope = {}
    for node in ast.walk(ast.parse(src)):
        if isinstance(node, ast.ImportFrom):
            imported = ", ".join(_render(alias) for alias in node.names)
            exec(f"from {node.module} import {imported}", scope)
        elif isinstance(node, ast.Import):
            for alias in node.names:
                exec(f"import {_render(alias)}", scope)
    return PLProcessReader.parse_process(process, src, scope)
[docs]
@staticmethod
def parse_process_from_instance(plp: PLProcess):
    """
    Parse an already instantiated process.

    The source code is taken from the class of the instance; the namespace of the module defining that class
    (if it is found in ``sys.modules``) is copied and used as scope.

    :param plp: process instance to parse
    :return: whatever :meth:`parse_process` returns
    """
    src = inspect.getsource(type(plp))
    defining_module = sys.modules.get(plp.__class__.__module__)
    # copy the namespace so that parsing does not write into the real module
    scope = dict(defining_module.__dict__) if defining_module else {}
    return PLProcessReader.parse_process(plp, src, scope)
[docs]
@staticmethod
def parse_process(plp: PLProcess, src=None, scope: dict | None = None):
    """
    The main function. It takes a PythonLab process and builds the workflow graph from its ``process`` method.

    A simulator object inheriting from both PLProcessSimulator and the class of ``plp`` is created, the body of
    ``process`` is walked statement by statement with that simulator bound to ``self``, and finally the dummy
    nodes are contracted.

    :param plp: process instance whose class is parsed
    :param src: source code containing the process class; defaults to ``inspect.getsource(type(plp))``
    :param scope: names available while executing the process body; the dictionary is copied, never modified
    :return: the simulator; its ``workflow`` attribute holds the resulting graph
    """
    # work on a private copy so the caller's dictionary is not polluted with 'self' and '_break_nodes'
    scope = dict(scope) if scope is not None else {}
    if src is None:
        src = inspect.getsource(type(plp))
    p = PLProcessReader.ast_get_process(type(plp), src)

    class Simulator(PLProcessSimulator, type(plp)):
        """This class is necessary to have the simulator inherit the methods of plp dynamically"""
        def __init__(self):
            PLProcessSimulator.__init__(self, type(plp))

    simulator = Simulator()
    # '_break_nodes' is the hidden list collecting break nodes of the loop currently being explored
    scope.update({'self': simulator, '_break_nodes': []})
    # names of all simulator attributes that are ServiceResources
    devices = [key for key, var in vars(simulator).items() if isinstance(var, ServiceResource)]
    PLProcessReader.execute_scope(p.body, scope, [], src, devices, simulator)
    # contract the dummy_nodes used to link the if-conditions and break-nodes correctly
    PLProcessReader.contract_dummys(simulator.workflow)
    return simulator
[docs]
@staticmethod
def ast_get_process(process_type, src=None):
    """
    Return the AST node of the ``process`` method of a process class.

    The class named like ``process_type`` is preferred; if the source has no top-level class of that name (or
    it defines no ``process`` method) the first top-level class is used instead.

    :param process_type: the process class
    :param src: source code to search; defaults to ``inspect.getsource(process_type)``
    :return: the ``ast.FunctionDef`` of the ``process`` method
    :raises IndexError: if no suitable class or no ``process`` method is found
    """
    if src is None:
        src = inspect.getsource(process_type)
    module = ast.parse(src)
    classes = [node for node in module.body if isinstance(node, ast.ClassDef)]
    if not classes:
        # IndexError is kept as exception type for backward compatibility
        raise IndexError("No class definition found in the given source code")
    wanted = getattr(process_type, '__name__', None)
    named = [cl for cl in classes if cl.name == wanted]
    # a source file may contain helper classes in front of the actual process class
    for cl in named + classes[:1]:
        for elem in cl.body:
            if isinstance(elem, ast.FunctionDef) and elem.name == 'process':
                return elem
    raise IndexError(f"No method named 'process' found for class {wanted}")
[docs]
@staticmethod
def contract_dummys(g: nx.DiGraph):
    """
    Remove all nodes of type 'dummy' from the workflow graph by contraction.

    While exploring if-clauses, break commands, etc. the reader inserts dummy nodes to keep track of which
    labware took part in which step. After the whole process was read, each dummy is removed and every source
    of one of its incoming edges is connected to every target of one of its outgoing edges. The new edge
    carries the attributes of both old edges (outgoing attributes win on conflicts) and the two labels joined
    by a blank.

    :param g: workflow graph, modified in place
    """
    dummy_nodes = [node for node, attrs in g.nodes(data=True) if attrs['type'] == 'dummy']
    for dummy in dummy_nodes:
        # the target of each in-edge and the source of each out-edge is the dummy itself
        for source, _, attrs_in in g.in_edges(dummy, data=True):
            for _, target, attrs_out in g.out_edges(dummy, data=True):
                joined_label = f"{attrs_in['label']} {attrs_out['label']}"
                merged = {**attrs_in, **attrs_out}
                merged['label'] = joined_label
                g.add_edge(source, target, **merged)
        g.remove_node(dummy)
[docs]
@staticmethod
def execute_scope(body: List[ast.AST], scope: Dict[str, Any], runtime_vars: List[str], src: str,
                  devices: List[str], plp: PLProcessSimulator):
    """
    Goes through the list of statements and constructs the workflow graph by manipulating the process state and
    executing certain lines of process description code. Meant to be called recursively.

    Only expression statements, assignments, if-statements, for-loops and break statements are handled; all
    other statement types are skipped. Errors while handling an if-statement are logged and swallowed.

    :param body: list of parsed code fragments
    :param scope: variables assigned so far; a shallow copy is used, so new names do not leak to the caller
    :param runtime_vars: list of variable names in current scope that are evaluated at runtime; this list is
        passed on as is and may be modified by the handlers
    :param src: source code of the process description
    :param devices: List of variables that correspond to ServiceResources
    :param plp: simulator collecting the workflow graph
    :return: Nothing
    """
    # shallow copy of scope only; runtime_vars stays the very same list object
    scope = scope.copy()
    for node in body:
        if isinstance(node, ast.Expr):
            PLProcessReader.handle_expr(node, scope, runtime_vars, src)
        elif isinstance(node, ast.Assign):
            PLProcessReader.handle_assign(node, scope, runtime_vars, src, devices, plp)
        elif isinstance(node, ast.If):
            try:
                PLProcessReader.handle_if(node, scope, runtime_vars, src, devices, plp)
            except Exception:
                # best effort: keep going with the remaining statements, but record the full traceback
                logging.exception("Exception while handling an if-statement")
        elif isinstance(node, ast.For):
            PLProcessReader.handle_for(node, scope, runtime_vars, src, devices, plp)
        elif isinstance(node, ast.Break):
            PLProcessReader.handle_break(scope, plp)
[docs]
@staticmethod
def handle_expr(expr: ast.Expr, scope: Dict[str, Any], runtime_vars: List[str], src: str):
    """
    Execute an expression statement of the process description.

    :param expr: the expression statement
    :param scope: namespace the statement is executed in
    :param runtime_vars: not used by this handler
    :param src: source code of the process description
    """
    with get_source_code(src, expr) as statement:
        exec(statement, scope)
[docs]
@staticmethod
def handle_assign(asg: ast.Assign, scope: Dict[str, Any], runtime_vars: List[str], src: str, devices: List[str],
                  plp: PLProcessSimulator):
    """
    Process an assignment. Three cases are distinguished by inspecting the dump of the assigned value:

    * an attribute named like a device occurs: the value expression is executed (without assigning) and
      variable nodes are added for the targets, which become runtime variables
    * a runtime variable occurs: a computation node is added and the targets become runtime variables
    * otherwise: the assignment is executed in scope and the targets stop being runtime variables

    :param asg: the assignment; all targets are expected to be plain names
    :param scope: namespace used for execution
    :param runtime_vars: names evaluated at runtime; modified in place
    :param src: source code of the process description
    :param devices: names of the ServiceResource attributes
    :param plp: simulator collecting the workflow graph
    """
    with get_source_code(src, asg) as code:
        # dump and source of the value are needed several times: compute them once
        value_dump = ast.dump(asg.value)
        value_code = ast.get_source_segment(src, asg.value)
        is_runtime_var = any(f"attr='{d}'" in value_dump for d in devices)
        # materialised list (not a lazy filter), so it is unaffected by later changes of runtime_vars
        used_vars = [v for v in runtime_vars if f"id='{v}'" in value_dump]
        var_names = [t.id for t in asg.targets]
        if is_runtime_var:
            exec(value_code, scope)
            plp.add_var_nodes(*var_names)
            runtime_vars.extend([v for v in var_names if v not in runtime_vars])
        elif used_vars:
            # substring test: every scope entry whose name occurs in the statement text is handed over
            needed = {key: val for key, val in scope.items() if key in code}
            plp.add_computation(name=asg.targets[0].id, var_names=used_vars,
                                fct_code=value_code, needed_scope=needed)
            runtime_vars.extend([v for v in var_names if v not in runtime_vars])
        else:
            # code is exactly the source segment of the whole assignment
            exec(code, scope)
            for name in var_names:
                if name in runtime_vars:
                    runtime_vars.remove(name)
[docs]
@staticmethod
def handle_if(node: ast.If, scope: Dict[str, Any], runtime_vars: List[str], src: str, devices: List[str],
              plp: PLProcessSimulator):
    """
    Process an if-statement.

    If the name of a runtime variable occurs in the test expression, the decision can only be taken at runtime:
    an if-node is added to the workflow and both branches are explored one after the other. Otherwise the test
    is evaluated right away and only the matching branch is explored.

    :param node: the if-statement
    :param scope: namespace used for evaluation
    :param runtime_vars: names evaluated at runtime
    :param src: source code of the process description
    :param devices: names of the ServiceResource attributes
    :param plp: simulator collecting the workflow graph
    """
    is_runtime_decision = any(f"id='{v}'" in ast.dump(node.test) for v in runtime_vars)
    if is_runtime_decision:
        # lazy iterator over the runtime variables occurring in the test; consumed by add_if_node
        used_vars = filter(lambda v: f"id='{v}'" in ast.dump(node.test), runtime_vars)
        with get_source_code(src, node.test) as code:
            # substring test: every scope entry whose name occurs in the test text is handed over
            needed = {key: val for key, val in scope.items() if key in code}
            if_node = plp.add_if_node(used_vars, code, needed)
        # state before the branches, needed to finalize the construction afterwards
        origin_state = plp.get_state()
        plp.prepare_true_execution(if_node)
        PLProcessReader.execute_scope(node.body, scope, runtime_vars, src, devices, plp)
        after_true_state = plp.get_state()
        plp.prepare_false_execution(if_node)
        PLProcessReader.execute_scope(node.orelse, scope, runtime_vars, src, devices, plp)
        # merge the end of the true-branch into the current state (the end of the false-branch)
        plp.join_state(after_true_state)
        # NOTE(review): index 0 is the oldest pending break node although the variable is called
        # last_break (handle_break appends at the end) - confirm that this is intended
        if len(scope['_break_nodes']) > 0:
            last_break = scope['_break_nodes'][0]
        else:
            last_break = -1
        plp.finalize_if_construction(if_node, origin_state, last_break=last_break)
    else:
        # if this is no runtime decision, we evaluate it and execute accordingly
        with get_source_code(src, node.test) as code:
            decision = eval(code, scope)
        if decision:
            PLProcessReader.execute_scope(node.body, scope, runtime_vars, src, devices, plp)
        else:
            PLProcessReader.execute_scope(node.orelse, scope, runtime_vars, src, devices, plp)
[docs]
@staticmethod
def handle_for(node: ast.For, scope: Dict[str, Any], runtime_vars: List[str], src: str, devices: List[str],
               plp: PLProcessSimulator):
    """
    Process a for-loop by unrolling it: the iterable is evaluated in scope and the loop body is explored once
    per element. Afterwards all break nodes collected in ``scope['_break_nodes']`` are finalized.

    :param node: the for-loop; its target is expected to be a plain name
    :param scope: namespace used for evaluation; the iteration variable is written into it
    :param runtime_vars: names evaluated at runtime
    :param src: source code of the process description
    :param devices: names of the ServiceResource attributes
    :param plp: simulator collecting the workflow graph
    """
    iter_var = node.target.id
    with get_source_code(src, node.iter) as iter_over:
        # evaluate the iterable directly instead of assigning it to a throw-away name inside scope
        iterable = eval(iter_over, scope)
    for buff in iterable:
        # update the iteration variable in scope
        scope[iter_var] = buff
        PLProcessReader.execute_scope(node.body, scope, runtime_vars, src, devices, plp)
    # after finishing everything in for-loop finalize the break-nodes, that occurred in it
    while scope['_break_nodes']:
        # pop() removes the last element, i.e. the most recently created break node is finalized first
        break_node = scope['_break_nodes'].pop()
        plp.finalize_break_node(break_node)
[docs]
@staticmethod
def handle_break(scope: Dict[str, Any], plp: PLProcessSimulator):
    """
    Process a break statement: a break node is created in the workflow and remembered in scope.

    :param scope: current namespace; must contain the hidden list '_break_nodes'
    :param plp: simulator collecting the workflow graph
    """
    created = plp.add_break_node()
    # newest break node goes to the end of the hidden list
    pending = scope["_break_nodes"]
    pending.append(created)
[docs]
class PLProcessSimulator(PLProcess, ABC):
"""
A utility class to help going through actions in a PythonLabProcess while linking them correctly.
This is done by systematically recursively exploring the abstract syntax tree and executing all
functions of ServiceResources while keeping track of the labware. Each call of a function of a ServiceResource
adds a new process step node to the workflow graph. The edges are determined by consecutive participation of labware
in two steps.
"""
def __init__(self, process_type: Type[PLProcess]):
    """
    Create the empty bookkeeping structures and then run the constructor of the process class on this object.

    :param process_type: class of the process being simulated
    """
    # label given to a process step -> id of its workflow node
    self.label_to_node = {}
    # labware name -> id of the last action added for it in the workflow graph
    self.last_action = {}
    # labware name -> list of current nodes of that labware in the workflow graph
    self.last_job = {}
    # this graph will represent the whole experiment
    self.workflow = nx.DiGraph()
    # the bookkeeping above is created first; presumably the process constructor already uses it - verify
    process_type.__init__(self)
[docs]
def visualize_workflow_graph(self, show=True):
    """
    Render the workflow graph with graphviz as "workflow.png" (left-to-right layout).

    :param show: passed to graphviz as ``view``
    :return: the graphviz Digraph object
    """
    dot = graphviz.Digraph(comment="Workflow")
    dot.attr(rankdir='LR')
    node_col = dict(labware='grey', operation='red', if_node="yellow", variable='blue', computation='cyan')
    for n, data in self.workflow.nodes(data=True):
        # node types without a dedicated colour (e.g. 'dummy') are drawn white instead of raising a KeyError
        dot.node(str(n), data['name'], color=node_col.get(data['type'], 'white'), style='filled')
    for u, v in self.workflow.edges():
        dot.edge(str(u), str(v), '')
    dot.format = "png"
    dot.render("workflow", view=show)
    return dot
[docs]
def set_starting_position(self, resource: LabwareResource, device: ServiceResource, position: int):
    """
    Store where a labware starts in the attributes of its first workflow node.

    :param resource: the labware
    :param device: device the labware starts in; its name and type are stored
    :param position: position within the device
    """
    # the list of last_job is supposed to have only one entry at this point
    first_node = self.last_job[resource.name][0]
    job = self.workflow.nodes[first_node]
    job.update(
        origin_pos=position,
        origin=device.name,
        origin_type=type(device),
        lidded=resource.lidded,
    )
    if isinstance(resource, DynamicLabwareResource):
        job['is_reagent'] = True
    # copy all keyword arguments into the workflow node
    job.update(resource.kwargs)
[docs]
def register_labware_resource(self, resource):
    """
    Register a labware at the parent class and create the workflow node marking the start of its journey.

    :param resource: the labware to register
    """
    super().register_labware_resource(resource)
    label = resource.name
    # add a workflow node marking the start of the resources journey
    start_node = self.add_node({'type': 'labware', 'name': label})
    self.labware_nodes[label] = start_node
    # these are used in the construction of the wfg; -1 means no action was added for the labware yet
    self.last_action[label] = -1
    self.last_job[label] = [start_node]
[docs]
def handle_labels(self, t, **kwargs):
    """
    Very preliminary handling of relations between process steps independent of labware.

    If a "label" is given, node ``t`` is remembered under that label. For every entry of "relations"
    (triples of relation name, label and argument list) an edge from the labelled node to ``t`` is added;
    several relations to the same node are merged into one edge whose label is the first relation.

    :param t: id of the workflow node of the current process step
    :param kwargs: keyword arguments of the process step; only "label" and "relations" are evaluated
    :return: Nothing
    :raises KeyError: if a relation refers to a label that was not registered before
    """
    if "label" in kwargs:
        self.label_to_node[kwargs['label']] = t
        logging.debug("label_to_node: %s", self.label_to_node)
    edges = {}
    for relation, label, args in kwargs.get('relations', ()):
        node = self.label_to_node[label]
        logging.debug("relation=%s label=%s node=%s args=%s", relation, label, node, args)
        if node not in edges:
            edges[node] = dict(wait_cost=1, label=relation, max_wait=float('inf'))
        if relation == "direct_after":
            edges[node]['wait_cost'] = 200
        if relation == "min_wait":
            edges[node]['min_wait'] = args[0]
        if relation == "max_wait":
            edges[node]['max_wait'] = args[0]
    for node, data in edges.items():
        self.add_edge(node, t, **data)
[docs]
def add_process_step(self, resource: ServiceResource, labware: List[LabwareResource], is_movement: bool = False, **kwargs):
    """
    Add an operation node for a call of a ServiceResource function and link it to the previous nodes of all
    participating labware.

    :param resource: the device whose function was called
    :param labware: labware taking part in the step
    :param is_movement: not evaluated in this method
    :param kwargs: further attributes of the step; "fct" is required (used for the node name), "executor",
        "reagents", "label" and "relations" are evaluated if present; everything is stored in the node
    """
    # copy the executor list, so a list handed in by the caller is never extended in place
    executors = list(kwargs.get('executor', []))
    # if a certain device shall execute this operation, we add it to the executors
    if resource.name in [r.name for r in self._service_resources]:
        executors.append(resource)
    kwargs['executor'] = executors
    labware_names = [c.name for c in labware]
    node_attr = dict(cont_names=labware_names, type='operation',
                     name=f"{kwargs['fct']} {', '.join(labware_names)}",
                     device_type=type(resource))
    node_attr.update(kwargs)
    t = self.add_node(node_attr)
    self.handle_labels(t, **kwargs)
    # reagents are linked like labware but do not appear in cont_names or the node name
    if "reagents" in kwargs:
        labware = labware + kwargs["reagents"]
    for labware_piece in labware:
        max_wait = labware_piece.consume_max_wait()
        min_wait = labware_piece.consume_min_wait()
        wait_cost = labware_piece.consume_wait_cost()
        last = self.last_job[labware_piece.name]
        edge_attr = dict(wait_cost=wait_cost,
                         cont_name=labware_piece.name,
                         label='',
                         max_wait=max_wait if max_wait is not None else float('inf'))
        # if this is a starting step, these are wait_to_start_costs
        if any(self.workflow.nodes[prev]['type'] == "labware" for prev in last):
            self.workflow.nodes[t]["wait_to_start_costs"] = wait_cost
        if min_wait:
            edge_attr['min_wait'] = min_wait
        for s in last:
            self.add_edge(s, t, **edge_attr)
        # the new step is now the only current node and the last action of this labware
        self.last_job[labware_piece.name] = [t]
        self.last_action[labware_piece.name] = t
[docs]
def add_var_nodes(self, *args):
    """
    Add one variable node per given name and connect each of them to the node that was the last one in the
    workflow graph when this method was called.

    :param args: names of the variables
    """
    producer = tuple(self.workflow.nodes)[-1]
    for variable in args:
        created = self.add_node({'name': variable, 'type': 'variable', 'var_name': variable})
        self.add_edge(producer, created, label='out')
[docs]
def add_computation(self, name, var_names, fct_code, needed_scope):
    """
    Add a computation node and connect it to the nodes providing the variables it needs.

    :param name: name of the computed variable
    :param var_names: names of the runtime variables used in the computation
    :param fct_code: source code of the expression to evaluate
    :param needed_scope: names that have to be available when evaluating the expression
    :raises IndexError: if no variable or computation node exists for one of the needed variables
    """
    # this function will be called at runtime to execute the computation
    def fct(**kwargs):
        my_scope = needed_scope.copy()
        my_scope.update(kwargs)
        return eval(fct_code, my_scope)

    new_node = self.add_node(dict(type='computation', function=fct, name=name, var_name=name))
    # link the node to its needed variables
    for needed_var in var_names:
        # take the latest of the matching variable nodes; the new node itself is excluded, otherwise a
        # computation overwriting one of its inputs (x = x + 1) would be linked to itself
        var_nodes = [n for n, data in self.workflow.nodes(data=True)
                     if n != new_node
                     and data['type'] in ['variable', 'computation'] and data['name'] == needed_var]
        var_node = var_nodes[-1]
        self.add_edge(var_node, new_node, label='in')
[docs]
def add_node(self, attr):
    """
    Add a node to the workflow graph. The current number of nodes is used as id of the new node.

    :param attr: dictionary of node attributes
    :return: id of the new node
    """
    node_id = self.workflow.number_of_nodes()
    self.workflow.add_nodes_from(((node_id, attr),))
    return node_id
[docs]
def add_edge(self, s, t, **kwargs):
    """
    Add an edge to the workflow graph.

    :param s: id of the source node
    :param t: id of the target node
    :param kwargs: edge attributes
    """
    self.workflow.add_edges_from(((s, t, kwargs),))
[docs]
def prepare_true_execution(self, if_node):
    """
    Let every labware continue from the true-dummy of the given if-node.

    :param if_node: id of the if-node
    """
    branch_entry = self.workflow.nodes[if_node]["true_dummy"]
    for labware_name in list(self.last_job):
        # every labware gets its own list object
        self.last_job[labware_name] = [branch_entry]
[docs]
def prepare_false_execution(self, if_node):
    """
    Let every labware continue from the false-dummy of the given if-node.

    :param if_node: id of the if-node
    """
    branch_entry = self.workflow.nodes[if_node]["false_dummy"]
    # the comprehension creates a separate list for every labware
    self.last_job.update({labware_name: [branch_entry] for labware_name in self.last_job})
# these utility functions are used for example in parsing if-statements
[docs]
def get_state(self):
    """
    Return the current state of the simulator.

    :return: shallow copy of ``last_job``; the lists inside are shared with the simulator
    """
    return dict(self.last_job)
[docs]
def set_state(self, status):
    """
    Replace the current state of the simulator.

    :param status: dictionary as returned by get_state; a shallow copy of it becomes the new ``last_job``
    """
    self.last_job = {**status}
[docs]
def join_state(self, status):
    """
    Merge another state into the current one: for every labware the current nodes become the union of the
    nodes of both states (without duplicates).

    :param status: state to merge in; needs an entry for every labware of the current state
    """
    for labware_name, current in self.last_job.items():
        # in-place extension of the existing list, then replacement by a duplicate-free list
        current += status[labware_name]
        self.last_job[labware_name] = list(set(current))
[docs]
def add_if_node(self, var_names, decision_code, needed_scope):
    """
    Add an if-node together with its two dummy nodes and connect it to the nodes providing the variables the
    decision depends on. The dummies are created before the if-node, so their ids are smaller.

    :param var_names: names of the runtime variables used in the decision
    :param decision_code: source code of the test expression; also used as node name
    :param needed_scope: names that have to be available when evaluating the expression
    :return: id of the if-node
    """
    # this function will be called at runtime to evaluate the decision
    def fct(**kwargs):
        return eval(decision_code, {**needed_scope, **kwargs})

    true_dummy = self.add_node({'type': "dummy", 'name': "true dummy"})
    false_dummy = self.add_node({'type': "dummy", 'name': "false dummy"})
    if_node = self.add_node({'type': "if_node", 'name': decision_code, 'function': fct,
                             'true_dummy': true_dummy, 'false_dummy': false_dummy})
    for dummy, outcome, text in ((true_dummy, True, "True"), (false_dummy, False, "False")):
        self.add_edge(if_node, dummy, sub_tree=outcome, label=text)
    # link the node to its needed variables
    for needed_var in var_names:
        # take the latest of the matching variable nodes
        candidates = [n for n, data in self.workflow.nodes(data=True)
                      if data['type'] in ['variable', 'computation'] and data['name'] == needed_var]
        self.add_edge(candidates[-1], if_node, label='in')
    return if_node
[docs]
def finalize_if_construction(self, if_node, orig_state, last_break=-1):
    """
    Finish the construction of an if-clause after both branches were explored.

    Labware with an action at or after the if-node is linked from its nodes in ``orig_state`` to the if-node.
    Labware without such an action is either noted at the break node (if ``last_break`` was created after the
    if-node) or reset to its state before the if-clause.

    :param if_node: id of the if-node
    :param orig_state: state before the if-clause
    :param last_break: id of a break node, -1 if there is none
    """
    for cont in self.last_job:
        before = orig_state[cont]
        # some action has been added to this labware since the if_node creation
        if self.last_action[cont] >= if_node:
            for old in before:
                self.add_edge(old, if_node, label='', cont_name=cont)
            continue
        # untouched labware: check for break nodes in the scope of this if clause
        if last_break > if_node:
            self.workflow.nodes[last_break]['if_nodes'].insert(0, (before, if_node, cont))
        else:
            self.last_job[cont] = before.copy()
[docs]
def add_break_node(self):
    """
    Add a break node (a dummy node storing a deep copy of the current state) and clear the current nodes of
    every labware.

    :return: id of the break node
    """
    # the only thing a break node needs is a copy of the current state
    snapshot = deepcopy(self.last_job)
    break_id = self.add_node({'type': "dummy", 'name': 'break', 'cur_state': snapshot, 'if_nodes': []})
    for cont in list(self.last_job):
        # a fresh list per labware
        self.last_job[cont] = []
    return break_id
[docs]
def finalize_break_node(self, break_node):
    """
    Finish the construction of a break node after the surrounding loop was explored.

    :param break_node: id of the break node
    """
    # check for every labware whether something has changed since creating the break node
    before_break = self.workflow.nodes[break_node]['cur_state']
    for cont, last_action in self.last_action.items():
        if last_action > break_node:
            # link the nodes the labware had when the break occurred to the break node and make the break
            # node an additional current node of the labware
            for old in before_break[cont]:
                self.add_edge(old, break_node, label='', cont_name=cont)
            self.last_job[cont].append(break_node)
        else:
            # no action since the break: restore the state stored in the break node
            self.last_job[cont] = before_break[cont].copy()
    # entries noted by finalize_if_construction: (state before the if-clause, if-node id, labware name)
    for orig_state, if_node, cont in self.workflow.nodes[break_node]['if_nodes']:
        if self.last_action[cont] > if_node:
            for old in orig_state:
                self.add_edge(old, if_node, label='', cont_name=cont)
        else:
            self.last_job[cont] = orig_state.copy()