Source code for easyvvuq.actions.execute_qcgpj

import base64
import json
import logging
from os import environ
from os import remove

import time

import dill
from typing import Tuple, Dict, Any

from concurrent.futures import Executor

from qcg.pilotjob.executor_api.qcgpj_executor import QCGPJExecutor
from qcg.pilotjob.executor_api.templates.qcgpj_template import QCGPJTemplate


logger = logging.getLogger(__name__)


class EasyVVUQBasicTemplate(QCGPJTemplate):
    """Basic template for QCG-PilotJob tasks that run on a single core.

    This template only covers the simplest use-cases; in particular it does not
    allow resource requirements to be specified. For more advanced use-cases a
    custom implementation of QCGPJTemplate is recommended.

    The full reference of QCG-PilotJob task description parameters is available at
    https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
    """

    @staticmethod
    def template() -> Tuple[str, Dict[str, Any]]:
        """Return the task description template together with its defaults.

        Returns
        -------
        tuple
            ``(description, default_values)`` where ``description`` is a task
            description containing ``${...}`` placeholders and
            ``default_values`` maps some of those placeholders to the values
            used when the caller does not supply them.
        """
        description = """
{
    'name': '${name}',
    'execution': {
        'exec': '${exec}',
        'args': ${args},
        'stdout': '${stdout}',
        'stderr': '${stderr}',
        'venv': '${venv}',
        'model': '${model}',
        'model_opts': ${model_opts}
    }
}
"""
        # A fresh dict (with fresh list/dict values) is built on every call, so
        # callers may mutate the returned defaults safely.
        default_values = dict(
            args=[],
            stdout='stdout',
            stderr='stderr',
            venv='',
            model='default',
            model_opts={},
        )
        return description, default_values
class EasyVVUQParallelTemplate(QCGPJTemplate):
    """Template for QCG-PilotJob tasks that run on an exact number of cores / nodes.

    Unlike the basic template, this one lets the caller state basic resource
    requirements (``numCores`` and ``numNodes``, both as exact counts). For more
    advanced use-cases a custom implementation of QCGPJTemplate is recommended.

    The full reference of QCG-PilotJob task description parameters is available at
    https://qcg-pilotjob.readthedocs.io/en/latest/fileinterface.html#submit
    """

    @staticmethod
    def template() -> Tuple[str, Dict[str, Any]]:
        """Return the task description template together with its defaults.

        Returns
        -------
        tuple
            ``(description, default_values)`` where ``description`` contains
            ``${...}`` placeholders (including ``${numCores}`` and
            ``${numNodes}``) and ``default_values`` maps some of those
            placeholders to fallback values (one core on one node by default).
        """
        description = """
{
    'name': '${name}',
    'execution': {
        'exec': '${exec}',
        'args': ${args},
        'stdout': '${stdout}',
        'stderr': '${stderr}',
        'venv': '${venv}',
        'model': '${model}',
        'model_opts': ${model_opts}
    },
    'resources': {
        'numCores': {
            'exact': ${numCores}
        },
        'numNodes': {
            'exact': ${numNodes}
        }
    }
}
"""
        default_values = dict(
            args=[],
            stdout='stdout',
            stderr='stderr',
            venv='',
            model='default',
            model_opts={},
            numCores=1,
            numNodes=1,
        )
        return description, default_values
class QCGPJPool(Executor):
    """A Pool that manages execution of actions with QCG-PilotJob.

    Parameters
    ----------
    qcgpj_executor: QCGPJExecutor
        An instance of QCGPJExecutor. If not provided, an instance of
        QCGPJExecutor with default settings will be created.
    template: QCGPJTemplate
        An object which contains only a single method `template` that returns
        a tuple. The first element of the tuple should be a string representing
        a QCG-PilotJob task's description with placeholders (identifiers
        preceded by the $ symbol) and the second a dictionary that assigns
        default values for selected placeholders. If not provided, a default
        EasyVVUQBasicTemplate will be used.
    template_params: dict
        A dictionary that contains parameters that will be used to substitute
        placeholders defined in the template.
    polling_interval: int
        Interval, in seconds, between queries to the QCG-PilotJob Manager
        service about the state of the tasks.
    """

    def __init__(
            self,
            qcgpj_executor=None,
            template=None,
            template_params=None,
            polling_interval=1):
        if qcgpj_executor is None:
            qcgpj_executor = QCGPJExecutor()
        if template is None:
            template = EasyVVUQBasicTemplate()

        self._qcgpj_executor = qcgpj_executor
        self._template = template
        self._template_params = template_params
        self._polling_interval = polling_interval
        # Set by submit() and read by convert_results() to locate task files.
        # NOTE(review): only one value is kept per pool, so all submitted tasks
        # are assumed to share the same campaign directory.
        self._campaign_dir = None

    def submit(self, fn, *args, **kwargs):
        """Submit a callable to be executed by QCG-PilotJob.

        Schedules the callable to be executed inside a QCG-PilotJob task and
        returns a Future representing the execution of the callable.

        ``fn`` must be a bound method; its owner object (``fn.__self__``) is
        serialised with dill and handed to the task together with ``args[0]``,
        which must be a dict providing at least ``'campaign_dir'`` and
        ``'run_id'``. ``kwargs`` is accepted for Executor compatibility but is
        not used.

        Returns
        -------
        QCGPJFuture representing the given call.
        """
        actions = fn.__self__
        actions.set_wrapper(QCGPJPool._wrapper)

        previous = args[0]
        run_id = previous['run_id']
        self._campaign_dir = previous['campaign_dir']

        # The task re-loads these with dill; anyone able to write into the
        # campaign directory could therefore inject code into the task.
        pickled_actions = base64.b64encode(dill.dumps(actions)).decode('ascii')
        pickled_previous = base64.b64encode(dill.dumps(previous)).decode('ascii')

        actions_file = f'{self._campaign_dir}/.qcgpj_in_act_{run_id}'
        previous_file = f'{self._campaign_dir}/.qcgpj_in_prev_{run_id}'
        with open(actions_file, 'w') as f:
            f.write(pickled_actions)
        with open(previous_file, 'w') as f:
            f.write(pickled_previous)

        task_args = ['-m', 'easyvvuq.actions.execute_qcgpj_task',
                     actions_file, previous_file]
        return self._qcgpj_executor.submit(
            self._template.template,
            self._template_params,
            exec='python3',
            name=run_id,
            stdout=f"{self._campaign_dir}/stdout_{run_id}",
            stderr=f"{self._campaign_dir}/stderr_{run_id}",
            args=task_args)

    @property
    def executor(self):
        """Return the current QCGPJExecutor instance.

        It gives access to the QCG-PilotJob Manager instance, which in turn can
        be used to get information about the QCG-PilotJob execution environment.
        """
        return self._qcgpj_executor

    def convert_results(self, result_qcgpj):
        """Convert the result of a QCG-PilotJob task to an EasyVVUQ-compatible form.

        Loads the result data that the task stored in
        ``<campaign_dir>/.qcgpj_result_<task name>``, deletes that file and
        returns the loaded data.

        Parameters
        ----------
        result_qcgpj: dict or None
            Mapping from QCG-PilotJob task name to its final status string.
            Only the first entry is used.

        Returns
        -------
        The data loaded from the task's JSON result file.

        Raises
        ------
        RuntimeError
            If no result was provided or the task did not finish with the
            status ``SUCCEED``.
        """
        if not result_qcgpj:
            raise RuntimeError("QCG-PilotJob task returned no results")

        key, value = next(iter(result_qcgpj.items()))
        if value != 'SUCCEED':
            logger.error(f"Task {key} finished with the status: {value}")
            raise RuntimeError(f"QCG-PilotJob task {key} finished with the status: {value}")

        result_file = f'{self._campaign_dir}/.qcgpj_result_{key}'
        with open(result_file, 'r') as f:
            previous = json.load(f)
        remove(result_file)
        return previous

    def shutdown(self, wait=True, **kwargs):
        """Clean up the resources associated with the QCGPJPool.

        ``wait`` and any extra keyword arguments (e.g. ``cancel_futures``) are
        accepted for compatibility with ``concurrent.futures.Executor.shutdown``
        but are not forwarded: the underlying QCGPJExecutor is always shut
        down with its default behaviour.
        """
        return self._qcgpj_executor.shutdown()

    def as_completed(self, futures):
        """Yield futures from ``futures`` as they finish.

        Polls ``done()`` on every pending future, yields the finished ones and,
        only if some are still pending, sleeps ``polling_interval`` seconds
        before polling again.
        """
        # Materialise once so that generators/iterators are handled correctly.
        pending = set(futures)
        while pending:
            finished = [f for f in pending if f.done()]
            pending.difference_update(finished)
            yield from finished
            if pending:
                time.sleep(self._polling_interval)

    @staticmethod
    def _wrapper(action, previous):
        """Run ``action.start(previous)``; installed on actions by submit()."""
        # TODO: Implement support for specialised execution models of QCG-PilotJob.
        # Intended behaviour: for actions other than ExecuteQCGPJ, invoke the code
        # only on rank 0 (rank read from OMPI_COMM_WORLD_RANK or PMI_RANK; note
        # these environment values are strings and must be converted before
        # comparing with 0).
        return action.start(previous)
class ExecuteQCGPJ:
    """Decorator over an action marking it as configured for parallel
    execution by QCG-PilotJob.

    At present it does not influence processing: every method simply forwards
    to the wrapped action and returns its result.

    Parameters
    ----------
    action: Action
        The action to decorate so that it can be executed in parallel inside a
        QCG-PilotJob task.
    """

    def __init__(self, action):
        self._action = action

    def start(self, previous=None):
        """Forward to the wrapped action's ``start``."""
        wrapped = self._action
        return wrapped.start(previous)

    def finished(self):
        """Forward to the wrapped action's ``finished``."""
        wrapped = self._action
        return wrapped.finished()

    def finalise(self):
        """Forward to the wrapped action's ``finalise``."""
        wrapped = self._action
        return wrapped.finalise()

    def succeeded(self):
        """Forward to the wrapped action's ``succeeded``."""
        wrapped = self._action
        return wrapped.succeeded()