"""This module provides an assortment of actions generally concerned with executing
simulations locally. Some Actions will also be useful when using Dask.
"""
import os
from pathlib import Path
import shutil
import subprocess
import dill
import copy
__license__ = "LGPL"
[docs]def local_execute(encoder, command, decoder, root='/tmp'):
"""A helper function for a simple local execution.
It will create a directory under your specified root folder, encode the sampler output, execute a command
and decode the results of the simulation.
Parameters
----------
encoder: Encoder
an encoder to use
command: list of str
a command to run your simulation (same as argument to popen, e.g. ['ls', '-al'])
decoder: Decoder
a decoder to use
root: str
root folder, for example '/tmp' or if you want to use ram based filesystem it could be '/dev/shm'
Returns
-------
Actions
"""
return Actions(
CreateRunDirectory(root),
Encode(encoder),
ExecuteLocal(command),
Decode(decoder))
[docs]class CreateRunDirectory():
"""Creates a directory structure for storing simulation input and output files.
Parameters
----------
root: str
Root directory to create a directory structure in.
flatten: bool
If set to True will result in a flat directory structure (each run gets a directory
under root). If left as False will create a hierarchical structure. This is useful
so as not to overload the filesystem.
"""
def __init__(self, root, flatten=False):
self.root = root
self.flatten = flatten
[docs] def start(self, previous=None):
"""Starts the action.
Will read a `run_id` from a dictionary supplied by the previous `Action`.
Will then create a directory structure based on the numerical value of the `run_id`.
Returns
-------
dict
A dictionary to be passed to the following `Action`.
"""
run_id = previous['run_id']
level1_a, level1_b = (int(run_id / 100 ** 4) * 100 ** 4,
int(run_id / 100 ** 4 + 1) * 100 ** 4)
level2_a, level2_b = (int(run_id / 100 ** 3) * 100 ** 3,
int(run_id / 100 ** 3 + 1) * 100 ** 3)
level3_a, level3_b = (int(run_id / 100 ** 2) * 100 ** 2,
int(run_id / 100 ** 2 + 1) * 100 ** 2)
level4_a, level4_b = (int(run_id / 100 ** 1) * 100 ** 1,
int(run_id / 100 ** 1 + 1) * 100 ** 1)
level1_dir = "runs_{}-{}/".format(level1_a, level1_b)
level2_dir = "runs_{}-{}/".format(level2_a, level2_b)
level3_dir = "runs_{}-{}/".format(level3_a, level3_b)
level4_dir = "runs_{}-{}/".format(level4_a, level4_b)
level5_dir = "run_{}".format(int(run_id))
if self.flatten:
path = os.path.join(self.root, previous['campaign_dir'], 'runs', level5_dir)
else:
path = os.path.join(self.root, previous['campaign_dir'], 'runs',
level1_dir, level2_dir, level3_dir, level4_dir, level5_dir)
Path(path).mkdir(parents=True, exist_ok=True)
previous['rundir'] = path
self.result = previous
return self.result
[docs] def succeeded(self):
"""Has the `Action` finished successfully.
Returns
-------
bool
True if `Action` completed successfully. False otherwise.
"""
return True
[docs]class Encode():
def __init__(self, encoder):
self.encoder = encoder
[docs] def start(self, previous=None):
self.encoder.encode(
params=previous['run_info']['params'],
target_dir=previous['rundir'])
try:
previous['encoder_filename'] = self.encoder.target_filename
except AttributeError:
pass
return previous
[docs] def finished(self):
return True
[docs] def finalise(self):
pass
[docs] def succeeeded(self):
return True
[docs]class Decode():
def __init__(self, decoder):
self.decoder = decoder
[docs] def start(self, previous=None):
run_info = copy.copy(previous['run_info'])
run_info['run_dir'] = previous['rundir']
result = self.decoder.parse_sim_output(run_info)
previous['result'] = result
previous['decoder_filename'] = self.decoder.target_filename
previous['collated'] = True
return previous
[docs] def finished(self):
return True
[docs] def finalise(self):
pass
[docs] def succeeded(self):
return True
[docs]class CleanUp():
def __init__(self):
pass
[docs] def start(self, previous=None):
if not ('rundir' in previous.keys()):
raise RuntimeError('must be used with actions that create a directory structure')
shutil.rmtree(previous['rundir'])
return previous
[docs] def finished(self):
return True
[docs] def finalise(self):
pass
[docs] def succeeded(self):
return True
[docs]class ExecutePython():
def __init__(self, function):
self.function = dill.dumps(function)
self.params = None
self.eval_result = None
[docs] def start(self, previous=None):
function = dill.loads(self.function)
self.eval_result = function(previous['run_info']['params'])
previous['result'] = self.eval_result
previous['collated'] = True
return previous
[docs] def finished(self):
if self.eval_result is None:
return False
else:
return True
[docs] def finalise(self):
pass
[docs] def succeeded(self):
if not self.finished():
raise RuntimeError('action did not finish yet')
else:
return True
[docs]class ExecuteLocal():
def __init__(self, full_cmd, stdout=None, stderr=None):
self.full_cmd = full_cmd.split()
self.popen_object = None
self.ret = None
self._started = False
self.stdout = stdout
self.stderr = stderr
[docs] def start(self, previous=None):
target_dir = previous['rundir']
if isinstance(self.stdout, str):
stdout = open(os.path.join(target_dir, self.stdout), 'w')
else:
stdout = self.stdout
if isinstance(self.stderr, str):
stderr = open(os.path.join(target_dir, self.stderr), 'w')
else:
stderr = self.stderr
self.ret = subprocess.run(
self.full_cmd, cwd=target_dir,
stdout=stdout, stderr=stderr)
return previous
[docs] def finished(self):
return True
[docs] def finalise(self):
"""Performs clean-up if necessary. In this case it isn't. I think.
"""
pass
[docs] def succeeded(self):
"""Will return True if the process finished successfully.
It judges based on the return code and will return False
if that code is not zero.
"""
if self.ret != 0:
return False
else:
return True
[docs]class Actions():
def __init__(self, *args):
self.actions = list(args)
self.wrapper = lambda action, previous: action.start(previous)
[docs] def set_wrapper(self, wrapper):
"""Adds a wrapper to be called on each Action.
Parameters
----------
wrapper: callable
A function to call on each Action. Should pass through the return of the
start method.
"""
self.wrapper = wrapper
[docs] def start(self, previous=None):
for action in self.actions:
if not hasattr(action, 'start'):
raise RuntimeError('action in the actions list does not provide a start method')
previous = copy.copy(previous)
run_id = previous['run_id']
for action in self.actions:
previous = self.wrapper(action, previous)
self.result = previous
assert (self.result['run_id'] == run_id)
return previous
[docs] def finished(self):
return all([action.finished() for action in self.actions])
[docs] def finalise(self):
for action in self.actions:
action.finalise()
[docs] def succeeded(self):
return all([action.succeeded() for action in self.actions])