Source code for easyvvuq.actions.action_statuses

"""Implements ActionPool - a thin wrapper around the Python Executor interface
that is meant to simplify the execution of actions and retrieval of results.
This object is instantiated by the Campaign. The user would never instantiate it
manually. The user does interact with it to track the progress of execution.
"""
import concurrent
from concurrent.futures import ThreadPoolExecutor
from dask.distributed import Client
from tqdm import tqdm
import copy

from . import QCGPJPool

__copyright__ = """

    Copyright 2020 Vytautas Jancauskas

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"


[docs]class ActionPool: """A class that handles the execution of Actions. Parameters ---------- campaign: Campaign An instance of an EasyVVUQ campaign. actions: Actions An instance of `Actions` containing things to be done as part of the simulation. inits: iterable Initial inputs to be passed to each `Actions` representing a sample. Will usually contain dictionaries with the following information: {'run_id': ..., 'campaign_dir': ..., 'run_info': ...}. sequential: bool Will run the actions sequentially. """ def __init__(self, campaign, actions, inits, sequential=False): self.campaign = campaign self.actions = actions self.inits = inits self.sequential = sequential self.futures = [] self.results = [] self._collate_callback = lambda previous: previous
[docs] def start(self, pool=None): """Start the actions. Parameters ---------- pool: An Executor instance (e.g. ThreadPoolExecutor) Returns ------- ActionPool Starts execution and returns a reference to itself for tracking progress and for collation. """ if pool is None: pool = ThreadPoolExecutor() self.pool = pool for previous in self.inits: previous = copy.copy(previous) if self.sequential: result = self.actions.start(previous) self.results.append(result) else: future = self.pool.submit(self.actions.start, previous) self.futures.append(future) return self
[docs] def progress(self): """Some basic stats about the action statuses status. Returns ------- dict A dictionary with four keys - 'ready', 'active' and 'finished', 'failed'. Values under "ready" correspond to `Actions` waiting for execution, "active" corresponds to the number of currently running tasks. """ ready = 0 running = 0 done = 0 failed = 0 for future in self.futures: if future.running(): running += 1 elif future.done(): if not future.result(): failed += 1 else: done += 1 else: ready += 1 return {'ready': ready, 'active': running, 'finished': done, 'failed': failed}
[docs] def add_collate_callback(self, fn): """Adds a callback to be called after collation is done. Parameters ---------- fn - A callable that takes previous as it's only input. """ self._collate_callback = fn
[docs] def collate(self, progress_bar=False): """A command that will block until all Futures in the pool have finished. It will also store the results gather from `Actions` in the database. Parameters ---------- progress_bar: bool Whether to show progress bar """ if not progress_bar: def tqdm_(x, total=None): return x else: tqdm_ = tqdm if isinstance(self.pool, Client): self.results = self.pool.gather(self.futures) if self.sequential or isinstance(self.pool, Client): for result in tqdm_(self.results, total=len(self.results)): result = self._collate_callback(result) self.campaign.campaign_db.store_result( result['run_id'], result, change_status=result['collated']) else: if isinstance(self.pool, QCGPJPool): as_completed_fn = self.pool.as_completed self.add_collate_callback(self.pool.convert_results) else: as_completed_fn = concurrent.futures.as_completed for future in tqdm_(as_completed_fn(self.futures), total=len(self.futures)): result = self._collate_callback(future.result()) self.campaign.campaign_db.store_result( result['run_id'], result, change_status=result['collated']) self.campaign.campaign_db.session.commit()