Source code for easyvvuq.campaign

"""EasyVVUQ Campaign

This module contains the Campaign class that is used to coordinate all
EasyVVUQ workflows.
"""
import os
import json
import logging
import tempfile
import easyvvuq
from concurrent.futures import ProcessPoolExecutor
from easyvvuq.constants import default_campaign_prefix, Status
from easyvvuq.data_structs import RunInfo, CampaignInfo, AppInfo
from easyvvuq.sampling import BaseSamplingElement
from easyvvuq.actions import ActionPool
import easyvvuq.db.sql as db

__copyright__ = """

    Copyright 2018 Robin A. Richardson, David W. Wright

    This file is part of EasyVVUQ

    EasyVVUQ is free software: you can redistribute it and/or modify
    it under the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EasyVVUQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
__license__ = "LGPL"


logger = logging.getLogger(__name__)


[docs]class Campaign: """Campaigns organise the dataflow in EasyVVUQ workflows. The Campaign functions as as state machine for the VVUQ workflows. It uses a database (CampaignDB) to store information on both the target application and the VVUQ algorithms being employed. It also collects data from the simulations and can be used to store and resume your state. Notes ----- Multiple campaigns can be combined in a CampaignDB. Hence the particular campaign we are currently working on will be specified using `campaign_id`. Parameters ---------- name: str Name of the Campaign. Freely chosen, serves as a human-readable way of distinguishing between several campaigns in the same database. params: dict, optional Description of the parameters to associated with the application. Will be used to create an app when creating the campaign. It is also possible to add apps manually using `add_app` method of the Campaign class. But this can be a useful shorthand when working with single app campaigns. To use this functionality both `params` and `actions` has to be specified. The name of this app will be the same as the name of the Campaign. actions: Actions, optional Actions object associated with an application. See description of the `params` parameter for more details. db_location: str, optional Location of the underlying campaign database - either a path or acceptable URI for SQLAlchemy. work_dir: str, optional, default='./' Path to working directory - used to store campaign directory. change_to_state: bool, optional, default=False Should we change to the directory containing any specified `state_file` in order to make relative paths work. verify_all_runs: bool, optional, default=True Check all new runs being added for unrecognised params (not defined for the currently set app), values lying within defined physical range, type checking etc. This should normally always be set to True, but in cases where the performance is too degraded, the checks can be disabled by setting to False. Attributes ---------- campaign_name : str or None Name for the campaign/workflow. _campaign_dir: str or None Path to the directory campaign uses for local storage (runs inputs etc) db_location : str or None Location of the underlying campaign database - either a path or acceptable URI for SQLAlchemy. _log: list The log of all elements that have been applied, with information about their application campaign_id : int ID number for the current campaign in the db.CampaignDB. campaign_db: easyvvuq.db.Basedb.CampaignDB A campaign database object last_analysis: The result of the most recent analysis carried out on this campaign _active_app: dict Info about currently set app _active_app_name: str Name of currently set app _active_sampler_id: int The database id of the currently set Sampler object Examples -------- A typical instantiation might look like this. >>> params = { "S0": {"type": "float", "default": 997}, "I0": {"type": "float", "default": 3}, "beta": {"type": "float", "default": 0.2}, "gamma": {"type": "float", "default": 0.04, "min": 0.0, "max": 1.0}, "iterations": {"type": "integer", "default": 100}, "outfile": {"type": "string", "default": "output.csv"} } >>> encoder = uq.encoders.GenericEncoder(template_fname='sir.template', delimiter='$', target_filename='input.json') >>> decoder = uq.decoders.SimpleCSV(target_filename='output.csv', output_columns=['I']) >>> actions = uq.actions.local_execute(encoder, os.path.abspath('sir') + ' input.json', decoder) >>> campaign = uq.Campaign(name='sir', params=params, actions=actions) A simplified one (without an app) might look simply like this. >>> campaign = Campaign('simple') An app then can be added. >>> campaign.add_app('simple_app', params=params, actions=actions) """ def __init__( self, name, params=None, actions=None, db_location=None, work_dir="./", change_to_state=False, verify_all_runs=True ): self.work_dir = os.path.realpath(os.path.expanduser(work_dir)) self.verify_all_runs = verify_all_runs self.campaign_name = name self._campaign_dir = None if db_location is None: self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=self.work_dir) self.db_location = "sqlite:///" + self._campaign_dir + "/campaign.db" else: self.db_location = db_location self.campaign_id = None self.campaign_db = None self.last_analysis = None self._active_app = None self._active_app_name = None self._active_app_actions = None self._active_sampler = None self._active_sampler_id = None self.init_db(name, self.work_dir) self._state_dir = None # here we assume that the user wants to add an app if (params is not None) and (actions is not None): self.add_app(name=name, params=params, actions=actions) @property def campaign_dir(self): """Get the path in which to load/save files related to the campaign. Returns ------- str Path to the campaign directory - given as a subdirectory of the working directory. """ return os.path.join(self.work_dir, self._campaign_dir)
[docs] def init_db(self, name, work_dir='.'): """Initialize the connection with the database and either resume or create the campaign. Parameters ---------- name: str Name of the campaign. work_dir: str Work directory, defaults to cwd. """ self.campaign_db = db.CampaignDB(location=self.db_location) if self.campaign_db.campaign_exists(name): self.campaign_id = self.campaign_db.get_campaign_id(name) self._active_app_name = self.campaign_db.get_active_app()[0].name self.campaign_name = name self._campaign_dir = self.campaign_db.campaign_dir(name) if not os.path.exists(self._campaign_dir): message = (f"Campaign directory ({self.campaign_dir}) does not exist.") raise RuntimeError(message) self._active_sampler_id = self.campaign_db.get_sampler_id(self.campaign_id) self._active_sampler = self.campaign_db.resurrect_sampler(self._active_sampler_id) self.set_app(self._active_app_name) self.campaign_db.resume_campaign(name) else: if self._campaign_dir is None: self._campaign_dir = tempfile.mkdtemp(prefix=name, dir=work_dir) info = CampaignInfo( name=name, campaign_dir_prefix=default_campaign_prefix, easyvvuq_version=easyvvuq.__version__, campaign_dir=self._campaign_dir) self.campaign_db.create_campaign(info) self.campaign_name = name self.campaign_id = self.campaign_db.get_campaign_id(self.campaign_name)
[docs] def add_app(self, name=None, params=None, actions=None, set_active=True): """Add an application to the CampaignDB. Parameters ---------- name : str Name of the application. params : dict Description of the parameters to associate with the application. actions : Actions An instance of Actions containing actions to be executed set_active: bool Should the added app be set to be the currently active app? """ # Verify input parameters dict paramsspec = easyvvuq.ParamsSpecification(params, appname=name) # validate application input app = AppInfo( name=name, paramsspec=paramsspec, actions=actions, ) self.campaign_db.add_app(app) if set_active: self.set_app(app.name)
[docs] def set_app(self, app_name): """Set active app for the campaign. Application information is retrieved from `self.campaign_db`. Parameters ---------- app_name: str Name of selected app, if `None` given then first app will be selected. """ self._active_app_name = app_name self._active_app = self.campaign_db.app(name=app_name) self.campaign_db.set_active_app(app_name) # Resurrect the app encoder, decoder and collation elements self._active_app_actions = self.campaign_db.resurrect_app(app_name)
[docs] def replace_actions(self, app_name, actions): """Replace actions for an app with a given name. Parameters ---------- app_name: str Name of the app. actions: Actions `Actions` instance, will replace the current `Actions` of an app. """ self.campaign_db.replace_actions(app_name, actions) self._active_app_actions = actions
[docs] def set_sampler(self, sampler, update=False): """Set active sampler. Parameters ---------- sampler : Sampler Sampler that will be used to create runs for the current campaign. update : bool If set to True it will not add the sampler to the database, just change it as the active sampler. """ self._active_sampler = sampler if not update: self._active_sampler_id = self.campaign_db.add_sampler(sampler) sampler.sampler_id = self._active_sampler_id self._active_sampler_id = self._active_sampler.sampler_id self.campaign_db.set_sampler(self.campaign_id, self._active_sampler.sampler_id)
[docs] def add_external_runs(self, input_files, output_files, input_decoder, output_decoder): """Takes a list of files and adds them to the database. This method is to be used when adding runs to the EasyVVUQ database that were not executed using EasyVVUQ. Parameters ---------- output_files: list of str A list of output file paths to be loaded to the database. decoder: Decoder A decoder that will be used to parse these files. """ inputs = [] for input_file in input_files: input_decoder.target_filename = os.path.basename(input_file) params = input_decoder.parse_sim_output({'run_dir': os.path.dirname(input_file)}) inputs.append(params) outputs = [] for output_file in output_files: output_decoder.target_filename = os.path.basename(output_file) result = output_decoder.parse_sim_output({'run_dir': os.path.dirname(output_file)}) outputs.append(result) i = 0 for params, result in zip(inputs, outputs): i += 1 table = db.RunTable(run_name='run_{}'.format(i), app=self._active_app['id'], params=json.dumps(params), status=Status.COLLATED, run_dir=self.get_campaign_runs_dir(), result=json.dumps(result), campaign=self.campaign_id, sampler=self._active_sampler_id) self.campaign_db.session.add(table) self.campaign_db.session.commit()
[docs] def add_runs(self, runs, mark_invalid=False): """Add runs to the database. Parameters ---------- runs : list of dicts Each dict defines the value of each model parameter listed in self.params_info for a run to be added to self.runs mark_invalid : bool Will mark runs that fail verification as invalid (but will not raise an exception) """ if self._active_app is None: msg = ("No app is currently set for this campaign. " "Use set_app('name_of_app').") logging.error(msg) raise Exception(msg) app_default_params = self._active_app["params"] run_info_list = [] for new_run in runs: if new_run is None: msg = ("add_run() was passed new_run of type None. Bad sampler?") logging.error(msg) raise Exception(msg) # Verify and complete run with missing/default param values status = Status.NEW try: new_run = app_default_params.process_run(new_run, verify=self.verify_all_runs) except RuntimeError: if mark_invalid: new_run = app_default_params.process_run(new_run, verify=False) status = Status.INVALID else: raise # Add to run queue run_info = RunInfo(app=self._active_app['id'], params=new_run, sample=self._active_sampler_id, campaign=self.campaign_id, status=status) run_info_list.append(run_info) self.campaign_db.add_runs(run_info_list, iteration=self._active_sampler.iteration)
[docs] def draw_samples(self, num_samples=0, mark_invalid=False): """Draws `num_samples` sets of parameters from the currently set sampler, resulting in `num_samples` new runs added to the runs list. If `num_samples` is 0 (its default value) then this method draws ALL samples from the sampler, until exhaustion (this will fail if the sampler is not finite). Parameters ---------- num_samples : int Number of samples to draw from the active sampling element. By default is 0 (draw ALL samples) mark_invalid : bool If True will mark runs that go outside valid parameter range as INVALID. This is useful for MCMC style methods where you want those runs to evaluate to low probabilities. """ # Make sure `num_samples` is not 0 for an infinite generator # (this would add runs forever...) if not self._active_sampler.is_finite() and num_samples <= 0: msg = (f"Sampling_element '{self._active_sampler.element_name()}' " f"is an infinite generator, therefore a finite number of " f"draws (n > 0) must be specified.") raise RuntimeError(msg) num_added = 0 new_runs = [] for new_run in self._active_sampler: new_runs.append(new_run) num_added += 1 if num_samples != 0 and num_added >= num_samples: break self.add_runs(new_runs, mark_invalid) # Write sampler's new state to database self.campaign_db.update_sampler(self._active_sampler_id, self._active_sampler) return new_runs
[docs] def list_runs(self, sampler=None, campaign=None, app_id=None, status=None): """Get list of runs in the CampaignDB. Parameters ---------- sampler: int Sampler id to filter for. campaign: int Campaign id to filter for. app_id: int App id to filter for. status: Status Status to filter for. Returns ------- list of runs """ return list(self.campaign_db.runs( sampler=sampler, campaign=campaign, app_id=app_id, status=status))
[docs] def get_campaign_runs_dir(self): """Get the runs directory from the CampaignDB. Returns ------- str Path in which the runs information will be written. """ return self.campaign_db.runs_dir(self.campaign_name)
[docs] def relocate(self, campaign_dir): """Relocate the campaign by specifying a new path where campaign is located. Parameters ---------- new_path: str new runs directory """ if not os.path.exists(campaign_dir): raise RuntimeError("specified directory does not exist: {}".format(campaign_dir)) self.campaign_db.relocate(campaign_dir, self.campaign_name)
[docs] def execute(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): """This will draw samples and execute the Actions on those samples. Parameters ---------- nsamples: int Number of samples to draw. For infinite samplers or when you want to process samples in batches. pool: Executor A pool object to be used when processing runs (e.g. instance of `ThreadPoolExecutor` or `ProcessPoolExecutor`). mark_invalid: bool Mark runs that go outside the specified input parameter range as INVALID. sequential: bool Whether to process samples sequentially (sometimes more efficient or you might want to avoid the concurrent module for some reason). """ self.draw_samples(nsamples, mark_invalid=mark_invalid) action_pool = self.apply_for_each_sample( self._active_app_actions, sequential=sequential) return action_pool.start(pool=pool)
[docs] def apply_for_each_sample(self, actions, status=Status.NEW, sequential=False): """For each run in this Campaign's run list, apply the specified action (an object of type Action). Parameters ---------- actions: Actions Actions to be applied to each relevant run in the database. status: Status Will apply the Actions only to those runs whose status is as specified. sequential: bool Whether to process samples sequentially (sometimes more efficient or you might want to avoid the concurrent module for some reason). Returns ------- ActionPool An object containing ActionStatus instances to track action execution. """ # Loop through all runs in this campaign with status ENCODED, and # run the specified action on each run's dir def inits(): for run_id, run_data in self.campaign_db.runs( status=status, app_id=self._active_app['id']): previous = {} previous['run_id'] = run_id previous['campaign_dir'] = self._campaign_dir previous['rundir'] = run_data['run_dir'] previous['run_info'] = run_data previous['result'] = {} previous['collated'] = False yield previous return ActionPool(self, actions, inits=inits(), sequential=sequential)
[docs] def iterate(self, nsamples=0, pool=None, mark_invalid=False, sequential=False): """This is the equivalent of `execute` for methods that rely on the output of the previous sampling stage (designed for MCMC, should work for others). Parameters ---------- nsamples : int Number of samples to draw (during a single iteration). pool : Executor An Executor instance. For example ThreadPoolExecutor or a Dask Client. Defaults to the ThreadPoolExecutor. mark_invalid : bool Mark runs that go outside the specified input parameter range as INVALID. sequential: bool Will execute the `Actions` associated with runs sequentially. Might be more efficient in some situations. Yields ------ ActionPool An object containing Futures instances to track action execution. """ while True: self.draw_samples(nsamples, mark_invalid=mark_invalid) action_pool = self.apply_for_each_sample( self._active_app_actions, sequential=sequential) yield action_pool.start(pool=pool) result = self.get_collation_result(last_iteration=True) invalid = self.get_invalid_runs(last_iteration=True) ignored_runs = self._active_sampler.update(result, invalid) for run_id in ignored_runs: self.campaign_db.session.query(db.RunTable).\ filter(db.RunTable.id == int(run_id)).\ update({'status': easyvvuq.constants.Status.IGNORED}) self.campaign_db.session.commit()
[docs] def recollate(self): """Clears the current collation table, changes all COLLATED status runs back to ENCODED, then runs collate() again """ collated_run_ids = list(self.campaign_db.run_ids(status=Status.COLLATED)) self.campaign_db.set_run_statuses(collated_run_ids, Status.ENCODED) self.collate()
[docs] def get_collation_result(self, last_iteration=False): """Return dataframe containing all collated results Parameters ---------- last_iteration : bool Will only return the result of the last iteration. Returns ------- DataFrame A DataFrame with the simulation results along with the inputs used to produce them. """ if last_iteration: iteration = self._active_sampler.iteration - 1 else: iteration = -1 return self.campaign_db.get_results( self._active_app['name'], self._active_sampler_id, status=easyvvuq.constants.Status.COLLATED, iteration=iteration)
[docs] def get_invalid_runs(self, last_iteration=False): """Return dataframe containing all results marked as INVALID. Parameters ---------- last_iteration : bool Will only return the result of the last iteration. Returns ------- DataFrame A DataFrame with the results form simulations that were marked as INVALID. These will usually be the ones that went outside the specified parameter ranges. These still have to be accounted for in some way by some methods (e.g. MCMC). """ if last_iteration: iteration = self._active_sampler.iteration - 1 else: iteration = -1 return self.campaign_db.get_results( self._active_app['name'], self._active_sampler_id, status=easyvvuq.constants.Status.INVALID, iteration=iteration)
[docs] def apply_analysis(self, analysis): """Run the `analysis` element on the output of the last run collation. Parameters ---------- analysis : Analysis Element that performs a VVUQ analysis on a dataframe summary of run outputs. """ # Apply analysis element to most recent collation result self.last_analysis = analysis.analyse(data_frame=self.get_collation_result())
[docs] def analyse(self, **kwargs): """If available will call an appropriate analysis class on the collation result. Parameters ---------- **kwargs : dict Argument to the analysis class constructor (after sampler). Returns ------- AnalysisResults An object representing analysis results. Can be used to interact with those results in some way. Plot, retrieve surrogate models and so on. See `easyvvuq.analysis.AnalysisResults` for further information. """ collation_result = self.get_collation_result() try: analysis = self._active_sampler.analysis_class(sampler=self._active_sampler, **kwargs) return analysis.analyse(collation_result) except NotImplementedError: raise RuntimeError("This sampler does not have a corresponding analysis class")
[docs] def get_last_analysis(self): """Return the output of the most recently run analysis element. """ if self.last_analysis is None: logging.warning("No last analysis output available.") return self.last_analysis
def __str__(self): """Returns formatted summary of the current Campaign state. Enables class to work with standard print() method """ return (f"db_location = {self.db_location}\n" f"active_sampler_id = {self._active_sampler_id}\n" f"campaign_name = {self.campaign_name}\n" f"campaign_dir = {self.campaign_dir}\n" f"campaign_id = {self.campaign_id}\n")
[docs] def get_active_sampler(self): """Return the active sampler element in use by this campaign. Returns ------- The sampler currently in use """ return self._active_sampler
[docs] def ignore_runs(self, list_of_run_IDs): """Flags the specified runs to be IGNORED in future collation. Note that this does NOT remove previously collated results from the collation table. For that you must refresh the collation by running recollate(). Parameters ---------- list The list of run IDs for the runs that should be set to status IGNORED """ self.campaign_db.set_run_statuses(list_of_run_IDs, Status.IGNORED)
[docs] def rerun(self, list_of_run_IDs): """Sets the status of the specified runs to ENCODED, so that their results may be recollated later (presumably after extending, rerunning or otherwise modifying the data in the relevant run folder). Note that this method will NOT perform any execution - it simply flags the run in EasyVVUQ as being uncollated. Actual execution is (as usual) the job of the user or middleware. Parameters ---------- list The list of run IDs for the runs that should be set to status ENCODED """ for run_ID in list_of_run_IDs: status = self.campaign_db.get_run_status(run_ID) if status == Status.NEW: msg = (f"Cannot rerun {run_ID} as it has status NEW, and must" f"be encoded before execution.") raise RuntimeError(msg) self.campaign_db.set_run_statuses(list_of_run_IDs, Status.ENCODED)
[docs] def get_active_app(self): """Returns a dict of information regarding the application that is currently set for this campaign. """ return self._active_app