utopya package#
The utopya package implements the frontend of Utopia.
Subpackages#
- utopya.model_registry package
- utopya.plot_funcs package
- Subpackages
- Submodules
- utopya.plot_funcs._basic module
- utopya.plot_funcs._file_writer module
- utopya.plot_funcs._graph module
- utopya.plot_funcs._mpl_helpers module
- utopya.plot_funcs._utils module
- utopya.plot_funcs.attractor module
- utopya.plot_funcs.basic_mv module
- utopya.plot_funcs.basic_uni module
- utopya.plot_funcs.ca module
- utopya.plot_funcs.distribution module
- utopya.plot_funcs.time_series module
Submodules#
utopya._cluster module#
This module holds functions used in the Multiverse cluster mode
- utopya._cluster.parse_node_list(node_list_str: str, *, mode: str, rcps: dict) List[str] [source]#
Parses the node list into a list of node names and checks it against the given resolved cluster parameters.
Depending on mode, different forms of the node list are parsable. For condensed mode:
node042
node[002,004-011,016]
m05s[0204,0402,0504]
m05s[0204,0402,0504],m08s[0504,0604,0701],m13s0603,m14s[0501-0502]
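The bracket expansion behind the condensed notation can be sketched in a few lines. This is purely illustrative, not utopya's implementation; the function name and the exact separator handling are assumptions:

```python
import re
from typing import List

def expand_condensed_nodes(node_list_str: str) -> List[str]:
    """Expand a condensed node list such as ``node[002,004-011,016]``
    into individual node names (illustrative sketch only)."""
    nodes: List[str] = []
    # Treat whitespace like commas, then split on commas outside brackets
    normalized = node_list_str.replace(" ", ",")
    for part in re.split(r",(?![^\[]*\])", normalized):
        if not part:
            continue
        m = re.fullmatch(r"(\w+)\[([^\]]+)\]", part)
        if m is None:
            nodes.append(part)  # plain node name, e.g. "node042"
            continue
        prefix, spec = m.groups()
        for item in spec.split(","):
            if "-" in item:  # zero-padded range like "004-011"
                lo, hi = item.split("-")
                nodes.extend(f"{prefix}{i:0{len(lo)}d}"
                             for i in range(int(lo), int(hi) + 1))
            else:
                nodes.append(prefix + item)
    return sorted(nodes)
```

For example, `"node042 node[002,004-011,016]"` expands to eleven node names.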
utopya._path_setup module#
Helper module that can manipulate system path, e.g. to make additional utopya-related modules available.
- class utopya._path_setup.temporary_sys_path(path: str)[source]#
Bases: object
A sys.path context manager, temporarily adding a path and removing it again upon exiting. If the given path already exists in the sys.path, it is neither added nor removed and the sys.path remains unchanged.
- __dict__ = mappingproxy({'__module__': 'utopya._path_setup', '__doc__': 'A sys.path context manager, temporarily adding a path and removing it\n again upon exiting. If the given path already exists in the sys.path, it\n is neither added nor removed and the sys.path remains unchanged.\n ', '__init__': <function temporary_sys_path.__init__>, '__enter__': <function temporary_sys_path.__enter__>, '__exit__': <function temporary_sys_path.__exit__>, '__dict__': <attribute '__dict__' of 'temporary_sys_path' objects>, '__weakref__': <attribute '__weakref__' of 'temporary_sys_path' objects>, '__annotations__': {}})#
- __module__ = 'utopya._path_setup'#
- __weakref__#
list of weak references to the object (if defined)
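The documented behaviour can be reproduced by a minimal context manager. A sketch (the class name here is hypothetical; the real class is utopya._path_setup.temporary_sys_path):

```python
import sys

class temporary_sys_path_sketch:
    """Add ``path`` to ``sys.path`` on entry and remove it again on exit;
    if it was already present, leave ``sys.path`` untouched."""

    def __init__(self, path: str):
        self.path = path
        self._was_present = False

    def __enter__(self):
        self._was_present = self.path in sys.path
        if not self._was_present:
            sys.path.append(self.path)
        return self

    def __exit__(self, *exc_info):
        if not self._was_present and self.path in sys.path:
            sys.path.remove(self.path)

# usage: imports from that path would succeed inside the block only
with temporary_sys_path_sketch("/tmp/extra-modules"):
    pass
```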
- class utopya._path_setup.temporary_sys_modules[source]#
Bases: object
A context manager for the sys.modules cache, ensuring that it is in the same state after exiting as it was before entering the context manager.
- __dict__ = mappingproxy({'__module__': 'utopya._path_setup', '__doc__': 'A context manager for the sys.modules cache, ensuring that it is in the\n same state after exiting as it was before entering the context manager.\n ', '__init__': <function temporary_sys_modules.__init__>, '__enter__': <function temporary_sys_modules.__enter__>, '__exit__': <function temporary_sys_modules.__exit__>, '__dict__': <attribute '__dict__' of 'temporary_sys_modules' objects>, '__weakref__': <attribute '__weakref__' of 'temporary_sys_modules' objects>, '__annotations__': {}})#
- __module__ = 'utopya._path_setup'#
- __weakref__#
list of weak references to the object (if defined)
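The same idea applied to the sys.modules cache: snapshot on entry, restore on exit. A hypothetical sketch, not the actual implementation:

```python
import sys
import types

class temporary_sys_modules_sketch:
    """Snapshot ``sys.modules`` on entry and restore that state on exit,
    so imports made inside the context are forgotten afterwards."""

    def __enter__(self):
        self._snapshot = dict(sys.modules)
        return self

    def __exit__(self, *exc_info):
        # Remove modules that appeared inside the context ...
        for name in list(sys.modules):
            if name not in self._snapshot:
                del sys.modules[name]
        # ... and restore any entries that were replaced or removed
        sys.modules.update(self._snapshot)
```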
utopya._yaml module#
Supplies basic YAML interface, inherited from dantro and paramspace
utopya.batch module#
Implements batch running and evaluation of simulations
- class utopya.batch.BatchTaskManager(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]#
Bases: object
A manager for batch tasks
Sets up a BatchTaskManager.
- Parameters
batch_cfg_path (str, optional) – The batch file with all the task definitions.
**update_batch_cfg – Additional arguments that are used to update the batch configuration.
- Raises
NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.
- RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#
- __init__(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]#
Sets up a BatchTaskManager.
- Parameters
batch_cfg_path (str, optional) – The batch file with all the task definitions.
**update_batch_cfg – Additional arguments that are used to update the batch configuration.
- Raises
NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.
- property debug: bool#
Whether debug mode was enabled.
- property parallelization_level: str#
- property run_defaults: dict#
A deepcopy of the run task defaults
- property eval_defaults: dict#
A deepcopy of the eval task defaults
- property dirs: dict#
The directories associated with this BatchTaskManager
- __dict__ = mappingproxy({'__module__': 'utopya.batch', '__doc__': 'A manager for batch tasks', 'RUN_DIR_TIME_FSTR': '%y%m%d-%H%M%S', '__init__': <function BatchTaskManager.__init__>, '__str__': <function BatchTaskManager.__str__>, 'debug': <property object>, 'parallelization_level': <property object>, 'run_defaults': <property object>, 'eval_defaults': <property object>, 'dirs': <property object>, 'perform_tasks': <function BatchTaskManager.perform_tasks>, '_setup_batch_cfg': <staticmethod object>, '_setup_dirs': <function BatchTaskManager._setup_dirs>, '_perform_backup': <function BatchTaskManager._perform_backup>, '_add_tasks': <function BatchTaskManager._add_tasks>, '_add_run_task': <function BatchTaskManager._add_run_task>, '_add_eval_task': <function BatchTaskManager._add_eval_task>, '__dict__': <attribute '__dict__' of 'BatchTaskManager' objects>, '__weakref__': <attribute '__weakref__' of 'BatchTaskManager' objects>, '__annotations__': {}})#
- __module__ = 'utopya.batch'#
- __weakref__#
list of weak references to the object (if defined)
utopya.cfg module#
Module that coordinates the Utopia Configuration Directory
- utopya.cfg.get_cfg_path(cfg_name: str) str [source]#
Returns the absolute path to the specified configuration file
- utopya.cfg.load_from_cfg_dir(cfg_name: str) dict [source]#
Load a configuration file; returns empty dict if no file exists.
- Parameters
cfg_name (str) – The name of the configuration to read
- Returns
- The configuration as read from the config directory; if no file
is available, will return an empty dict.
- Return type
dict
- utopya.cfg.write_to_cfg_dir(cfg_name: str, obj: dict)[source]#
Writes a YAML representation of the given object to the configuration directory. Always overwrites a possibly existing file.
- Parameters
cfg_name (str) – The configuration name
obj (dict) – The yaml-representable object that is to be written; usually a dict.
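A sketch of the read/write round-trip these two functions provide, using PyYAML and a temporary stand-in for the config directory. The file naming and extension are assumptions; utopya's actual YAML handling goes through utopya._yaml (i.e. dantro and paramspace):

```python
import os
import tempfile

import yaml  # PyYAML, used here for illustration only

CFG_DIR = tempfile.mkdtemp()  # stand-in for the Utopia config directory

def write_to_cfg_dir_sketch(cfg_name: str, obj: dict) -> None:
    """Write a YAML representation, always overwriting an existing file."""
    with open(os.path.join(CFG_DIR, cfg_name + ".yml"), "w") as f:
        yaml.safe_dump(obj, f)

def load_from_cfg_dir_sketch(cfg_name: str) -> dict:
    """Return the parsed config, or an empty dict if no file exists."""
    path = os.path.join(CFG_DIR, cfg_name + ".yml")
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return yaml.safe_load(f) or {}
```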
utopya.cltools module#
Methods needed to implement the utopia command line interface
- class utopya.cltools.ANSIesc[source]#
Bases: object
Some selected ANSI escape codes; usable in format strings
- RESET = '\x1b[0m'#
- BOLD = '\x1b[1m'#
- DIM = '\x1b[2m'#
- UNDERLINE = '\x1b[4m'#
- BLACK = '\x1b[30m'#
- RED = '\x1b[31m'#
- GREEN = '\x1b[32m'#
- YELLOW = '\x1b[33m'#
- BLUE = '\x1b[34m'#
- MAGENTA = '\x1b[35m'#
- CYAN = '\x1b[36m'#
- WHITE = '\x1b[37m'#
- __dict__ = mappingproxy({'__module__': 'utopya.cltools', '__doc__': 'Some selected ANSI escape codes; usable in format strings', 'RESET': '\x1b[0m', 'BOLD': '\x1b[1m', 'DIM': '\x1b[2m', 'UNDERLINE': '\x1b[4m', 'BLACK': '\x1b[30m', 'RED': '\x1b[31m', 'GREEN': '\x1b[32m', 'YELLOW': '\x1b[33m', 'BLUE': '\x1b[34m', 'MAGENTA': '\x1b[35m', 'CYAN': '\x1b[36m', 'WHITE': '\x1b[37m', '__dict__': <attribute '__dict__' of 'ANSIesc' objects>, '__weakref__': <attribute '__weakref__' of 'ANSIesc' objects>, '__annotations__': {}})#
- __module__ = 'utopya.cltools'#
- __weakref__#
list of weak references to the object (if defined)
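Since the codes are plain strings, they can be interpolated directly into format strings. A small example reproducing a subset of the constants above:

```python
class ANSIesc:
    """Subset of the ANSI escape codes listed above."""
    RESET = "\x1b[0m"
    BOLD = "\x1b[1m"
    GREEN = "\x1b[32m"

# Attribute access inside the format string picks up the escape codes
msg = "{c.BOLD}{c.GREEN}Success:{c.RESET} model registered".format(c=ANSIesc)
print(msg)  # shows "Success:" in bold green on an ANSI-capable terminal
```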
- utopya.cltools.add_from_kv_pairs(*pairs, add_to: dict, attempt_conversion: bool = True, allow_eval: bool = False, allow_deletion: bool = True) None [source]#
Parses the key=value pairs and adds them to the given dict.
Note that this happens directly on the object, i.e. making use of the mutability of the given dict. This function has no return value!
- Parameters
*pairs – Sequence of key=value strings
add_to (dict) – The dict to add the pairs to
attempt_conversion (bool, optional) – Whether to attempt converting the strings to bool, float, int types
allow_eval (bool, optional) – Whether to try calling eval() on the value strings during conversion
allow_deletion (bool, optional) – If set, can pass DELETE string to a key to remove the corresponding entry.
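A sketch of the documented behaviour: in-place insertion, conversion to bool/int/float, and the DELETE sentinel. The eval()-based conversion of the real function is omitted, and the helper name is hypothetical:

```python
def add_from_kv_pairs_sketch(*pairs, add_to: dict,
                             attempt_conversion: bool = True,
                             allow_deletion: bool = True) -> None:
    """Parse ``key=value`` strings into ``add_to`` in place; no return value."""
    def convert(s: str):
        if not attempt_conversion:
            return s
        if s.lower() in ("true", "false"):
            return s.lower() == "true"
        for typ in (int, float):
            try:
                return typ(s)
            except ValueError:
                pass
        return s  # fall back to the raw string

    for pair in pairs:
        key, value = pair.split("=", 1)
        if allow_deletion and value == "DELETE":
            add_to.pop(key, None)  # DELETE sentinel removes the entry
        else:
            add_to[key] = convert(value)
```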
- utopya.cltools.register_models(args, *, registry)[source]#
Handles registration of multiple models given argparse args
- utopya.cltools.register_project(args: list, *, arg_prefix: str = '') dict [source]#
Register or update information of a Utopia project, i.e. a repository that implements models.
- Parameters
args (list) – The CLI arguments object
arg_prefix (str, optional) – The prefix to use when using attribute access to these arguments. Useful if the names as defined in the CLI are different depending on the invocation
- Returns
Information on the newly added or updated project
- Return type
dict
- utopya.cltools.deploy_user_cfg(user_cfg_path: str = '/root/.config/utopia/user_cfg.yml') None [source]#
Deploys a copy of the full config to the specified location (usually the user config search path of the Multiverse class)
Instead of just copying the full config, it is written line by line, commenting out lines that are not already commented out, and changing the header.
- Parameters
user_cfg_path (str, optional) – The path the file is expected at. Is an argument in order to make testing easier.
- Returns
None
- utopya.cltools.copy_model_files(*, model_name: str, new_name: Optional[str] = None, target_project: Optional[str] = None, add_to_cmakelists: bool = True, skip_exts: Optional[Sequence[str]] = None, use_prompts: bool = True, dry_run: bool = False) None [source]#
A helper function to conveniently copy model-related files, rename them, and adjust their content to the new name as well.
- Parameters
model_name (str) – The name of the model to copy
new_name (str, optional) – The new name of the model. This may not conflict with any already existing model name in the model registry.
target_project (str, optional) – The name of the project to copy the model to. It needs to be a registered Utopia project.
add_to_cmakelists (bool, optional) – Whether to add the new model to the corresponding CMakeLists.txt file.
use_prompts (bool, optional) – Whether to interactively prompt for confirmation or missing arguments.
dry_run (bool, optional) – If given, no write or copy operations will be carried out.
- Raises
ValueError – Upon bad arguments
- Returns
None
- utopya.cltools.prompt_for_new_plot_args(*, old_argv: List[str], old_args: argparse.Namespace, parser: argparse.ArgumentParser) Tuple[dict, argparse.Namespace] [source]#
Given some old arguments, prompts for new ones and returns a new list of argument values and the parsed argparse namespace result.
- Parameters
old_argv (List[str]) – The old argument value list
old_args (argparse.Namespace) – The old set of parsed arguments
parser (argparse.ArgumentParser) – The parser to use for evaluating the newly specified argument value list
- Returns
- The new argument values list and the
parsed argument namespace.
- Return type
Tuple[dict, argparse.Namespace]
- Raises
ValueError – Upon error in parsing the new arguments.
utopya.datacontainer module#
Implements data container classes specialised on Utopia output data.
It is based on the dantro.DataContainer classes, especially the numeric one, the NumpyDataContainer.
- class utopya.datacontainer.NumpyDC(*, name: str, data: numpy.ndarray, **dc_kwargs)[source]#
Bases: dantro.mixins.proxy_support.Hdf5ProxySupportMixin, dantro.containers.numeric.NumpyDataContainer
This is the base class for numpy data containers used in Utopia.
It is based on the NumpyDataContainer provided by dantro and extends it with the Hdf5ProxySupportMixin, allowing to load the data from the Hdf5 file only once it becomes necessary.
Initialize a NumpyDataContainer, storing data that is ndarray-like.
- Parameters
name (str) – The name of this container
data (np.ndarray) – The numpy data to store
**dc_kwargs – Additional arguments for container initialization, passed on to parent method
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datacontainer'#
- class utopya.datacontainer.XarrayDC(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], dims: Optional[Sequence[str]] = None, coords: Optional[dict] = None, extract_metadata: bool = True, apply_metadata: bool = True, **dc_kwargs)[source]#
Bases: dantro.mixins.proxy_support.Hdf5ProxySupportMixin, dantro.containers.xrdatactr.XrDataContainer
This is the base class for xarray data containers used in Utopia.
It is based on the XrDataContainer provided by dantro. As of now, it has no proxy support, but will gain it once available on dantro side.
Initialize a XrDataContainer and extract dimension and coordinate labels.
- Parameters
name (str) – which name to give to the XrDataContainer
data (Union[np.ndarray, xr.DataArray]) – The data to store; anything that an xr.DataArray can take
dims (Sequence[str], optional) – The dimension names.
coords (dict, optional) – The coordinates. The keys of this dict have to correspond to the dimension names.
extract_metadata (bool, optional) – If True, an attempt is made to populate missing dims or coords arguments from the container attributes.
apply_metadata (bool, optional) – Whether to apply the extracted or passed dims and coords to the underlying data. This might not be desired in cases where the given data already is a labelled xr.DataArray, or where the data is a proxy and the labelling should be postponed.
**dc_kwargs – Passed on to the parent method
- PROXY_RESOLVE_ASTYPE = None#
- PROXY_RETAIN = True#
- PROXY_REINSTATE_FAIL_ACTION = 'log_warning'#
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datacontainer'#
- class utopya.datacontainer.XarrayYamlDC(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], dims: Optional[Sequence[str]] = None, coords: Optional[dict] = None, extract_metadata: bool = True, apply_metadata: bool = True, **dc_kwargs)[source]#
Bases: utopya.datacontainer.XarrayDC
An XarrayDC specialization that assumes that each array entry is a YAML string, which is subsequently loaded. This can be done alongside the metadata application of the XarrayDC.
Initialize a XrDataContainer and extract dimension and coordinate labels.
- Parameters
name (str) – which name to give to the XrDataContainer
data (Union[np.ndarray, xr.DataArray]) – The data to store; anything that an xr.DataArray can take
dims (Sequence[str], optional) – The dimension names.
coords (dict, optional) – The coordinates. The keys of this dict have to correspond to the dimension names.
extract_metadata (bool, optional) – If True, an attempt is made to populate missing dims or coords arguments from the container attributes.
apply_metadata (bool, optional) – Whether to apply the extracted or passed dims and coords to the underlying data. This might not be desired in cases where the given data already is a labelled xr.DataArray, or where the data is a proxy and the labelling should be postponed.
**dc_kwargs – Passed on to the parent method
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datacontainer'#
- class utopya.datacontainer.GridDC(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], **dc_kwargs)[source]#
Bases: utopya.datacontainer.XarrayDC
This is the base class for all grid data used in Utopia.
It is based on the XarrayDC and reshapes the data to the grid shape. The last dimension is assumed to be the dimension that goes along the grid cell IDs.
Initialize a GridDC which represents grid-like data.
Given the container attribute for _GDC_grid_shape_attr, this container takes care to reshape the underlying data such that it represents that grid, even if it is saved in another shape.
- Parameters
name (str) – The name of the data container
data (np.ndarray) – The not yet reshaped data. If this is 1D, it is assumed that there is no time dimension. If it is 2D, it is assumed to be (time, cell ids).
**kwargs – Further initialization kwargs, e.g. attrs …
- __init__(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], **dc_kwargs)[source]#
Initialize a GridDC which represents grid-like data.
Given the container attribute for _GDC_grid_shape_attr, this container takes care to reshape the underlying data such that it represents that grid, even if it is saved in another shape.
- Parameters
name (str) – The name of the data container
data (np.ndarray) – The not yet reshaped data. If this is 1D, it is assumed that there is no time dimension. If it is 2D, it is assumed to be (time, cell ids).
**kwargs – Further initialization kwargs, e.g. attrs …
- property grid_shape: tuple#
The shape of the grid
- property space_extent: tuple#
The space’s extent this grid is representing, read from attrs
- property shape: tuple#
Returns shape, proxy-aware
This is an overload of the property in Hdf5ProxySupportMixin which takes care that not the actual underlying proxy data shape is returned but whatever the container’s shape is to be after reshaping.
- property ndim: int#
Returns ndim, proxy-aware
This is an overload of the property in Hdf5ProxySupportMixin which takes care that not the actual underlying proxy data ndim is returned but whatever the container’s ndim is to be after reshaping.
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datacontainer'#
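Conceptually, the reshaping GridDC performs can be sketched with plain numpy. The actual grid shape is read from the container attribute named by _GDC_grid_shape_attr, and the axis ordering chosen here is an assumption:

```python
import numpy as np

def reshape_to_grid(data: np.ndarray, grid_shape: tuple) -> np.ndarray:
    """Unfold the trailing cell-ID dimension into the grid shape.
    1D input is a single snapshot; 2D input is (time, cell ids)."""
    if data.ndim == 1:
        return data.reshape(grid_shape)
    # keep the leading time dimension, unfold the cell-ID dimension
    return data.reshape(data.shape[0], *grid_shape)

snapshots = np.arange(2 * 12).reshape(2, 12)  # (time=2, cell ids=12)
grid = reshape_to_grid(snapshots, (3, 4))     # shape (2, 3, 4)
```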
utopya.datagroup module#
Implements data group classes specific to the Utopia output data structure.
They are based on dantro BaseDataGroup-derived implementations. In this module, they are imported and configured to suit the needs of the Utopia data structures.
- class utopya.datagroup.UniverseGroup(*, name: str, containers: Optional[list] = None, attrs=None)[source]#
Bases: dantro.groups.pspgrp.ParamSpaceStateGroup
This group represents the data of a single universe
Initialize a BaseDataGroup, which can store other containers and attributes.
- Parameters
name (str) – The name of this data container
containers (list, optional) – The containers that are to be stored as members of this group. If given, these are added one by one using the .add method.
attrs (None, optional) – A mapping that is stored as attributes
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datagroup'#
- class utopya.datagroup.MultiverseGroup(*, name: str, pspace: Optional[paramspace.paramspace.ParamSpace] = None, containers: Optional[list] = None, **kwargs)[source]#
Bases: dantro.groups.pspgrp.ParamSpaceGroup
This group is meant to manage the uni group of the loaded data, i.e. the group in which the output of all universes is stored.
Its main aim is to provide easy access to universes. By default, universes are only identified by their ID, which is a zero-padded _string_. This group adds the ability to access them via integer indices.
Furthermore, via dantro, an easy data selector method is available, see dantro.groups.ParamSpaceGroup.select.
Initialize an OrderedDataGroup from the list of given containers.
- Parameters
name (str) – The name of this group.
pspace (ParamSpace, optional) – Can already pass a ParamSpace object here.
containers (list, optional) – A list of containers to add, which need to be ParamSpaceStateGroup objects.
**kwargs – Further initialisation kwargs, e.g. attrs …
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datagroup'#
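The integer-index access works by mapping the index back to the zero-padded string ID used as the member name; conceptually (the padding width here is illustrative, and the helper name is hypothetical):

```python
def uni_id(idx: int, *, num_digits: int) -> str:
    """Zero-padded universe ID string for an integer index
    (illustrative; utopya derives the padding internally)."""
    return str(idx).zfill(num_digits)
```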
- class utopya.datagroup.TimeSeriesGroup(*args, dims: Optional[Tuple[str]] = None, mode: Optional[str] = None, allow_deep_selection: Optional[bool] = None, **kwargs)[source]#
Bases: dantro.groups.time_series.TimeSeriesGroup
This group is meant to manage time series data, with the container names being interpreted as the time coordinate.
Initialize a LabelledDataGroup
- Parameters
*args – Passed on to OrderedDataGroup
dims (TDims, optional) – The dimensions associated with this group. If not given, will use those defined in the LDG_DIMS class variable. These cannot be changed afterwards!
mode (str, optional) – The coordinate extraction mode by which to get the coordinates from the group members. Can be attrs, name, data, or anything else specified in extract_coords().
allow_deep_selection (bool, optional) – Whether to allow deep selection. If not given, will use the LDG_ALLOW_DEEP_SELECTION class variable's value. Behaviour can be changed via the property of the same name.
**kwargs – Passed on to OrderedDataGroup
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datagroup'#
- class utopya.datagroup.HeterogeneousTimeSeriesGroup(*args, dims: Optional[Tuple[str]] = None, mode: Optional[str] = None, allow_deep_selection: Optional[bool] = None, **kwargs)[source]#
Bases: dantro.groups.time_series.HeterogeneousTimeSeriesGroup
This group is meant to manage time series data, with the container names being interpreted as the time coordinate.
Initialize a LabelledDataGroup
- Parameters
*args – Passed on to OrderedDataGroup
dims (TDims, optional) – The dimensions associated with this group. If not given, will use those defined in the LDG_DIMS class variable. These cannot be changed afterwards!
mode (str, optional) – The coordinate extraction mode by which to get the coordinates from the group members. Can be attrs, name, data, or anything else specified in extract_coords().
allow_deep_selection (bool, optional) – Whether to allow deep selection. If not given, will use the LDG_ALLOW_DEEP_SELECTION class variable's value. Behaviour can be changed via the property of the same name.
**kwargs – Passed on to OrderedDataGroup
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datagroup'#
- class utopya.datagroup.GraphGroup(*args, **kwargs)[source]#
Bases: dantro.groups.graph.GraphGroup
This group is meant to manage graph data and create a NetworkX graph from it.
Initialize a GraphGroup.
- Parameters
*args – passed to dantro.base.BaseDataGroup.__init__()
**kwargs – passed to dantro.base.BaseDataGroup.__init__()
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datagroup'#
utopya.datamanager module#
Implements a class that manages data written out by Utopia models.
It is based on the dantro.DataManager class and the containers specialised for Utopia data.
- class utopya.datamanager.DataManager(data_dir: str, *, name: Optional[str] = None, load_cfg: Optional[Union[dict, str]] = None, out_dir: Union[str, bool] = '_output/{timestamp:}', out_dir_kwargs: Optional[dict] = None, create_groups: Optional[List[Union[str, dict]]] = None, condensed_tree_params: Optional[dict] = None, default_tree_cache_path: Optional[str] = None)[source]#
Bases: dantro.data_loaders.AllAvailableLoadersMixin, dantro.data_mngr.DataManager
This class manages the data that is written out by Utopia simulations.
It is based on the dantro.DataManager class and adds the functionality for specific loader functions that are needed in Utopia: Hdf5 and Yaml.
Furthermore, to enable file caching via the DAG framework, all available data loaders are included here.
Initializes a DataManager for the specified data directory.
- Parameters
data_dir (str) – the directory the data can be found in. If this is a relative path, it is considered relative to the current working directory.
name (str, optional) – Which name to give to the DataManager. If no name is given, the data directory's basename will be used.
load_cfg (Union[dict, str], optional) – The base configuration used for loading data. If a string is given, it is assumed to be the path to a YAML file and is loaded using the load_yml() function. If None is given, it can still be supplied to the load() method later on.
out_dir (Union[str, bool], optional) – Where output is written to. If this is given as a relative path, it is considered relative to the data_dir. A formatting operation with the keys timestamp and name is performed on this, where the latter is the name of the data manager. If set to False, no output directory is created.
out_dir_kwargs (dict, optional) – Additional arguments that affect how the output directory is created.
create_groups (List[Union[str, dict]], optional) – If given, these groups will be created after initialization. If the list entries are strings, the default group class will be used; if they are dicts, the name key specifies the name of the group and the Cls key specifies the type. If a string is given instead of a type, the lookup happens from the _DATA_GROUP_CLASSES variable.
condensed_tree_params (dict, optional) – If given, sets the parameters used for the condensed tree representation. Available options: max_level and condense_thresh, where the latter may be a callable. See dantro.base.BaseDataGroup._tree_repr() for more information.
default_tree_cache_path (str, optional) – The path to the default tree cache file. If not given, uses the value from the class variable _DEFAULT_TREE_CACHE_PATH. Whichever value was chosen is then prepared using the _parse_file_path() method, which regards relative paths as being relative to the associated data directory.
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.datamanager'#
utopya.dataprocessing module#
This module holds data processing functionality that is used in the plot functions.
- utopya.dataprocessing.create_mask(data: xarray.core.dataarray.DataArray, *, operator_name: str, rhs_value: float) xarray.core.dataarray.DataArray [source]#
Given the data, returns a binary mask by applying the comparison data <operator> rhs_value.
.- Parameters
data (xr.DataArray) – The data to apply the comparison to. This is the lhs of the comparison.
operator_name (str) – The name of the binary operator function as registered in utopya.tools.OPERATOR_MAP
rhs_value (float) – The right-hand-side value
- Raises
KeyError – On invalid operator name
- Returns
xr.DataArray – Boolean mask
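The comparison can be sketched with Python's operator module. The operator names shown are assumptions about utopya.tools.OPERATOR_MAP; plain numpy arrays are used here, but xr.DataArray comparisons behave the same:

```python
import operator
import numpy as np

# Assumed subset of utopya.tools.OPERATOR_MAP
OPERATOR_MAP = {"lt": operator.lt, "le": operator.le, "gt": operator.gt,
                "ge": operator.ge, "eq": operator.eq}

def create_mask_sketch(data, *, operator_name: str, rhs_value: float):
    """Element-wise ``data <operator> rhs_value``; unknown operator
    names raise KeyError, as documented."""
    return OPERATOR_MAP[operator_name](data, rhs_value)

mask = create_mask_sketch(np.array([0.2, 0.8, 0.5]),
                          operator_name="gt", rhs_value=0.4)
```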
- utopya.dataprocessing.where(data: xarray.core.dataarray.DataArray, *, operator_name: str, rhs_value: float) xarray.core.dataarray.DataArray [source]#
Filter elements from the given data according to a condition. Only those elements where the condition is fulfilled remain unmasked.
Note that this leads to a dtype change to float.
- utopya.dataprocessing.show_data(data, *_)[source]#
Show the data and (if given) additional properties
- utopya.dataprocessing.transform(data: xarray.core.dataarray.DataArray, *operations: Union[dict, str], aux_data: Optional[Union[xarray.core.dataset.Dataset, dantro.base.BaseDataGroup]] = None, log_level: Optional[int] = None) xarray.core.dataarray.DataArray [source]#
Applies transformations to the given data, e.g. reducing its dimensionality or performing calculations on it.
- Parameters
data (xr.DataArray) – The data that is to be reduced in dimensionality
*operations (Union[dict, str]) – Which operations to apply and with which parameters. These should be operation names or dicts. For dicts, they should only have a single key, which is the name of the operation to perform. The available operations are defined in the TRANSFORMATIONS dict.
aux_data (Union[xr.Dataset, dantro.BaseDataGroup], optional) – The auxiliary data for binary operations; required in those cases.
log_level (int, optional) – Which level to log the progress of the operations on. If not given, will be 10.
- Returns
A new object with dimensionality-reduced data.
- Return type
xr.DataArray
- Raises
TypeError – On bad operations specification
ValueError – On bad operation name
- utopya.dataprocessing.find_endpoint(data: xarray.core.dataarray.DataArray, *, time: int = - 1, **kwargs) Tuple[bool, xarray.core.dataarray.DataArray] [source]#
Find the endpoint of a dataset with respect to the time coordinate.
This function is compatible with utopya.plot_funcs.attractor.bifurcation_diagram().
- Parameters
data (xr.DataArray) – The data
time (int, optional) – The time index to select
**kwargs – Passed on to the data.isel call
- Returns
(endpoint found, endpoint)
- Return type
Tuple[bool, xr.DataArray]
- utopya.dataprocessing.find_fixpoint(data: xarray.core.dataset.Dataset, *, spin_up_time: int = 0, abs_std: Optional[float] = None, rel_std: Optional[float] = None, mean_kwargs=None, std_kwargs=None, isclose_kwargs=None, squeeze: bool = True) Tuple[bool, float] [source]#
Find the fixpoint(s) of a dataset and confirm it by standard deviation. For dimensions that are not ‘time’ the fixpoints are compared and duplicates removed.
This function is compatible with utopya.plot_funcs.attractor.bifurcation_diagram().
- Parameters
data (xr.Dataset) – The data
spin_up_time (int, optional) – The first timestep included
abs_std (float, optional) – The maximal allowed absolute standard deviation
rel_std (float, optional) – The maximal allowed relative standard deviation
mean_kwargs (dict, optional) – Additional keyword arguments passed on to the appropriate array function for calculating mean on data.
std_kwargs (dict, optional) – Additional keyword arguments passed on to the appropriate array function for calculating std on data.
isclose_kwargs (dict, optional) – Additional keyword arguments passed on to the appropriate array function for calculating np.isclose for fixpoint-duplicates across dimensions other than ‘time’.
squeeze (bool, optional) – Use the data.squeeze method to remove dimensions of length one. Default is True.
- Returns
(fixpoint found, mean)
- Return type
tuple
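The core of the check can be sketched with plain numpy: after the spin-up time, a fixpoint is confirmed if the standard deviation stays below the threshold. The xarray handling, rel_std, and duplicate removal across non-time dimensions are omitted, and the function name is hypothetical:

```python
import numpy as np

def find_fixpoint_sketch(series: np.ndarray, *, spin_up_time: int = 0,
                         abs_std: float = 1e-3):
    """Confirm a fixpoint by requiring the post-spin-up standard
    deviation to stay below ``abs_std``; returns (found, mean)."""
    tail = series[spin_up_time:]
    return bool(tail.std() < abs_std), float(tail.mean())

t = np.arange(200)
series = 1.0 + np.exp(-t / 10.0)  # decays onto the fixpoint 1.0
found, mean = find_fixpoint_sketch(series, spin_up_time=150, abs_std=1e-3)
```

Without the spin-up time, the initial transient inflates the standard deviation and the check is inconclusive.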
- utopya.dataprocessing.find_multistability(*args, **kwargs) Tuple[bool, float] [source]#
Find the multistabilities of a dataset.
Performs find_fixpoint(); the method is conclusive if find_fixpoint() is conclusive and yields multiple entries.
- Parameters
*args – passed on to find_fixpoint()
**kwargs – passed on to find_fixpoint()
- Returns
(multistability found, mean)
- Return type
Tuple[bool, float]
- utopya.dataprocessing.find_oscillation(data: xarray.core.dataset.Dataset, *, spin_up_time: int = 0, squeeze: bool = True, **find_peak_kwargs) Tuple[bool, list] [source]#
Find oscillations in a dataset.
This function is compatible with utopya.plot_funcs.attractor.bifurcation_diagram().
- Parameters
data (xr.Dataset) – The data
spin_up_time (int, optional) – The first timestep included
squeeze (bool, optional) – Use the data.squeeze method to remove dimensions of length one. Default is True.
**find_peak_kwargs – Passed on to scipy.signal.find_peaks. The default for the height kwarg is set to -1.e+15.
- Returns
(oscillation found, [min, max])
- Return type
Tuple[bool, list]
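A sketch of the peak-based detection using scipy.signal.find_peaks, which the documented function passes its kwargs to. The exact conclusiveness criterion (here: at least two maxima and two minima) is an assumption:

```python
import numpy as np
from scipy.signal import find_peaks

def find_oscillation_sketch(series: np.ndarray, *, spin_up_time: int = 0,
                            **find_peaks_kwargs):
    """Detect oscillation via peaks in the post-spin-up series;
    returns (found, [min, max]) like the documented function."""
    find_peaks_kwargs.setdefault("height", -1.0e15)
    tail = series[spin_up_time:]
    maxima, _ = find_peaks(tail, **find_peaks_kwargs)   # local maxima
    minima, _ = find_peaks(-tail, **find_peaks_kwargs)  # local minima
    if len(maxima) < 2 or len(minima) < 2:
        return False, [np.nan, np.nan]
    return True, [float(tail[minima].min()), float(tail[maxima].max())]
```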
utopya.model module#
Provides the Model class to work interactively with Utopia models
- class utopya.model.Model(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]#
Bases: object
A class to work with Utopia models interactively.
It attaches to a certain model and makes it easy to load config files, create a Multiverse from them, run it, and work with it further…
Initialize the ModelTest for the given model name
- Parameters
name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.
info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name.
base_dir (str, optional) – For convenience, this path can be specified and is treated as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.
sim_errors (str, optional) – Whether to raise errors from Multiverse
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; the flag can be overwritten in the create_mv and create_run_load methods. For False, the regular model output directory is used.
- Raises
ValueError – Upon bad base_dir
- __init__(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]#
Initialize the ModelTest for the given model name
- Parameters
name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.
info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name.
base_dir (str, optional) – For convenience, can specify this path which will be seen as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.
sim_errors (str, optional) – Whether to raise errors from Multiverse
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; but the flag can be overwritten in the create_mv and create_run_load methods. If false, the regular model output directory is used.
- Raises
ValueError – Upon bad base_dir
- property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle#
The model info bundle
- property name: str#
The name of this Model object, which is at the same time the name of the attached model.
- property base_dir: str#
Returns the path to the base directory, if set during init.
This is the path to a directory from which config files can be loaded using relative paths.
- property default_model_cfg: dict#
Returns the default model configuration by loading it from the file specified in the info bundle.
- property default_config_set_search_dirs: List[str]#
Returns the default config set search directories for this model in the order of precedence.
Note
These may be relative paths.
- property default_config_sets: Dict[str, dict]#
Config sets at the default search locations.
To retrieve an individual config set, consider using get_config_set() instead of this property.
For more information, see Configuration sets.
- create_mv(*, from_cfg: Optional[str] = None, from_cfg_set: Optional[str] = None, run_cfg_path: Optional[str] = None, use_tmpdir: Optional[bool] = None, **update_meta_cfg) utopya.multiverse.Multiverse [source]#
Creates a utopya.multiverse.Multiverse for this model, optionally loading a configuration from a file and updating it with further keys.
- Parameters
from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.
from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.
run_cfg_path (str, optional) – The path of the run config to use. Cannot be passed if the from_cfg or from_cfg_set arguments were given.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses the default value set at initialization.
**update_meta_cfg – Can be used to update the meta configuration
- Returns
The created Multiverse object
- Return type
utopya.multiverse.Multiverse
- Raises
ValueError – If more than one of the run-config-selecting arguments (from_cfg, from_cfg_set, run_cfg_path) was given.
- create_frozen_mv(**fmv_kwargs) utopya.multiverse.FrozenMultiverse [source]#
Create a utopya.multiverse.FrozenMultiverse, coupling it to a run directory.
Use this method if you want to load an existing simulation run.
- Parameters
**fmv_kwargs – Passed on to FrozenMultiverse.__init__
- create_run_load(*, from_cfg: Optional[str] = None, run_cfg_path: Optional[str] = None, from_cfg_set: Optional[str] = None, use_tmpdir: Optional[bool] = None, print_tree: bool = True, **update_meta_cfg) Tuple[utopya.multiverse.Multiverse, utopya.datamanager.DataManager] [source]#
Chains the create_mv, mv.run, and mv.dm.load_from_cfg methods together and returns a (Multiverse, DataManager) tuple.
- Parameters
from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.
from_cfg_set (str, optional) – Name of the config set to retrieve the run config from. Mutually exclusive with from_cfg and run_cfg_path.
run_cfg_path (str, optional) – The path of the run config to use. Cannot be passed if the from_cfg or from_cfg_set arguments were given.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses the default value set at initialization.
print_tree (bool, optional) – Whether to print the loaded data tree
**update_meta_cfg – Arguments passed to the create_mv function
- Return type
Tuple[Multiverse, DataManager]
- get_config_set(name: Optional[str] = None) Dict[str, str] [source]#
Returns a configuration set: a dict containing paths to run and/or eval configuration files. These are accessible via the keys run and eval.
Config sets are retrieved from multiple locations:
The cfgs directory in the model’s source directory
The user-specified lookup directories, specified in the utopya configuration as config_set_search_dirs
If name is an absolute or relative path and a directory exists at the specified location, the parent directory is interpreted as a search path.
This uses get_config_sets() to retrieve all available configuration sets from the above paths and then selects the one with the given name. Config sets that are found later overwrite those with the same name found in previous searches and log a warning message (which can be controlled with the warn argument); sets are not merged.
For more information, see Configuration sets.
- Parameters
name (str, optional) – The name of the config set to retrieve. This may also be a local path, which is looked up prior to the default search directories.
- get_config_sets(*, search_dirs: Optional[List[str]] = None, warn: bool = True, cfg_sets: Optional[dict] = None) Dict[str, dict] [source]#
Searches for all available configuration sets in the given search directories, aggregating them into one dict.
The search is done in reverse order of the paths given in search_dirs, i.e. starting from the directories with the lowest precedence. If configuration sets with the same name are encountered, warnings are emitted, but the one with higher precedence (appearing closer to the front of search_dirs, i.e. the later-searched one) wins.
Note
This will not merge configuration sets from different search directories: e.g., if one contained only an eval configuration and the other contained only a run configuration, a warning will be emitted but the entry from the later-searched directory will be used.
- Parameters
search_dirs (List[str], optional) – The directories to search sequentially for config sets. If not given, will use the default config set search directories; see default_config_set_search_dirs.
warn (bool, optional) – Whether to warn (via log message) if the search yields a config set with a name that already existed.
cfg_sets (dict, optional) – If given, aggregate newly found config sets into this dict. Otherwise, start with an empty one.
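The precedence rule described above can be sketched as a plain dict aggregation. This is a minimal illustration only, not the actual utopya implementation; the function name and warning text are hypothetical:

```python
import logging

log = logging.getLogger(__name__)

def aggregate_config_sets(found_per_dir: list, *, warn: bool = True) -> dict:
    """Aggregates config sets found per search directory.

    ``found_per_dir`` is ordered like ``search_dirs`` (decreasing
    precedence); iterating in *reverse* order means later updates,
    i.e. higher-precedence entries, win. Entries are replaced whole,
    never merged.
    """
    cfg_sets: dict = {}
    for found in reversed(found_per_dir):
        for name, cfg_set in found.items():
            if warn and name in cfg_sets:
                log.warning("Config set '%s' already existed; overwriting.", name)
            cfg_sets[name] = cfg_set  # no merging of run/eval entries
    return cfg_sets

# 'demo' exists in both directories; the higher-precedence (front) entry
# ends up in the result, even though the other one contained an eval config.
high = {"demo": {"run": "/high/demo/run.yml"}}
low = {"demo": {"eval": "/low/demo/eval.yml"}, "other": {"run": "/low/other/run.yml"}}
result = aggregate_config_sets([high, low])
```

Note that the lower-precedence `eval` entry for `demo` is discarded entirely, matching the "sets are not merged" behavior documented above.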
- __dict__ = mappingproxy({'__module__': 'utopya.model', '__doc__': 'A class to work with Utopia models interactively.\n\n It attaches to a certain model and makes it easy to load config files,\n create a Multiverse from them, run it, and work with it further...\n ', '__init__': <function Model.__init__>, '__str__': <function Model.__str__>, 'info_bundle': <property object>, 'name': <property object>, 'base_dir': <property object>, 'default_model_cfg': <property object>, 'default_config_set_search_dirs': <property object>, 'default_config_sets': <property object>, 'create_mv': <function Model.create_mv>, 'create_frozen_mv': <function Model.create_frozen_mv>, 'create_run_load': <function Model.create_run_load>, 'get_config_set': <function Model.get_config_set>, 'get_config_sets': <function Model.get_config_sets>, '_store_mv': <function Model._store_mv>, '_create_tmpdir': <function Model._create_tmpdir>, '_find_config_sets': <function Model._find_config_sets>, '__dict__': <attribute '__dict__' of 'Model' objects>, '__weakref__': <attribute '__weakref__' of 'Model' objects>, '__annotations__': {}})#
- __module__ = 'utopya.model'#
- __weakref__#
list of weak references to the object (if defined)
utopya.multiverse module#
Implementation of the Multiverse class.
The Multiverse supplies the main user interface of the frontend.
- class utopya.multiverse.Multiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]#
Bases:
object
The Multiverse is where a single simulation run is orchestrated from.
It spawns multiple universes, each of which represents a single simulation of the selected model with the parameters specified by the meta configuration.
The WorkerManager takes care to perform these simulations in parallel, the DataManager allows loading the created data, and the PlotManager handles plotting of that data.
Initialize the Multiverse.
- Parameters
model_name (str, optional) – The name of the model to run
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
_shared_worker_manager (WorkerManager, optional) –
If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.
Warning
This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- BASE_META_CFG_PATH = '/builds/utopia-project/docs/utopia/python/utopya/utopya/cfg/base_cfg.yml'#
- USER_CFG_SEARCH_PATH = '/root/.config/utopia/user_cfg.yml'#
- RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'#
- UTOPYA_BASE_PLOTS_PATH = '/builds/utopia-project/docs/utopia/python/utopya/utopya/plot_funcs/base_plots.yml'#
- __init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]#
Initialize the Multiverse.
- Parameters
model_name (str, optional) – The name of the model to run
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
_shared_worker_manager (WorkerManager, optional) –
If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.
Warning
This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle#
The model info bundle for this Multiverse
- property model_name: str#
The model name associated with this Multiverse
- property model_binpath: str#
The path to this model’s binary
- property meta_cfg: dict#
The meta configuration.
- property dirs: dict#
Information on managed directories.
- property cluster_mode: bool#
Whether the Multiverse should run in cluster mode
- property cluster_params: dict#
Returns a copy of the cluster mode configuration parameters
- property resolved_cluster_params: dict#
Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.
- property dm: utopya.datamanager.DataManager#
The Multiverse’s DataManager.
- property wm: utopya.workermanager.WorkerManager#
The Multiverse’s WorkerManager.
- property pm: utopya.plotting.PlotManager#
The Multiverse’s PlotManager.
- run(*, sweep: Optional[bool] = None)[source]#
Starts a Utopia simulation run.
Specifically, this method adds simulation tasks to the associated WorkerManager, locks its task list, and then invokes the start_working() method, which performs all the simulation tasks.
If cluster mode is enabled, this will split up the parameter space into (ideally) equally sized parts and only run one of these parts, depending on the cluster node this Multiverse is being invoked on.
Note
As this method locks the task list of the WorkerManager, no further tasks can be added henceforth. This means that each Multiverse instance can only perform a single simulation run.
- Parameters
sweep (bool, optional) – Whether to perform a sweep or not. If None, the value will be read from the perform_sweep key of the meta-configuration.
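The cluster-mode behavior of splitting the parameter space into (ideally) equally sized parts per node can be illustrated with a simple chunking sketch. This helper is hypothetical and for illustration only; the real Multiverse derives the split from its resolved cluster parameters:

```python
def select_tasks_for_node(n_tasks: int, *, node_index: int, num_nodes: int) -> range:
    """Returns the slice of task indices one node should work on.

    Tasks are distributed as evenly as possible: when n_tasks is not
    divisible by num_nodes, the first (n_tasks % num_nodes) nodes each
    receive one extra task.
    """
    base, extra = divmod(n_tasks, num_nodes)
    start = node_index * base + min(node_index, extra)
    length = base + (1 if node_index < extra else 0)
    return range(start, start + length)

# Example: 10 universes distributed over 3 nodes
chunks = [select_tasks_for_node(10, node_index=i, num_nodes=3) for i in range(3)]
```

Each node runs only its own chunk, and the union of all chunks covers the full parameter space exactly once.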
- run_single()[source]#
Runs a single simulation using the parameter space’s default value.
See run() for more information.
- renew_plot_manager(**update_kwargs)[source]#
Tries to set up a new PlotManager. If this succeeds, the old one is discarded and the new one is associated with this Multiverse.
- Parameters
**update_kwargs – Passed on to PlotManager.__init__
- __dict__ = mappingproxy({'__module__': 'utopya.multiverse', '__doc__': 'The Multiverse is where a single simulation run is orchestrated from.\n\n It spawns multiple universes, each of which represents a single simulation\n of the selected model with the parameters specified by the meta\n configuration.\n\n The WorkerManager takes care to perform these simulations in parallel, the\n DataManager allows loading the created data, and the PlotManager handles\n plotting of that data.\n ', 'BASE_META_CFG_PATH': '/builds/utopia-project/docs/utopia/python/utopya/utopya/cfg/base_cfg.yml', 'USER_CFG_SEARCH_PATH': '/root/.config/utopia/user_cfg.yml', 'RUN_DIR_TIME_FSTR': '%y%m%d-%H%M%S', 'UTOPYA_BASE_PLOTS_PATH': '/builds/utopia-project/docs/utopia/python/utopya/utopya/plot_funcs/base_plots.yml', '__init__': <function Multiverse.__init__>, 'info_bundle': <property object>, 'model_name': <property object>, 'model_binpath': <property object>, 'meta_cfg': <property object>, 'dirs': <property object>, 'cluster_mode': <property object>, 'cluster_params': <property object>, 'resolved_cluster_params': <property object>, 'dm': <property object>, 'wm': <property object>, 'pm': <property object>, 'run': <function Multiverse.run>, 'run_single': <function Multiverse.run_single>, 'run_sweep': <function Multiverse.run_sweep>, 'renew_plot_manager': <function Multiverse.renew_plot_manager>, '_create_meta_cfg': <function Multiverse._create_meta_cfg>, '_create_run_dir': <function Multiverse._create_run_dir>, '_setup_pm': <function Multiverse._setup_pm>, '_perform_backup': <function Multiverse._perform_backup>, '_perform_pspace_backup': <function Multiverse._perform_pspace_backup>, '_prepare_executable': <function Multiverse._prepare_executable>, '_resolve_cluster_params': <function Multiverse._resolve_cluster_params>, '_add_sim_task': <function Multiverse._add_sim_task>, '_add_sim_tasks': <function Multiverse._add_sim_tasks>, '_validate_meta_cfg': <function Multiverse._validate_meta_cfg>, 
'__dict__': <attribute '__dict__' of 'Multiverse' objects>, '__weakref__': <attribute '__weakref__' of 'Multiverse' objects>, '__annotations__': {}})#
- __module__ = 'utopya.multiverse'#
- __weakref__#
list of weak references to the object (if defined)
- class utopya.multiverse.FrozenMultiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#
Bases:
utopya.multiverse.Multiverse
A frozen Multiverse is like a Multiverse, but frozen.
It is initialized from a finished Multiverse run and re-creates all the attributes from that data, e.g.: the meta configuration, a DataManager, and a PlotManager.
Note that it is no longer able to perform any simulations.
Initializes the FrozenMultiverse from a model name and the name of a run directory.
Note that this also takes arguments to specify the run configuration to use.
- Parameters
model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- __init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]#
Initializes the FrozenMultiverse from a model name and the name of a run directory.
Note that this also takes arguments to specify the run configuration to use.
- Parameters
model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.
info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.
run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.
run_cfg_path (str, optional) – The path to the run configuration.
user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.
use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.
**update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels
- __module__ = 'utopya.multiverse'#
utopya.parameter module#
This module implements the Parameter class, which is used when validating model and simulation parameters.
- exception utopya.parameter.ValidationError[source]#
Bases:
ValueError
Raised upon failure to validate a parameter
- __module__ = 'utopya.parameter'#
- __weakref__#
list of weak references to the object (if defined)
- class utopya.parameter.Parameter(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Union[None, float], Union[None, float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]#
Bases:
object
The parameter class is used when a model parameter needs to be validated before commencing the model run. It can hold information on the parameter itself as well as its valid range and type and other meta-data.
By default, the Parameter class should be assumed to handle scalar parameters like numerical values or strings. For validating sequence-like parameters, corresponding specializing classes are to be implemented.
Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.
- Parameters
default (Any) – the default value of the parameter.
name (str, optional) – the name of the parameter.
description (str, optional) – a description of this parameter or its effects.
is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, limits cannot be used.
limits (Tuple[Union[None, float], Union[None, float]], optional) – the upper and lower bounds of the parameter (only applicable to scalar numerals). If None, the bound is assumed to be negative or positive infinity, respectively. Whether boundary values are included in the interval is controlled by the limits_mode argument. This argument is mutually exclusive with is_any_of!
limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: '[]' (closed, default), '()' (open), '[)', and '(]'.
dtype (Union[str, type], optional) – expected data type of this parameter. Accepts all strings that are accepted by numpy.dtype, e.g. int, float, uint16, string.
- Raises
TypeError – On a limits argument that was not tuple-like, or if a limits argument was given but the default was a string.
ValueError – if an invalid limits_mode is passed, if limits and is_any_of are both passed, or if the limits argument did not have length 2.
- SHORTHAND_MODES = ('is-probability', 'is-positive', 'is-int', 'is-negative', 'is-positive-int', 'is-string', 'is-negative-int', 'is-bool', 'is-unsigned')#
- LIMIT_COMPS = {'(': <built-in function gt>, ')': <built-in function lt>, '[': <built-in function ge>, ']': <built-in function le>}#
- LIMIT_MODES = ('[]', '()', '[)', '(]')#
- yaml_tag = '!param'#
- __init__(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Union[None, float], Union[None, float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]#
Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.
- Parameters
default (Any) – the default value of the parameter.
name (str, optional) – the name of the parameter.
description (str, optional) – a description of this parameter or its effects.
is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, limits cannot be used.
limits (Tuple[Union[None, float], Union[None, float]], optional) – the upper and lower bounds of the parameter (only applicable to scalar numerals). If None, the bound is assumed to be negative or positive infinity, respectively. Whether boundary values are included in the interval is controlled by the limits_mode argument. This argument is mutually exclusive with is_any_of!
limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: '[]' (closed, default), '()' (open), '[)', and '(]'.
dtype (Union[str, type], optional) – expected data type of this parameter. Accepts all strings that are accepted by numpy.dtype, e.g. int, float, uint16, string.
- Raises
TypeError – On a limits argument that was not tuple-like, or if a limits argument was given but the default was a string.
ValueError – if an invalid limits_mode is passed, if limits and is_any_of are both passed, or if the limits argument did not have length 2.
- property default#
The default value for this parameter
- property name#
The name of this parameter
- property description#
The description of this parameter
- property limits: Optional[tuple]#
The limits of this parameter
- property limits_mode: str#
The mode used when evaluating the limits
- property is_any_of: Tuple[Any]#
Possible values this parameter may assume
- property dtype: Optional[numpy.dtype]#
The expected data type of this parameter
- validate(value: Any, *, raise_exc: bool = True) bool [source]#
Checks whether the given value would be a valid parameter.
The checks for the corresponding arguments are carried out in the following order:
is_any_of
dtype
limits
The data type is checked according to the numpy type hierarchy, see docs. To reduce strictness, the following additional compatibilities are taken into account:
for an unsigned integer dtype, a signed integer-type value is compatible if value >= 0
for a floating-point dtype, integer-type values are always considered compatible
for a floating-point dtype, values of all floating-point types are considered compatible, even if they have a lower precision (note the coercion test below, though)
Additionally, it is checked whether value is representable as the target data type. This is done by coercing value to dtype and then checking for equality (using np.isclose).
- Parameters
value (Any) – The value to test.
raise_exc (bool, optional) – Whether to raise an exception or not.
- Returns
Whether or not the given value is a valid parameter.
- Return type
bool
- Raises
ValidationError – If validation failed or is impossible (for instance due to ambiguous validity parameters). This error message contains further information on why validation failed.
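The limits_mode semantics behind the limits check (cf. the LIMIT_COMPS and LIMIT_MODES attributes above) can be sketched with the operator module. This is a minimal re-implementation for illustration only; it covers just the limits step, not the is_any_of or dtype checks:

```python
import operator

# Mirrors Parameter.LIMIT_COMPS: each bracket character selects a comparison
LIMIT_COMPS = {"[": operator.ge, "(": operator.gt, "]": operator.le, ")": operator.lt}

def in_limits(value, limits, limits_mode: str = "[]") -> bool:
    """Checks whether value lies within limits, interpreting the bounds
    according to limits_mode; a bound of None means -/+ infinity."""
    lower, upper = limits
    lo_comp = LIMIT_COMPS[limits_mode[0]]  # compare value against lower bound
    hi_comp = LIMIT_COMPS[limits_mode[1]]  # compare value against upper bound
    ok = True
    if lower is not None:
        ok &= lo_comp(value, lower)
    if upper is not None:
        ok &= hi_comp(value, upper)
    return bool(ok)

# Closed interval includes the boundary; half-open '(]' excludes the lower one
in_limits(0.0, (0, 1), "[]")   # lower bound included
in_limits(0.0, (0, 1), "(]")   # lower bound excluded
```

A None bound simply skips the corresponding comparison, matching the "negative or positive infinity" semantics documented for the limits argument.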
- classmethod from_shorthand(value, *, mode, **kwargs)[source]#
Constructs a Parameter object from a given shorthand mode.
- Parameters
value – A given value, typically the default argument.
mode – A valid shorthand mode, see SHORTHAND_MODES
**kwargs – any further arguments for Parameter initialization, see __init__().
- Returns
a Parameter object
- classmethod to_yaml(representer, node)[source]#
Represent this Parameter object as a YAML mapping.
- Parameters
representer (ruamel.yaml.representer) – The representer module
node (type(self)) – The node, i.e. an instance of this class
- Returns
a yaml mapping that is able to recreate this object
- classmethod from_yaml(constructor, node)[source]#
The default constructor for Parameter objects, expecting a YAML node that is mapping-like.
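With the yaml_tag and the to_yaml/from_yaml methods above, a parameter can be declared directly in a configuration file. A hypothetical fragment (the key names are illustrative, and the shorthand-tag form assumes that the SHORTHAND_MODES listed above are registered as YAML tags):

```yaml
# Fully specified parameter with explicit constraints
infection_rate: !param
  default: 0.2
  limits: [0, 1]
  limits_mode: '[]'
  dtype: float

# Shorthand form, assuming shorthand modes are available as tags
recovery_rate: !is-probability 0.1
```

On loading, from_yaml constructs Parameter objects from such mappings; validation then happens when the model run is set up.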
- __dict__ = mappingproxy({'__module__': 'utopya.parameter', '__doc__': 'The parameter class is used when a model parameter needs to be validated\n before commencing the model run. It can hold information on the parameter\n itself as well as its valid range and type and other meta-data.\n\n Per default, the ``Parameter`` class should be assumed to handle *scalar*\n parameters like numerical values or strings. For validating sequence-like\n parameters, corresponding specializing classes are to be implemeted.\n ', 'SHORTHAND_MODES': ('is-probability', 'is-positive', 'is-int', 'is-negative', 'is-positive-int', 'is-string', 'is-negative-int', 'is-bool', 'is-unsigned'), 'LIMIT_COMPS': {'[': <built-in function ge>, '(': <built-in function gt>, ']': <built-in function le>, ')': <built-in function lt>}, 'LIMIT_MODES': ('[]', '()', '[)', '(]'), 'yaml_tag': '!param', '__init__': <function Parameter.__init__>, '__eq__': <function Parameter.__eq__>, '__str__': <function Parameter.__str__>, 'default': <property object>, 'name': <property object>, 'description': <property object>, 'limits': <property object>, 'limits_mode': <property object>, 'is_any_of': <property object>, 'dtype': <property object>, 'validate': <function Parameter.validate>, 'from_shorthand': <classmethod object>, 'to_yaml': <classmethod object>, 'from_yaml': <classmethod object>, '__dict__': <attribute '__dict__' of 'Parameter' objects>, '__weakref__': <attribute '__weakref__' of 'Parameter' objects>, '__hash__': None, '__annotations__': {}})#
- __hash__ = None#
- __module__ = 'utopya.parameter'#
- __weakref__#
list of weak references to the object (if defined)
- utopya.parameter.extract_validation_objects(model_cfg: dict, *, model_name: str) Tuple[dict, dict] [source]#
Extracts all Parameter objects from a model configuration (a nested dict), replacing them with their default values. Returns both the modified model configuration as well as the Parameter objects (keyed by the key sequence necessary to reach them within the model configuration).
- Parameters
model_cfg (dict) – the model configuration to inspect
model_name (str) – the name of the model
- Returns
- a tuple of (model config, parameters to validate).
The model config contains the passed config dict in which all Parameter class elements have been replaced by their default entries. The second entry is a dictionary consisting of the Parameter class objects (requiring validation) with keys being key sequences to those Parameter objects. Note that the key sequence is relative to the level above the model configuration, with model_name as a common entry for all returned values.
- Return type
Tuple[dict, dict]
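The extraction logic can be sketched as a recursive walk over the nested dict. This is an illustrative re-implementation under stated assumptions: the ToValidate class below is a hypothetical stand-in for utopya.parameter.Parameter, and the function only mimics the documented behavior:

```python
from typing import Any, Dict, Tuple

class ToValidate:
    """Hypothetical stand-in for utopya.parameter.Parameter."""
    def __init__(self, default: Any):
        self.default = default

def extract_validation_objects(model_cfg: dict, *, model_name: str) -> Tuple[dict, Dict[tuple, Any]]:
    """Replaces ToValidate objects with their defaults and collects them,
    keyed by the key sequence needed to reach them (prefixed by model_name)."""
    to_validate: Dict[tuple, Any] = {}

    def _recurse(d: dict, keys: tuple) -> dict:
        out = {}
        for k, v in d.items():
            if isinstance(v, ToValidate):
                to_validate[keys + (k,)] = v   # remember for later validation
                out[k] = v.default             # replace with default value
            elif isinstance(v, dict):
                out[k] = _recurse(v, keys + (k,))
            else:
                out[k] = v
        return out

    return _recurse(model_cfg, (model_name,)), to_validate

cfg = {"p": ToValidate(0.5), "nested": {"q": ToValidate(3)}}
new_cfg, params = extract_validation_objects(cfg, model_name="MyModel")
```

The key sequences all start with the model name, matching the note above that they are relative to the level above the model configuration.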
utopya.plotting module#
Implements a plotting framework based on dantro
In order to make the plotting framework specific to Utopia, this module derives both from the dantro PlotManager and some PlotCreator classes.
- utopya.plotting.register_operation(*, skip_existing=True, **kws) None [source]#
Register an operation with the dantro data operations database.
This invokes register_operation(), but has skip_existing == True as default in order to reduce the number of arguments that need to be specified in Utopia model plots.
- Parameters
skip_existing (bool, optional) – Whether to skip (without an error) if an operation already exists
**kws – Passed on to register_operation()
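The skip_existing semantics can be sketched against a plain dict standing in for dantro's data operations database. Both OPERATIONS and this function are illustrative stand-ins, not the actual dantro API:

```python
# Hypothetical stand-in for dantro's data operations database
OPERATIONS = {"increment": lambda x: x + 1}

def register_operation(*, name, func, skip_existing: bool = True) -> None:
    """Registers func under name; with skip_existing=True (the utopya
    default), re-registering an existing name is a silent no-op instead
    of an error."""
    if name in OPERATIONS:
        if skip_existing:
            return
        raise ValueError(f"Operation '{name}' already exists!")
    OPERATIONS[name] = func

register_operation(name="increment", func=lambda x: x + 2)  # silently skipped
register_operation(name="double", func=lambda x: 2 * x)     # newly registered
```

This default is convenient in model plots, which may be imported repeatedly and would otherwise raise on each re-registration.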
- class utopya.plotting.PlotHelper(*, out_path: str, helper_defaults: Optional[dict] = None, update_helper_cfg: Optional[dict] = None, raise_on_error: bool = True, animation_enabled: bool = False)[source]#
Bases:
dantro.plot_creators._plot_helper.PlotHelper
A specialization of the dantro PlotHelper used in plot creators that are derived from ExternalPlotCreator.
This can be used to add additional helpers for use in Utopia without requiring changes on the dantro side.
Note
The helpers implemented here should try to adhere to the interface exemplified by the dantro PlotHelper class, with the aim that they can then be migrated into dantro in the long run.
Initialize a PlotHelper with a certain configuration.
This configuration is the so-called “base” configuration and is not axis-specific. There is the possibility to specify axis-specific configuration entries.
All entries in the helper configuration are deemed ‘enabled’ unless they explicitly specify enabled: false in their configuration.
- Parameters
out_path (str) – path to store the created figure. This may be an absolute path or a relative path; the latter is regarded as relative to the current working directory. The home directory indicator ~ is expanded.
helper_defaults (dict, optional) – The basic configuration of the helpers.
update_helper_cfg (dict, optional) – A configuration used to update the existing helper defaults
raise_on_error (bool, optional) – Whether to raise on an exception created on helper invocation or just log the error
animation_enabled (bool, optional) – Whether animation mode is enabled.
- __module__ = 'utopya.plotting'#
- class utopya.plotting.ExternalPlotCreator(name: str, *, base_module_file_dir: Optional[str] = None, style: Optional[dict] = None, **parent_kwargs)[source]#
Bases:
dantro.plot_creators.pcr_ext.ExternalPlotCreator
This is the Utopia-specific version of dantro’s ExternalPlotCreator.
Its main purpose is to define common settings for plotting. By adding this extra layer, it allows for future extensibility as well.
One of the common settings is that it sets utopya.plot_funcs as the BASE_PKG, which is an extension of the plot functions supplied by dantro.
Initialize an ExternalPlotCreator.
- Parameters
name (str) – The name of this plot
base_module_file_dir (str, optional) – If given, module_file arguments to the _plot method that are relative paths will be seen as relative to this directory
style (dict, optional) – The default style context definition to enter before calling the plot function. This can be used to specify the aesthetics of a plot. It is evaluated here once, stored as an attribute, and can be updated when the plot method is called.
**parent_kwargs – Passed to the parent __init__
- Raises
ValueError – On invalid
base_module_file_dir
argument
- EXTENSIONS = 'all'#
- DEFAULT_EXT = 'pdf'#
- BASE_PKG = 'utopya.plot_funcs'#
- PLOT_HELPER_CLS#
alias of
utopya.plotting.PlotHelper
- CUSTOM_PLOT_MODULE_NAME = 'model_plots'#
- CUSTOM_PLOT_MODULE_PATHS = {'Utopia': '/builds/utopia-project/docs/utopia/python/model_plots'}#
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.plotting'#
- class utopya.plotting.UniversePlotCreator(*args, psgrp_path: Optional[str] = None, **kwargs)[source]#
Bases:
dantro.plot_creators.pcr_psp.UniversePlotCreator
,utopya.plotting.ExternalPlotCreator
Makes plotting with data from a single universe more convenient
Initialize a UniversePlotCreator
- PSGRP_PATH = 'multiverse'#
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.plotting'#
- class utopya.plotting.MultiversePlotCreator(*args, psgrp_path: Optional[str] = None, **kwargs)[source]#
Bases:
dantro.plot_creators.pcr_psp.MultiversePlotCreator
,utopya.plotting.ExternalPlotCreator
Makes plotting with data from all universes more convenient
Initialize a MultiversePlotCreator
- Parameters
*args – Passed on to parent
psgrp_path (str, optional) – The path to the associated
ParamSpaceGroup
that is to be used for these multiverse plots.
**kwargs – Passed on to parent
- PSGRP_PATH = 'multiverse'#
- __abstractmethods__ = frozenset({})#
- __module__ = 'utopya.plotting'#
- class utopya.plotting.PlotManager(*args, _model_info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, **kwargs)[source]#
Bases:
dantro.plot_mngr.PlotManager
This is the Utopia-specific version of the dantro PlotManager class
It registers the Utopia-specific plot creators and allows for custom interface specifications.
Sets up a PlotManager.
This additionally stores some Utopia-specific metadata about the model this PlotManager is used with. That information is then used to load some additional model-specific information once a creator is invoked.
- CREATORS = {'multiverse': <class 'utopya.plotting.MultiversePlotCreator'>, 'universe': <class 'utopya.plotting.UniversePlotCreator'>}#
- __init__(*args, _model_info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, **kwargs)[source]#
Sets up a PlotManager.
This additionally stores some Utopia-specific metadata about the model this PlotManager is used with. That information is then used to load some additional model-specific information once a creator is invoked.
- property common_out_dir: str#
The common output directory of all plots that were created with this plot manager instance. This uses the plot output paths stored in the plot information dict, specifically the
target_dir
entry.
If there is no plot information yet, the return value will be empty.
- plot_from_cfg(*args, plots_cfg: Optional[Union[dict, str]] = None, **kwargs)[source]#
Thin wrapper around parent method that shows which plot configuration file will be used.
- __module__ = 'utopya.plotting'#
utopya.reporter module#
Implementation of the Reporter class.
- class utopya.reporter.ReportFormat(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]#
Bases:
object
Initialises a ReportFormat object, which gathers callables needed to create a report in a certain format.
- Parameters
parser (Callable) – The parser method to use
writers (List[Callable]) – The writer method(s) to use
min_report_intv (float, optional) – The minimum report interval of reports in this format.
- __init__(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]#
Initialises a ReportFormat object, which gathers callables needed to create a report in a certain format.
- Parameters
parser (Callable) – The parser method to use
writers (List[Callable]) – The writer method(s) to use
min_report_intv (float, optional) – The minimum report interval of reports in this format.
- property min_report_intv: Optional[datetime.timedelta]#
Returns the minimum report interval
- property reporting_blocked: bool#
Determines whether reporting via this format is currently blocked, i.e. whether the minimum report interval has not yet passed
- report(*, force: bool = False, parser_kwargs: Optional[dict] = None) bool [source]#
Parses and writes a report corresponding to this object’s format.
If within the minimum report interval, will return False.
- Parameters
force (bool, optional) – If True, will ignore the min_report_intv
parser_kwargs (dict, optional) – Keyword arguments passed on to the parser
- Returns
Whether a report was generated or not
- Return type
bool
- __dict__ = mappingproxy({'__module__': 'utopya.reporter', '__init__': <function ReportFormat.__init__>, 'min_report_intv': <property object>, 'reporting_blocked': <property object>, 'report': <function ReportFormat.report>, '__dict__': <attribute '__dict__' of 'ReportFormat' objects>, '__weakref__': <attribute '__weakref__' of 'ReportFormat' objects>, '__doc__': None, '__annotations__': {}})#
- __module__ = 'utopya.reporter'#
- __weakref__#
list of weak references to the object (if defined)
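The interplay of parser, writers, and min_report_intv can be illustrated with a small, utopya-independent sketch. Class and attribute names here are simplified stand-ins, not the actual implementation:

```python
# Minimal sketch of the ReportFormat idea: a parser builds the report
# string, writers emit it, and reports within the minimum report
# interval are suppressed (unless forced).
import datetime as dt

class MiniReportFormat:
    def __init__(self, *, parser, writers, min_report_intv=None):
        self.parser = parser
        self.writers = writers
        self.min_report_intv = (
            dt.timedelta(seconds=min_report_intv) if min_report_intv else None
        )
        self._last_report = None

    @property
    def reporting_blocked(self) -> bool:
        """Whether a report would currently be suppressed."""
        if self.min_report_intv is None or self._last_report is None:
            return False
        return dt.datetime.now() - self._last_report < self.min_report_intv

    def report(self, *, force=False, parser_kwargs=None) -> bool:
        if not force and self.reporting_blocked:
            return False
        text = self.parser(**(parser_kwargs or {}))
        for write in self.writers:
            write(text)
        self._last_report = dt.datetime.now()
        return True

out = []
rf = MiniReportFormat(parser=lambda: "all good", writers=[out.append],
                      min_report_intv=60.0)
rf.report()            # True: first report always goes through
rf.report()            # False: still within the 60 s interval
rf.report(force=True)  # True: force ignores the interval
```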
- class utopya.reporter.Reporter(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]#
Bases:
object
The Reporter class holds general reporting capabilities.
It needs to be subclassed in order to specialise its reporting functions.
Initialize the Reporter for the WorkerManager.
- Parameters
report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.
default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.
report_dir (str, optional) – If reporting to a file, this is the base directory that is written to.
suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.
- __init__(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]#
Initialize the Reporter for the WorkerManager.
- Parameters
report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.
default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.
report_dir (str, optional) – If reporting to a file, this is the base directory that is written to.
suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.
- property report_formats: dict#
Returns the dict of ReportFormat objects.
- property default_format: Union[None, utopya.reporter.ReportFormat]#
Returns the default report format or None, if not set.
- property suppress_cr: bool#
Whether to suppress a carriage return. Objects using the reporter can set this property to communicate that they will be putting content into the stdout stream as well. The writers can check this property and adjust their behaviour accordingly.
- add_report_format(name: str, *, parser: Optional[str] = None, write_to: Union[str, Dict[str, dict]] = 'stdout', min_report_intv: Optional[float] = None, rf_kwargs: Optional[dict] = None, **parser_kwargs)[source]#
Add a report format to this reporter.
- Parameters
name (str) – The name of this format
parser (str, optional) – The name of the parser; if not given, the name of the report format is assumed
write_to (Union[str, Dict[str, dict]], optional) – The name of the writer. If this is a dict of dict, the keys will be interpreted as the names of the writers and the nested dict as the
**kwargs
to the writer function.
min_report_intv (float, optional) – The minimum report interval (in seconds) for this report format
rf_kwargs (dict, optional) – Further kwargs to ReportFormat.__init__
**parser_kwargs – The kwargs to the parser function
- Raises
ValueError – A report format with this name already exists
- report(report_format: Optional[str] = None, **kwargs) bool [source]#
Create a report with the given format; if none is given, the default format is used.
- Parameters
report_format (str, optional) – The report format to use
**kwargs – Passed on to the ReportFormat.report() call
- Returns
Whether there was a report
- Return type
bool
- Raises
ValueError – If no default format was set and no report format name was given
- parse_and_write(*, parser: Union[str, Callable], write_to: Union[str, Callable], **parser_kwargs)[source]#
This function allows to select a parser and writer explicitly.
- Parameters
parser (Union[str, Callable]) – The parser method to use.
write_to (Union[str, Callable]) – The write method to use. Can also be a sequence of names and/or callables or a Dict. For allowed specification formats, see the ._resolve_writers method.
**parser_kwargs – Passed to the parser, if given
- __dict__ = mappingproxy({'__module__': 'utopya.reporter', '__doc__': 'The Reporter class holds general reporting capabilities.\n\n It needs to be subclassed in order to specialise its reporting functions.\n ', '__init__': <function Reporter.__init__>, 'report_formats': <property object>, 'default_format': <property object>, 'suppress_cr': <property object>, 'add_report_format': <function Reporter.add_report_format>, 'report': <function Reporter.report>, 'parse_and_write': <function Reporter.parse_and_write>, '_resolve_parser': <function Reporter._resolve_parser>, '_resolve_writers': <function Reporter._resolve_writers>, '_write_to_stdout': <function Reporter._write_to_stdout>, '_write_to_stdout_noreturn': <function Reporter._write_to_stdout_noreturn>, '_write_to_log': <function Reporter._write_to_log>, '_write_to_file': <function Reporter._write_to_file>, '__dict__': <attribute '__dict__' of 'Reporter' objects>, '__weakref__': <attribute '__weakref__' of 'Reporter' objects>, '__annotations__': {}})#
- __module__ = 'utopya.reporter'#
- __weakref__#
list of weak references to the object (if defined)
- class utopya.reporter.WorkerManagerReporter(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#
Bases:
utopya.reporter.Reporter
This class reports on the state of the WorkerManager.
Initialize the Reporter for the WorkerManager.
- Parameters
wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance
mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.
**reporter_kwargs – Passed on to parent method
- TTY_MARGIN = 4#
- PROGRESS_BAR_SYMBOLS = {'active': '░', 'active_progress': '▒', 'finished': '▓', 'space': ' '}#
- __init__(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]#
Initialize the Reporter for the WorkerManager.
- Parameters
wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance
mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.
**reporter_kwargs – Passed on to parent method
- property wm#
Returns the associated WorkerManager.
- property task_counters: collections.OrderedDict#
Returns a dict of task counters:
total
: total number of registered WorkerManager tasks
active
: number of currently active tasks
finished
: number of finished tasks, including tasks that were stopped via a stop condition
stopped
: number of tasks for which stop conditions were fulfilled, see Stop Conditions
- property wm_progress: float#
The WorkerManager progress, between 0 and 1.
- property wm_active_tasks_progress: float#
The active tasks’ progress
If there are no active tasks in the worker manager, returns 0
- property wm_elapsed: Optional[datetime.timedelta]#
Seconds elapsed since start of working or None if not yet started
- property wm_times: dict#
Returns the characteristic WorkerManager times. Calls
get_progress_info()
without any additional arguments.
- register_task(task: utopya.task.WorkerTask)[source]#
Given the task object, extracts and stores some information.
The information currently extracted is the run time and the exit code.
This can be used as a callback function from a WorkerTask object.
- Parameters
task (utopya.task.WorkerTask) – The WorkerTask to extract information from.
- calc_runtime_statistics(min_num: int = 10) collections.OrderedDict [source]#
Calculates the current runtime statistics.
- Returns
- name of the calculated statistic and its value, i.e.
the runtime in seconds
- Return type
OrderedDict
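A plausible, utopya-independent sketch of such runtime statistics; the exact set of statistics utopya reports may differ:

```python
# Hedged sketch: compute simple statistics (in seconds) over recorded
# task runtimes, returning an empty result below the minimum sample size.
import statistics
from collections import OrderedDict

def runtime_statistics(runtimes, min_num: int = 10) -> OrderedDict:
    """Name of each calculated statistic mapped to its value in seconds."""
    if len(runtimes) < min_num:
        return OrderedDict()  # not enough data yet
    return OrderedDict(
        mean=statistics.mean(runtimes),
        median=statistics.median(runtimes),
        stddev=statistics.stdev(runtimes),
        min=min(runtimes),
        max=max(runtimes),
    )

stats = runtime_statistics([3.1, 2.9, 3.0, 3.3, 2.8], min_num=3)
```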
- get_progress_info(**eta_options) Dict[str, float] [source]#
Compiles a dict containing progress information for the current work session.
- Parameters
**eta_options – Passed on to method calculating
est_left
,_compute_est_left()
.- Returns
- Progress information. Guaranteed to contain the keys start, now, elapsed, est_left, est_end, and end.
- Return type
Dict[str, float]
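The kind of linear ETA estimate such progress information may be based on can be sketched as follows. This is an illustration with a reduced set of keys, not utopya's actual computation:

```python
# Illustrative sketch: estimate remaining time by extrapolating the
# elapsed time linearly from the current progress fraction.
import datetime as dt

def progress_info(start: dt.datetime, progress: float,
                  now: dt.datetime = None) -> dict:
    """Estimate remaining and total time from the progress fraction."""
    now = now or dt.datetime.now()
    elapsed = (now - start).total_seconds()
    est_left = (elapsed * (1 - progress) / progress
                if progress > 0 else float("inf"))
    return dict(
        start=start.timestamp(), now=now.timestamp(), elapsed=elapsed,
        est_left=est_left, est_end=now.timestamp() + est_left,
    )

start = dt.datetime(2021, 1, 1, 12, 0, 0)
now = dt.datetime(2021, 1, 1, 12, 10, 0)
info = progress_info(start, progress=0.25, now=now)  # elapsed: 600 s
```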
- __module__ = 'utopya.reporter'#
utopya.stopcond module#
This module implements the StopCondition class, which is used by the WorkerManager to stop a worker process in certain situations.
- class utopya.stopcond.StopCondition(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]#
Bases:
object
A StopCondition object holds information on the conditions in which a worker process should be stopped.
It is formulated in a general way, applying to all Workers. The attributes of this class store the information required to deduce whether the condition is fulfilled or not.
Create a new stop condition
- Parameters
to_check (List[dict], optional) – A list of dicts that hold the functions to call and the arguments to call them with. The only requirement for each dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the utopya.stopcond_funcs module.
name (str, optional) – The name of this stop condition
description (str, optional) – A short description of this stop condition
enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.
func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one that is checked. If this argument is a string, it is also resolved from the utopya stopcond_funcs module.
**func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function
- __init__(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]#
Create a new stop condition
- Parameters
to_check (List[dict], optional) – A list of dicts that hold the functions to call and the arguments to call them with. The only requirement for each dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the utopya.stopcond_funcs module.
name (str, optional) – The name of this stop condition
description (str, optional) – A short description of this stop condition
enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.
func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one that is checked. If this argument is a string, it is also resolved from the utopya stopcond_funcs module.
**func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function
- property fulfilled_for: Set[utopya.task.Task]#
The set of tasks this stop condition was fulfilled for
- __str__() str [source]#
A string representation for this StopCondition, including the name and, if given, the description.
- fulfilled(task: utopya.task.Task) bool [source]#
Checks if the stop condition is fulfilled for the given worker, using the information from the dict.
All given stop condition functions are evaluated; if all of them return True, this method will also return True.
Furthermore, if the stop condition is fulfilled, the task's set of fulfilled stop conditions will be updated accordingly.
- Parameters
task (utopya.task.Task) – Task object that is to be checked
- Returns
- Whether all stop condition functions returned True for the given worker and its current information
- Return type
bool
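The to_check semantics can be illustrated with a small, self-contained sketch. The task is modeled as a plain dict here; in utopya it is a Task object:

```python
# Sketch of the check logic: each to_check entry names a function plus
# its kwargs, and the stop condition is fulfilled only if *all*
# functions return True.
def fulfilled(task, to_check: list) -> bool:
    return all(
        entry["func"](task, **{k: v for k, v in entry.items() if k != "func"})
        for entry in to_check
    )

to_check = [
    # hypothetical check functions, following the (task, **kws) signature
    dict(func=lambda task, *, seconds: task["elapsed"] > seconds, seconds=60),
    dict(func=lambda task, *, entry_name: entry_name in task,
         entry_name="density"),
]
task = {"elapsed": 120, "density": 0.3}
```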
- yaml_tag = '!stop-condition'#
- classmethod to_yaml(representer, node)[source]#
Creates a yaml representation of the StopCondition object by storing the initialization kwargs as a yaml mapping.
- Parameters
representer (ruamel.yaml.representer) – The representer module
node (type(self)) – The node, i.e. an instance of this class
- Returns
a yaml mapping that is able to recreate this object
- classmethod from_yaml(constructor, node)[source]#
Creates a StopCondition object by unpacking the given mapping such that all stored arguments are available to __init__.
- __dict__ = mappingproxy({'__module__': 'utopya.stopcond', '__doc__': 'A StopCondition object holds information on the conditions in which a\n worker process should be stopped.\n\n It is formulated in a general way, applying to all Workers. The attributes\n of this class store the information required to deduce whether the\n condition if fulfilled or not.\n ', '__init__': <function StopCondition.__init__>, 'fulfilled_for': <property object>, '_resolve_sc_funcs': <staticmethod object>, '__str__': <function StopCondition.__str__>, 'fulfilled': <function StopCondition.fulfilled>, 'yaml_tag': '!stop-condition', 'to_yaml': <classmethod object>, 'from_yaml': <classmethod object>, '__dict__': <attribute '__dict__' of 'StopCondition' objects>, '__weakref__': <attribute '__weakref__' of 'StopCondition' objects>, '__annotations__': {}})#
- __module__ = 'utopya.stopcond'#
- __weakref__#
list of weak references to the object (if defined)
utopya.stopcond_funcs module#
Here, functions that are used in the StopCondition class are defined.
These all get passed the worker task, information and additional kwargs.
Required signature: (task: WorkerTask, **kws) -> bool
- utopya.stopcond_funcs.timeout_wall(task: utopya.task.WorkerTask, *, seconds: float) bool [source]#
Checks the wall timeout of the given worker
- Parameters
task (utopya.task.WorkerTask) – The WorkerTask object to check
seconds (float) – After how many seconds to trigger the wall timeout
- Returns
Whether the timeout is fulfilled
- Return type
bool
- utopya.stopcond_funcs.check_monitor_entry(task: utopya.task.WorkerTask, *, entry_name: str, operator: str, value: float) bool [source]#
Checks if a monitor entry compares in a certain way to a given value
- Parameters
task (utopya.task.WorkerTask) – The WorkerTask object to check
entry_name (str) – The name of the monitor entry, leading to the value to the left-hand side of the operator
operator (str) – The binary operator to use
value (float) – The right-hand side value to compare to
- Returns
Result of op(entry, value)
- Return type
bool
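The comparison logic can be sketched as follows; the mapping of operator names to functions is illustrative, and utopya's accepted operator names may differ:

```python
# Sketch: look up a monitor entry and compare it to a value using a
# named binary operator from the stdlib `operator` module.
import operator as _op

OPERATORS = {
    "==": _op.eq, "!=": _op.ne,
    "<": _op.lt, "<=": _op.le,
    ">": _op.gt, ">=": _op.ge,
}

def check_monitor_entry(monitor: dict, *, entry_name: str,
                        operator: str, value: float) -> bool:
    """Return op(entry, value), or False if the entry does not exist."""
    entry = monitor.get(entry_name)
    if entry is None:
        return False  # no such entry: condition cannot be fulfilled
    return OPERATORS[operator](entry, value)

result = check_monitor_entry({"density": 0.95}, entry_name="density",
                             operator=">=", value=0.9)
```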
utopya.task module#
The Task class supplies a container for all information needed for a task.
The WorkerTask and ProcessTask classes specialize on tasks for the WorkerManager that work on subprocesses or multiprocessing processes.
- utopya.task.enqueue_lines(*, queue: queue.Queue, stream: TextIO, follow: bool = False, parse_func: Optional[Callable] = None) None [source]#
From the given text stream, read line-buffered lines and add them to the provided queue as 2-tuples, (line, parsed object).
This function is meant to be passed to an individual thread in which it can read individual lines separately from the main thread. Before exiting this function, the stream is closed.
- Parameters
queue (queue.Queue) – The queue object to put the read line and parsed objects into.
stream (TextIO) – The stream identifier. If this is not a text stream, be aware that the elements added to the queue might need decoding.
follow (bool, optional) – If instead of
iter(stream.readline)
, the_follow()
function should be used instead. This should be selected if the stream is file-like instead ofsys.stdout
-like.
parse_func (Callable, optional) – A parse function that each read line is passed through. This should be a unary function that either returns a successfully parsed line or None.
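The reader-thread pattern described above, sketched in a self-contained way (parameter names are simplified; the real function additionally supports a follow mode, and it closes the stream before exiting, which the sketch mirrors):

```python
# Sketch: a thread reads lines from a text stream and puts
# (line, parsed object) 2-tuples into a queue that the main thread
# can drain without blocking.
import io
import queue
import threading

def enqueue_lines(*, q: queue.Queue, stream, parse_func=None):
    parse = parse_func or (lambda line: None)
    for line in iter(stream.readline, ""):
        line = line.rstrip("\n")
        q.put((line, parse(line)))
    stream.close()  # the stream is closed before exiting

q = queue.Queue()
stream = io.StringIO("plain line\n!!map {progress: 0.5}\n")
t = threading.Thread(target=enqueue_lines, kwargs=dict(q=q, stream=stream))
t.start()
t.join()
lines = [q.get_nowait() for _ in range(q.qsize())]
```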
- utopya.task.parse_yaml_dict(line: str, *, start_str: str = '!!map') Union[None, dict] [source]#
A yaml parse function that can be passed to enqueue_lines. It only tries parsing the line if it starts with the provided start string.
It tries to decode the line and parse it as YAML. If that fails, it will still try to decode the string. If that fails as well, the unchanged line is returned.
- Parameters
line (str) – The line to decode, assumed byte-string, utf8-encoded
start_str (str, optional) – The string a line needs to start with in order to be parsed
- Returns
either the parsed dict or None, if parsing was unsuccessful
- Return type
Union[None, dict]
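The control flow of such a prefix-gated parse function, sketched with JSON as a stand-in for YAML so the example stays dependency-free (the real function uses a YAML parser):

```python
# Sketch: only lines starting with `start_str` are parsed at all;
# everything else, including unparseable payloads, yields None.
import json

def parse_json_dict(line: str, *, start_str: str = "!!map"):
    """Return the parsed dict, or None if the line is not parseable."""
    if not line.startswith(start_str):
        return None
    try:
        return json.loads(line[len(start_str):])
    except json.JSONDecodeError:
        return None

parsed = parse_json_dict('!!map {"progress": 0.5}')
skipped = parse_json_dict("some log output")
```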
- class utopya.task.Task(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]#
Bases:
object
The Task is a container for a task handled by the WorkerManager.
It aims to provide the necessary interfaces for the WorkerManager to easily associate tasks with the corresponding workers and vice versa.
Initialize a Task object.
- Parameters
name (str, optional) – The task’s name. If none is given, the generated uuid will be used.
priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.
callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. The function gets passed as only argument this task object.
progress_func (Callable, optional) – Invoked by the
progress
property and used to calculate the progress given the current task object as argument
- __slots__ = ('_name', '_priority', '_uid', '_progress_func', '_stop_conditions', 'callbacks')#
- __init__(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]#
Initialize a Task object.
- Parameters
name (str, optional) – The task’s name. If none is given, the generated uuid will be used.
priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.
callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. The function gets passed as only argument this task object.
progress_func (Callable, optional) – Invoked by the
progress
property and used to calculate the progress given the current task object as argument
- callbacks#
- property name: str#
The task’s name, if given; else the uid.
- property uid: int#
The task’s unique ID
- property priority: float#
The task’s priority. Default is +inf, which is the lowest priority
- property order_tuple: tuple#
Returns the ordering tuple (priority, uid.time)
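The resulting ordering can be illustrated with a heap of such order tuples; uid.time is approximated here by a simple creation counter:

```python
# Sketch: tasks sort by a (priority, creation order) tuple, so lower
# priority values win and, among equal priorities, the earlier-created
# task comes first. Default priority is +inf, i.e. lowest priority.
import heapq
import itertools

_counter = itertools.count()  # monotonically increasing creation order

def make_task(name, priority=None):
    prio = priority if priority is not None else float("inf")
    return ((prio, next(_counter)), name)

heap = [make_task("low-prio"), make_task("urgent", priority=0),
        make_task("also-urgent", priority=0)]
heapq.heapify(heap)
order = [heapq.heappop(heap)[1] for _ in range(3)]
```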
- property progress: float#
If a progress function is given, invokes it; otherwise returns 0
This also performs checks that the progress is in [0, 1]
- property fulfilled_stop_conditions: Set[StopCondition]#
The set of fulfilled stop conditions for this task. Typically, this is set by the
StopCondition
itself as part of its evaluation in theutopya.stopcond.StopCondition.fulfilled()
method.
- __eq__(other) bool [source]#
Evaluates equality of two tasks: returns true only if identical.
Note
We trust that the unique ID of each task (generated with
uuid
) is really unique, therefore different tasks can never be fully equivalent.
- __module__ = 'utopya.task'#
- class utopya.task.WorkerTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#
Bases:
utopya.task.Task
A specialisation of
Task
for use in the
WorkerManager
.
It is able to spawn a worker process using
subprocess.Popen
, executing the task in a non-blocking manner. At the same time, the worker's stream can be read in via another non-blocking thread and stream information can be parsed. Furthermore, this class provides most of the interface for signalling the spawned process.
For an equivalent class that uses
multiprocessing
instead of
subprocess
, see the derived
MPProcessTask
.
Initialize a WorkerTask.
This is a specialization of
Task
for use in the
WorkerManager
.
- Parameters
setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows to dynamically handle the worker arguments. It is called with the
worker_kwargs
keyword argument, containing the dict passed here. Additionally,
setup_kwargs
are unpacked into the function call. The function should return a dict that is then used as
worker_kwargs
for the individual task.
setup_kwargs (dict, optional) – The keyword arguments unpacked into the
setup_func
call.
worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to
setup_func
and, if a
setup_func
is given, the return value of that function will be used for the
worker_kwargs
.
**task_kwargs – Arguments to be passed to
__init__()
, including the callbacks dictionary among other things.
- Raises
ValueError – If neither
setup_func
norworker_kwargs
were given, thus lacking information on how to spawn the worker.
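The setup_func mechanism can be sketched as follows; the function and argument names below are hypothetical illustrations of the contract described above:

```python
# Sketch: shortly before spawning, the setup function receives the
# initially given worker_kwargs (plus the unpacked setup_kwargs) and
# returns the dict actually used to spawn the worker.
def resolve_worker_kwargs(*, setup_func=None, setup_kwargs=None,
                          worker_kwargs=None) -> dict:
    if setup_func is None:
        if worker_kwargs is None:
            raise ValueError("Need setup_func and/or worker_kwargs!")
        return worker_kwargs
    return setup_func(worker_kwargs=worker_kwargs, **(setup_kwargs or {}))

def my_setup(*, worker_kwargs, output_dir):
    # hypothetical setup function: extend the arguments dynamically,
    # e.g. with a per-task working directory
    return dict(**(worker_kwargs or {}), cwd=output_dir)

wk = resolve_worker_kwargs(setup_func=my_setup,
                           setup_kwargs=dict(output_dir="/tmp/run0"),
                           worker_kwargs=dict(args=("./model",)))
```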
- __slots__ = ('setup_func', 'setup_kwargs', 'worker_kwargs', '_worker', '_worker_pid', '_worker_status', 'streams', 'profiling')#
- STREAM_PARSE_FUNCS = {'default': None, 'yaml_dict': <function parse_yaml_dict>}#
- __init__(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#
Initialize a WorkerTask.
This is a specialization of
Task
for use in the
WorkerManager
.
- Parameters
setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows to dynamically handle the worker arguments. It is called with the
worker_kwargs
keyword argument, containing the dict passed here. Additionally,
setup_kwargs
are unpacked into the function call. The function should return a dict that is then used as
worker_kwargs
for the individual task.
setup_kwargs (dict, optional) – The keyword arguments unpacked into the
setup_func
call.
worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to
setup_func
and, if a
setup_func
is given, the return value of that function will be used for the
worker_kwargs
.
**task_kwargs – Arguments to be passed to
__init__()
, including the callbacks dictionary among other things.
- Raises
ValueError – If neither
setup_func
norworker_kwargs
were given, thus lacking information on how to spawn the worker.
- setup_func#
- setup_kwargs#
- worker_kwargs#
- streams#
- profiling#
- property worker: subprocess.Popen#
The associated worker process object or None, if not yet created.
- property worker_pid: int#
The process ID of the associated worker process
- property worker_status: Optional[int]#
The worker process's current status or False, if there is no worker spawned yet.
Note that this invokes a poll to the worker process if one was spawned.
- Returns
- Current worker status. False, if there was no
worker associated yet.
- Return type
Union[int, None]
- property outstream_objs: list#
Returns the list of objects parsed from the ‘out’ stream
- spawn_worker() subprocess.Popen [source]#
Spawn a worker process using subprocess.Popen and manage the corresponding queue and thread for reading the stdout stream.
If there is a
setup_func
, this function will be called first.
Afterwards, from the worker_kwargs returned by that function or from the ones given during initialisation (if no
setup_func
was given), the worker process is spawned and associated with this task.
- Returns
The created process object
- Return type
subprocess.Popen
- Raises
RuntimeError – If a worker was already spawned for this task.
TypeError – For invalid
args
argument
- read_streams(stream_names: list = 'all', *, max_num_reads: int = 10, forward_directly: bool = False) None [source]#
Read the streams associated with this task’s worker.
- Parameters
stream_names (list, optional) – The list of stream names to read. If
all
(default), will read all streams.
max_num_reads (int, optional) –
How many lines should be read from the buffer. For -1, reads the whole buffer.
Warning
Do not make this value too large as it could block the whole reader thread of this worker.
forward_directly (bool, optional) – Whether to call the
forward_streams()
method; this is done before the callback and can be useful if the callback should not happen before the streams are forwarded.
- Returns
None
- save_streams(stream_names: list = 'all', *, final: bool = False)[source]#
For each stream, checks if it is to be saved, and if yes: saves it.
The saving location is stored in the streams dict. The relevant keys are the save flag and the save_path string.
Note that this function does not save the whole stream log, but only those part of the stream log that have not already been saved. The position up to which the stream was saved is stored under the lines_saved key in the stream dict.
- Parameters
stream_names (list, optional) – The list of stream names to _check_. If ‘all’ (default), will check all streams whether the save flag is set.
save_raw (bool, optional) – If True, stores the raw log; otherwise stores the regular log, i.e. without the lines that were parseable.
final (bool, optional) – If True, this is regarded as the final save operation for the stream, which will lead to additional information being saved to the end of the log.
remove_ansi (bool, optional) – If True, will remove ANSI escape characters (e.g. from colored logging) from the log before saving to file.
- Returns
None
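The incremental saving via a lines_saved position can be sketched like this; the stream dict layout is simplified, and a list stands in for the save file:

```python
# Sketch: only lines beyond the `lines_saved` position are appended,
# and the position is then advanced, so nothing is saved twice.
def save_stream(stream: dict) -> int:
    """Append unsaved log lines to the stream's save target; return count."""
    if not stream.get("save"):
        return 0  # this stream is not to be saved
    start = stream.get("lines_saved", 0)
    new_lines = stream["log"][start:]
    stream.setdefault("saved", []).extend(new_lines)  # stand-in for a file
    stream["lines_saved"] = start + len(new_lines)
    return len(new_lines)

stream = dict(save=True, log=["line 1", "line 2"], lines_saved=0)
first = save_stream(stream)   # saves both lines
stream["log"].append("line 3")
second = save_stream(stream)  # saves only the new line
```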
- forward_streams(stream_names: list = 'all', forward_raw: bool = False) bool [source]#
Forwards the streams to stdout, either via logging module or print
This function can be periodically called to forward the part of the stream logs that was not already forwarded to stdout.
The information for that is stored in the stream dict. The log_level entry is used to determine whether the logging module should be used or (in case of None) the print method.
- Parameters
stream_names (list, optional) – The list of streams to print
- Returns
whether there was any output
- Return type
bool
- signal_worker(signal: str) tuple [source]#
Sends a signal to this WorkerTask’s worker.
- Parameters
signal (str) – The signal to send. Needs to be a valid signal name, i.e. available in Python's signal module.
- Raises
ValueError – When an invalid signal argument was given
- Returns
(signal: str, signum: int) sent to the worker
- Return type
tuple
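The name-to-number lookup described for signal_worker() can be sketched via Python's signal module; resolve_signal below is a hypothetical helper, not part of utopya:

```python
import signal


def resolve_signal(name: str) -> int:
    """Hypothetical helper: map a signal name like 'SIGTERM' to its number."""
    try:
        return int(getattr(signal, name))
    except AttributeError:
        # Mirror the documented behavior: invalid names raise ValueError
        raise ValueError(f"Invalid signal name: {name}") from None
```

On POSIX systems, `resolve_signal("SIGTERM")` yields the platform's SIGTERM number (typically 15).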
- __module__ = 'utopya.task'#
- class utopya.task.PopenMPProcess(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = -1, encoding: str = 'utf8')[source]#
Bases:
object
A wrapper around multiprocessing.Process that replicates (wide parts of) the interface of subprocess.Popen.
Creates a multiprocessing.Process and starts it.
The interface here is a subset of subprocess.Popen that makes available those features that make sense for a multiprocessing.Process, mainly: stream reading.
Consequently, the interface is quite different from that of multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:
- target will be args[0]
- args will be args[1:]
- kwargs is an additional keyword argument that is typically not part of the subprocess.Popen interface.
Regarding the stream arguments, the following steps are done to attach custom pipes: if any argument is subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.
- Parameters
args (tuple) – The target callable (args[0]) and subsequent positional arguments.
kwargs (dict, optional) – Keyword arguments for the target.
stdin (None, optional) – The stdin stream
stdout (None, optional) – The stdout stream
stderr (None, optional) – The stderr stream
bufsize (int, optional) – The buffer size to use.
encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, using other values is not encouraged!
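The args convention above can be illustrated with plain multiprocessing; simulate is a hypothetical target callable used only for this sketch:

```python
import multiprocessing as mp


def simulate(steps: int, *, seed: int = 0) -> int:
    """A stand-in worker callable (hypothetical)."""
    return steps * seed


# PopenMPProcess packs everything into ``args``/``kwargs``:
args = (simulate, 1000)   # args[0] -> target, args[1:] -> positional args
kwargs = dict(seed=42)    # passed through to the target

# ... which corresponds roughly to this multiprocessing.Process setup:
target, pos_args = args[0], args[1:]
process = mp.Process(target=target, args=pos_args, kwargs=kwargs)
```

The process object is constructed but not started here; PopenMPProcess additionally starts the process and attaches the stream readers.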
- __init__(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = -1, encoding: str = 'utf8')[source]#
Creates a multiprocessing.Process and starts it.
The interface here is a subset of subprocess.Popen that makes available those features that make sense for a multiprocessing.Process, mainly: stream reading.
Consequently, the interface is quite different from that of multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:
- target will be args[0]
- args will be args[1:]
- kwargs is an additional keyword argument that is typically not part of the subprocess.Popen interface.
Regarding the stream arguments, the following steps are done to attach custom pipes: if any argument is subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.
- Parameters
args (tuple) – The target callable (args[0]) and subsequent positional arguments.
kwargs (dict, optional) – Keyword arguments for the target.
stdin (None, optional) – The stdin stream
stdout (None, optional) – The stdout stream
stderr (None, optional) – The stderr stream
bufsize (int, optional) – The buffer size to use.
encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, using other values is not encouraged!
- poll() Optional[int] [source]#
Check if child process has terminated. Sets and returns the returncode attribute; otherwise, returns None.
With the underlying process being a multiprocessing.Process, this method is equivalent to the returncode property.
- wait(timeout=None)[source]#
Wait for the process to finish; blocking call.
This method is not yet implemented, but will be!
- communicate(input=None, timeout=None)[source]#
Communicate with the process.
This method is not yet implemented! Not sure if it will be …
- property args: tuple#
The args argument to this process. Note that the returned tuple includes the target callable as its first entry.
Note that these have already been passed to the process; changing them has no effect.
- property kwargs#
Keyword arguments passed to the target callable.
Note that these have already been passed to the process; changing them has no effect.
- property stdin#
The attached stdin stream
- property stdout#
The attached stdout stream
- property stderr#
The attached stderr stream
- property pid#
Process ID of the child process
- property returncode: Optional[int]#
The child return code, set by poll() and wait() (and indirectly by communicate()). A None value indicates that the process hasn’t terminated yet.
A negative value -N indicates that the child was terminated by signal N (POSIX only).
- __dict__ = mappingproxy({'__module__': 'utopya.task', '__doc__': 'A wrapper around multiprocessing.Process that replicates (wide parts of)\n the interface of subprocess.Popen.\n ', '__init__': <function PopenMPProcess.__init__>, '_prepare_target_args': <function PopenMPProcess._prepare_target_args>, '__del__': <function PopenMPProcess.__del__>, '__str__': <function PopenMPProcess.__str__>, 'poll': <function PopenMPProcess.poll>, 'wait': <function PopenMPProcess.wait>, 'communicate': <function PopenMPProcess.communicate>, 'send_signal': <function PopenMPProcess.send_signal>, 'terminate': <function PopenMPProcess.terminate>, 'kill': <function PopenMPProcess.kill>, 'args': <property object>, 'kwargs': <property object>, 'stdin': <property object>, 'stdout': <property object>, 'stderr': <property object>, 'pid': <property object>, 'returncode': <property object>, '__dict__': <attribute '__dict__' of 'PopenMPProcess' objects>, '__weakref__': <attribute '__weakref__' of 'PopenMPProcess' objects>, '__annotations__': {}})#
- __module__ = 'utopya.task'#
- __weakref__#
list of weak references to the object (if defined)
- class utopya.task.MPProcessTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]#
Bases:
utopya.task.WorkerTask
A WorkerTask specialization that uses multiprocessing.Process instead of subprocess.Popen.
It is mostly equivalent to WorkerTask but adjusts the private methods that take care of spawning the actual process and setting up the stream readers, such that the particularities of the PopenMPProcess wrapper are accounted for.
Initialize a WorkerTask.
This is a specialization of Task for use in the WorkerManager.
- Parameters
setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows to dynamically handle the worker arguments. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.
setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.
worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.
**task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.
- Raises
ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.
- setup_func#
- setup_kwargs#
- worker_kwargs#
- streams#
- profiling#
- __dict__ = mappingproxy({'__module__': 'utopya.task', '__doc__': 'A WorkerTask specialization that uses multiprocessing.Process instead\n of subprocess.Popen.\n\n It is mostly equivalent to :py:class:`~utopya.task.WorkerTask` but adjusts\n the private methods that take care of spawning the actual process and\n setting up the stream readers, such that the particularities of the\n :py:class:`~utopya.task.PopenMPProcess` wrapper are accounted for.\n ', '_spawn_process': <function MPProcessTask._spawn_process>, '_setup_stream_reader': <function MPProcessTask._setup_stream_reader>, '_stop_stream_reader': <function MPProcessTask._stop_stream_reader>, '__dict__': <attribute '__dict__' of 'MPProcessTask' objects>, '__weakref__': <attribute '__weakref__' of 'MPProcessTask' objects>, '__annotations__': {}})#
- __module__ = 'utopya.task'#
- __weakref__#
list of weak references to the object (if defined)
- class utopya.task.TaskList[source]#
Bases:
object
The TaskList stores Task objects, ensuring that no task is contained twice, and can be locked to prevent adding new tasks.
Initialize an empty TaskList.
- __contains__(val: utopya.task.Task) bool [source]#
Checks if the given object is contained in this TaskList.
- __getitem__(idx: int) utopya.task.Task [source]#
Returns the item at the given index in the TaskList.
- lock()[source]#
If called, the TaskList becomes locked and allows no further calls to the append method.
- append(val: utopya.task.Task)[source]#
Append a Task object to this TaskList
- Parameters
val (Task) – The task to add
- Raises
RuntimeError – If TaskList object was locked
TypeError – Tried to add a non-Task type object
ValueError – Task already added to this TaskList
- __add__(tasks: Sequence[utopya.task.Task])[source]#
Appends all the tasks in the given iterable to the task list
- __dict__ = mappingproxy({'__module__': 'utopya.task', '__doc__': 'The TaskList stores Task objects in it, ensuring that none is in there\n twice and allows to lock it to prevent adding new tasks.\n ', '__init__': <function TaskList.__init__>, '__len__': <function TaskList.__len__>, '__contains__': <function TaskList.__contains__>, '__getitem__': <function TaskList.__getitem__>, '__iter__': <function TaskList.__iter__>, '__eq__': <function TaskList.__eq__>, 'lock': <function TaskList.lock>, 'append': <function TaskList.append>, '__add__': <function TaskList.__add__>, '__dict__': <attribute '__dict__' of 'TaskList' objects>, '__weakref__': <attribute '__weakref__' of 'TaskList' objects>, '__hash__': None, '__annotations__': {}})#
- __hash__ = None#
- __module__ = 'utopya.task'#
- __weakref__#
list of weak references to the object (if defined)
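The duplicate checking and locking behavior of TaskList can be sketched as follows; this is a simplified stand-in, not the actual implementation:

```python
class TaskListSketch:
    """A minimal lockable container that rejects duplicates, mimicking TaskList."""

    def __init__(self):
        self._tasks = []
        self._locked = False

    def lock(self):
        """After this call, append() raises RuntimeError."""
        self._locked = True

    def append(self, task):
        if self._locked:
            raise RuntimeError("TaskList is locked!")
        if task in self._tasks:
            raise ValueError("Task was already added to this TaskList!")
        self._tasks.append(task)

    def __add__(self, tasks):
        # Append all tasks from the given iterable, as TaskList.__add__ does
        for t in tasks:
            self.append(t)
        return self

    def __len__(self):
        return len(self._tasks)
```

Because `__add__` returns the list itself, `tl += [task_a, task_b]` appends in bulk while still enforcing the duplicate and lock checks.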
utopya.testtools module#
Tools that help testing models.
This mainly supplies the ModelTest class, which is a specialization of the Model for usage in tests.
- class utopya.testtools.ModelTest(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]#
Bases:
utopya.model.Model
A class to use for testing Utopia models.
It attaches to a certain model and makes it easy to load config files with which tests should be carried out.
Initialize the ModelTest class for the given model name.
This is basically like the base class __init__, just that it sets the default value of use_tmpdir to True and renames TODO
- Parameters
model_name (str) – Name of the model to test
test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here, but the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.
- Raises
ValueError – If the directory extracted from test_file is invalid
- __init__(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]#
Initialize the ModelTest class for the given model name.
This is basically like the base class __init__, just that it sets the default value of use_tmpdir to True and renames TODO
- Parameters
model_name (str) – Name of the model to test
test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.
use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here, but the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.
- Raises
ValueError – If the directory extracted from test_file is invalid
- __module__ = 'utopya.testtools'#
utopya.tools module#
For functions that are not bound to classes, but generally useful.
- utopya.tools.recursive_update(d: dict, u: dict) dict [source]#
Update dict d with values from dict u.
NOTE: This method does _not_ copy mutable entries! If you want to assure that the contents of u cannot be changed by changing its counterparts in the updated d, you need to supply a deep copy of u to this method.
- Parameters
d (dict) – The dict to be updated
u (dict) – The dict used to update
- Returns
updated version of d
- Return type
dict
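A minimal sketch of such a recursive update; the actual utopya implementation may differ in details:

```python
def recursive_update(d: dict, u: dict) -> dict:
    """Recursively update dict d with entries from dict u, in place."""
    for key, value in u.items():
        if isinstance(value, dict) and isinstance(d.get(key), dict):
            # Both sides are dicts: descend instead of overwriting
            d[key] = recursive_update(d[key], value)
        else:
            d[key] = value
    return d


cfg = {"model": {"num_steps": 100, "seed": 42}, "out_dir": "data"}
recursive_update(cfg, {"model": {"seed": 7}})
# cfg is now {'model': {'num_steps': 100, 'seed': 7}, 'out_dir': 'data'}
```

As the docstring above warns, mutable values from u end up shared with d; pass a deep copy of u if that matters.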
- utopya.tools.load_selected_keys(src: dict, *, add_to: dict, keys: Sequence[Tuple[str, type, bool]], err_msg_prefix: Optional[str] = None, prohibit_unexpected: bool = True) None [source]#
Loads (only) selected keys from dict src into dict add_to.
- Parameters
src (dict) – The dict to load values from
add_to (dict) – The dict to load values into
keys (Sequence[Tuple[str, type, bool]]) – Which keys to load, given as sequence of (key, allowed types, [required=False]) tuples.
err_msg_prefix (str, optional) – A description string, used as prefix in error messages
prohibit_unexpected (bool, optional) – Whether to raise on keys that were unexpected, i.e. not given in the keys argument.
- Raises
KeyError – On missing key in src
TypeError – On bad type of value in src
ValueError – On unexpected keys in src
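A simplified sketch of this key-selection logic, assuming the tuple format described above (error messages abbreviated; the real function additionally uses the error message prefix):

```python
def load_selected_keys(src: dict, *, add_to: dict, keys,
                       prohibit_unexpected: bool = True) -> None:
    """Copy only the selected (and type-checked) keys from src into add_to."""
    expected = set()
    for spec in keys:
        # Each spec is (key, allowed_types) or (key, allowed_types, required)
        key, allowed_types, required = (*spec, False)[:3]
        expected.add(key)
        if key not in src:
            if required:
                raise KeyError(f"Missing required key: {key}")
            continue
        if not isinstance(src[key], allowed_types):
            raise TypeError(f"Bad type for key '{key}': {type(src[key])}")
        add_to[key] = src[key]
    unexpected = src.keys() - expected
    if prohibit_unexpected and unexpected:
        raise ValueError(f"Unexpected keys: {unexpected}")


dst = {}
load_selected_keys({"num_steps": 10, "seed": 1}, add_to=dst,
                   keys=[("num_steps", int, True), ("seed", int)])
# dst == {"num_steps": 10, "seed": 1}
```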
- utopya.tools.add_item(value, *, add_to: dict, key_path: Sequence[str], value_func: Optional[Callable] = None, is_valid: Optional[Callable] = None, ErrorMsg: Optional[Callable] = None) None [source]#
Adds the given value to the add_to dict, traversing the given key path. This operation happens in-place.
- Parameters
value – The value of what is to be stored. If this is a callable, the result of the call is stored.
add_to (dict) – The dict to add the entry to
key_path (Sequence[str]) – The path at which to add it
value_func (Callable, optional) – If given, calls it with value as argument and uses the return value to add to the dict
is_valid (Callable, optional) – Used to determine whether value is valid or not; should take single positional argument, return bool
ErrorMsg (Callable, optional) – A raisable object that prints an error message; gets passed value as positional argument.
- Raises
Exception – type depends on specified ErrorMsg callable
- utopya.tools.format_time(duration: Union[float, datetime.timedelta], *, ms_precision: int = 0, max_num_parts: Optional[int] = None) str [source]#
Given a duration (in seconds), formats it into a string.
The formatting divisors are: days, hours, minutes, seconds
If ms_precision > 0 and duration < 60, decimal places will be shown for the seconds.
- Parameters
duration (Union[float, timedelta]) – The duration in seconds to format into a duration string; it can also be a timedelta object.
ms_precision (int, optional) – The precision of the seconds slot
max_num_parts (int, optional) – How many parts to include when creating the formatted time string. For example, if the time consists of the parts seconds, minutes, and hours, and the argument is 2, only the hours and minutes parts will be shown. If None, all parts are included.
- Returns
The formatted duration string
- Return type
str
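A rough sketch of such duration formatting; the exact output format of utopya's format_time may differ:

```python
from datetime import timedelta
from typing import Optional, Union


def format_time(duration: Union[float, timedelta], *, ms_precision: int = 0,
                max_num_parts: Optional[int] = None) -> str:
    """Format a duration in seconds into a string like '1h 2m 5s'."""
    if isinstance(duration, timedelta):
        duration = duration.total_seconds()
    parts = []
    for name, size in (("d", 86400), ("h", 3600), ("m", 60)):
        value, duration = divmod(duration, size)
        if value:
            parts.append(f"{int(value)}{name}")
    if ms_precision > 0 and not parts:
        # Decimal places only for short durations, as described above
        parts.append(f"{duration:.{ms_precision}f}s")
    else:
        parts.append(f"{int(duration)}s")
    # Keep only the largest max_num_parts parts, if requested
    return " ".join(parts[:max_num_parts] if max_num_parts else parts)


format_time(3725)                 # '1h 2m 5s'
format_time(1.5, ms_precision=1)  # '1.5s'
```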
- utopya.tools.fill_line(s: str, *, num_cols: int = 79, fill_char: str = ' ', align: str = 'left') str [source]#
Extends the given string such that it fills a whole line of num_cols columns.
- Parameters
s (str) – The string to extend to a whole line
num_cols (int, optional) – The number of columns of the line; defaults to the number of TTY columns or, if those are not available, 79
fill_char (str, optional) – The fill character
align (str, optional) – The alignment. Can be: ‘left’, ‘right’, ‘center’ or the one-letter equivalents.
- Returns
The string of length num_cols
- Return type
str
- Raises
ValueError – For invalid align or fill_char argument
- utopya.tools.center_in_line(s: str, *, num_cols: int = 79, fill_char: str = '·', spacing: int = 1) str [source]#
Shortcut for a common fill_line use case.
- Parameters
s (str) – The string to center in the line
num_cols (int, optional) – The number of columns in the line
fill_char (str, optional) – The fill character
spacing (int, optional) – The spacing around the string s
- Returns
The string centered in the line
- Return type
str
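The line-filling behavior can be sketched with Python's built-in string alignment methods; this is a simplified stand-in for fill_line:

```python
def fill_line(s: str, *, num_cols: int = 79, fill_char: str = " ",
              align: str = "left") -> str:
    """Pad s with fill_char so that it spans num_cols columns."""
    if len(fill_char) != 1:
        raise ValueError("fill_char must be a single character")
    # Map align names (and their one-letter equivalents) to str methods
    methods = {"left": s.ljust, "l": s.ljust,
               "right": s.rjust, "r": s.rjust,
               "center": s.center, "c": s.center}
    try:
        return methods[align](num_cols, fill_char)
    except KeyError:
        raise ValueError(f"Invalid align argument: {align}") from None


fill_line("done", num_cols=10, fill_char=".")                      # 'done......'
fill_line(" utopya ", num_cols=12, fill_char="·", align="center")  # '·· utopya ··'
```

center_in_line is then essentially a shortcut for the centered case with a `·` fill character.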
- utopya.tools.pprint(obj: Any, **kwargs)[source]#
Prints a “pretty” string representation of the given object.
- Parameters
obj (Any) – The object to print
**kwargs – Passed to print
- utopya.tools.pformat(obj) str [source]#
Creates a “pretty” string representation of the given object.
This is achieved by creating a yaml representation.
- utopya.tools.open_folder(path: str)[source]#
Opens the folder at the specified path.
Note
This refuses to open a file.
- Parameters
path (str) – The absolute path to the folder that is to be opened. The home directory ~ is expanded.
- utopya.tools.parse_si_multiplier(s: str) int [source]#
Parses a string like 1.23M or -2.34 k into an integer.
If it is a string, parses the SI multiplier and returns the appropriate integer for use as number of simulation steps. Supported multipliers are k, M, G and T. These need to be used as the suffix of the string.
Note
This is only intended to be used with integer values and does not support float values like 1u.
The used regex can be found here.
- Parameters
s (str) – A string representing an integer number, potentially including a supported SI multiplier as suffix.
- Returns
- The parsed number of steps as integer. If the value has decimal places, integer rounding is applied.
- Return type
int
- Raises
ValueError – Upon string that does not match the expected pattern
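A hypothetical re-implementation of this parsing; the regex used by utopya itself may differ:

```python
import re

_SI = {"k": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}


def parse_si_multiplier(s: str) -> int:
    """Parse strings like '1.23M' or '-2.34 k' into an integer."""
    m = re.fullmatch(r"\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT])?\s*", s)
    if m is None:
        raise ValueError(f"Cannot parse '{s}' into an integer!")
    number, suffix = m.groups()
    # Apply integer rounding, as described above
    return round(float(number) * _SI.get(suffix, 1))


parse_si_multiplier("1.23M")    # 1230000
parse_si_multiplier("-2.34 k")  # -2340
```

Float-style inputs like `1u` do not match the pattern and raise ValueError, matching the note above.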
- utopya.tools.parse_num_steps(N: Union[str, int], *, raise_if_negative: bool = True) int [source]#
Given a string like 1.23M or an integer, prepares the num_steps argument for a single universe simulation.
For string arguments, uses parse_si_multiplier() for string parsing. If that fails, attempts to read it in float notation by calling int(float(N)).
Note
This function always applies integer rounding.
- Parameters
N (Union[str, int]) – The num_steps argument as a string or integer.
raise_if_negative (bool, optional) – Whether to raise an error if the value is negative.
- Returns
The parsed value for num_steps
- Return type
int
- Raises
ValueError – Result invalid, i.e. not parseable or of negative value.
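The fallback logic can be sketched as follows; this self-contained version inlines a stand-in for the SI parsing described above:

```python
import re


def parse_num_steps(N, *, raise_if_negative: bool = True) -> int:
    """Prepare a num_steps argument from an integer or a string like '1.23M'."""
    if isinstance(N, str):
        m = re.fullmatch(r"\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT])?\s*", N)
        if m:
            # SI-style notation, e.g. '1.23M'
            mul = {"k": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}.get(m.group(2), 1)
            N = round(float(m.group(1)) * mul)
        else:
            # Fall back to float notation, e.g. '1e6'
            N = int(float(N))
    if raise_if_negative and N < 0:
        raise ValueError(f"Invalid (negative) num_steps: {N}")
    return int(N)


parse_num_steps("1.23M")  # 1230000
parse_num_steps("1e6")    # 1000000
```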
utopya.workermanager module#
The WorkerManager class.
- class utopya.workermanager.WorkerManager(num_workers: typing.Union[int, str] = 'auto', poll_delay: float = 0.05, lines_per_poll: int = 20, periodic_task_callback: typing.Optional[int] = None, QueueCls=<class 'queue.Queue'>, reporter: typing.Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: typing.Optional[typing.Dict[str, typing.Union[str, typing.List[str]]]] = None, save_streams_on: typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: typing.Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: typing.Optional[dict] = None)[source]#
Bases:
object
The WorkerManager class orchestrates WorkerTask objects: setting them up, invoking them, tracking their progress, and starting new workers if previous workers finished.
- pending_exceptions#
A (FiFo) queue of Exception objects that will be handled by the WorkerManager during working. This is the interface that allows for other threads that have access to the WorkerManager to add an exception and let it be handled in the main thread.
- Type
queue.Queue
- rf_spec#
The report format specifications that are used throughout the WorkerManager. These are invoked at different points of the operation of the WorkerManager: while_working, after_work, after_abort, task_spawn, task_finished.
- Type
dict
- times#
Holds profiling information for the WorkerManager
- Type
dict
Initialize the worker manager.
- Parameters
num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, deduces abs(num_workers) from the CPU count.
poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.
lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.
periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.
QueueCls (Class, optional) – Which class to use for the Queue. Defaults to FiFo.
reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.
rf_spec (Dict[str, Union[str, List[str]]], optional) –
The names of report formats that should be invoked at different points of the WorkerManager’s operation. Possible keys: before_working, while_working, after_work, after_abort, task_spawn, task_finished. All other keys are ignored.
The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.
save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.
nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For ‘ignore’, nothing happens. For ‘warn’, a warning is printed and the last 5 lines of the log are shown. For ‘raise’, the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that ‘warn’ will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use ‘warn_all’ to also receive warnings in this case.
interrupt_params (dict, optional) –
Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:
- send_signal: which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.
- grace_period: how long to wait for the other workers to gracefully shut down. After this period (in seconds), the workers will be killed via SIGKILL. Default is 5s.
- exit: whether to sys.exit at the end of start_working. Default is True.
cluster_mode (bool, optional) – Whether similar tasks to those managed by this WorkerManager are, at the same time, worked on by other WorkerManager instances. This is relevant because the output files might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.
resolved_cluster_params (dict, optional) – The corresponding cluster parameters.
- Raises
ValueError – For a too negative num_workers argument
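The num_workers convention described above might be resolved roughly like this. This is an assumption-laden sketch: resolve_num_workers is a hypothetical helper, and interpreting negative values as "leave that many CPUs free" is a guess at what "deduces abs(num_workers) from the CPU count" means:

```python
import os


def resolve_num_workers(num_workers="auto") -> int:
    """Hypothetical resolution of the num_workers argument."""
    cpus = os.cpu_count()
    if num_workers == "auto":
        return cpus
    if num_workers < 0:
        # Assumption: a negative value leaves abs(num_workers) CPUs free
        resolved = cpus + num_workers
        if resolved <= 0:
            raise ValueError(f"num_workers was too negative: {num_workers}")
        return resolved
    return num_workers
```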
- __init__(num_workers: typing.Union[int, str] = 'auto', poll_delay: float = 0.05, lines_per_poll: int = 20, periodic_task_callback: typing.Optional[int] = None, QueueCls=<class 'queue.Queue'>, reporter: typing.Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: typing.Optional[typing.Dict[str, typing.Union[str, typing.List[str]]]] = None, save_streams_on: typing.Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: typing.Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: typing.Optional[dict] = None)[source]#
Initialize the worker manager.
- Parameters
num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, deduces abs(num_workers) from the CPU count.
poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.
lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.
periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.
QueueCls (Class, optional) – Which class to use for the Queue. Defaults to FiFo.
reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.
rf_spec (Dict[str, Union[str, List[str]]], optional) –
The names of report formats that should be invoked at different points of the WorkerManager’s operation. Possible keys: before_working, while_working, after_work, after_abort, task_spawn, task_finished. All other keys are ignored.
The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.
save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.
nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For ‘ignore’, nothing happens. For ‘warn’, a warning is printed and the last 5 lines of the log are shown. For ‘raise’, the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that ‘warn’ will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use ‘warn_all’ to also receive warnings in this case.
interrupt_params (dict, optional) –
Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:
- send_signal: which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.
- grace_period: how long to wait for the other workers to gracefully shut down. After this period (in seconds), the workers will be killed via SIGKILL. Default is 5s.
- exit: whether to sys.exit at the end of start_working. Default is True.
cluster_mode (bool, optional) – Whether similar tasks to those managed by this WorkerManager are, at the same time, worked on by other WorkerManager instances. This is relevant because the output files might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.
resolved_cluster_params (dict, optional) – The corresponding cluster parameters.
- Raises
ValueError – For a too negative num_workers argument
- property tasks: utopya.task.TaskList#
The list of all tasks.
- property task_queue: queue.Queue#
The task queue.
- property task_count: int#
Returns the number of tasks that this manager ever took care of. Careful: This is NOT the current number of tasks in the queue!
- property num_workers: int#
The number of workers that may work in parallel
- property active_tasks: List[utopya.task.WorkerTask]#
The list of currently active tasks.
Note that this information might not be up-to-date; a process might quit just after the list has been updated.
- property num_finished_tasks: int#
The number of finished tasks. Incremented whenever a task leaves the active_tasks list, regardless of its exit status.
- property num_free_workers: int#
Returns the number of free workers.
- property poll_delay: float#
Returns the delay between two polls
- property stop_conditions: Set[utopya.stopcond.StopCondition]#
All stop conditions that were ever passed to start_working() during the lifetime of this WorkerManager.
- property nonzero_exit_handling: str#
Behavior upon a worker exiting with a non-zero exit code.
- with ignore, nothing happens
- with warn, a warning is printed
- with raise, the log is shown and the WorkerManager exits with the same exit code as the corresponding WorkerTask exited with.
- property reporter: Optional[utopya.reporter.WorkerManagerReporter]#
The associated WorkerManagerReporter or None, if no reporter is set.
- property cluster_mode: bool#
Returns whether the WorkerManager is in cluster mode
- property resolved_cluster_params: dict#
Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.
- add_task(*, TaskCls: type = <class 'utopya.task.WorkerTask'>, **task_kwargs) utopya.task.WorkerTask [source]#
Adds a task to the WorkerManager.
- Parameters
TaskCls (type, optional) – The WorkerTask-like type to use
**task_kwargs – All arguments needed for WorkerTask initialization. See utopya.task.WorkerTask for all valid arguments.
- Returns
The created WorkerTask object
- Return type
WorkerTask
- start_working(*, detach: bool = False, timeout: Optional[float] = None, stop_conditions: Optional[Sequence[utopya.stopcond.StopCondition]] = None, post_poll_func: Optional[Callable] = None) None [source]#
Upon call, all enqueued tasks will be worked on sequentially.
- Parameters
detach (bool, optional) – If False (default), the WorkerManager will block here, as it continuously polls the workers and distributes tasks.
timeout (float, optional) – If given, the number of seconds this work session is allowed to take. Workers will be aborted if the number is exceeded. Note that this is not measured in CPU time, but in the host system's wall time.
stop_conditions (Sequence[StopCondition], optional) – During the run these StopCondition objects will be checked
post_poll_func (Callable, optional) – If given, this is called after all workers have been polled. It can be used to perform custom actions during the polling loop.
- Raises
NotImplementedError – For detach being True
ValueError – For invalid (i.e., negative) timeout value
WorkerManagerTotalTimeout – Upon a total timeout
- __dict__ = mappingproxy({'__module__': 'utopya.workermanager', '__doc__': 'The WorkerManager class orchestrates :py:class:`~utopya.task.WorkerTask`\n objects: setting them up, invoking them, tracking their progress, and\n starting new workers if previous workers finished.\n\n Attributes:\n pending_exceptions (queue.Queue): A (FiFo) queue of Exception objects\n that will be handled by the WorkerManager during working. This is\n the interface that allows for other threads that have access to\n the WorkerManager to add an exception and let it be handled in the\n main thread.\n rf_spec (dict): The report format specifications that are used\n throughout the WorkerManager. These are invoked at different points\n of the operation of the WorkerManager: ``while_working``,\n ``after_work``, ``after_abort``, ``task_spawn``, ``task_finished``.\n times (dict): Holds profiling information for the WorkerManager\n ', '__init__': <function WorkerManager.__init__>, 'tasks': <property object>, 'task_queue': <property object>, 'task_count': <property object>, 'num_workers': <property object>, 'active_tasks': <property object>, 'num_finished_tasks': <property object>, 'num_free_workers': <property object>, 'poll_delay': <property object>, 'stop_conditions': <property object>, 'nonzero_exit_handling': <property object>, 'reporter': <property object>, 'cluster_mode': <property object>, 'resolved_cluster_params': <property object>, 'add_task': <function WorkerManager.add_task>, 'start_working': <function WorkerManager.start_working>, '_invoke_report': <function WorkerManager._invoke_report>, '_grab_task': <function WorkerManager._grab_task>, '_poll_workers': <function WorkerManager._poll_workers>, '_check_stop_conds': <function WorkerManager._check_stop_conds>, '_invoke_periodic_callbacks': <function WorkerManager._invoke_periodic_callbacks>, '_signal_workers': <function WorkerManager._signal_workers>, '_handle_pending_exceptions': <function WorkerManager._handle_pending_exceptions>, 
'__dict__': <attribute '__dict__' of 'WorkerManager' objects>, '__weakref__': <attribute '__weakref__' of 'WorkerManager' objects>, '__annotations__': {}})#
- __module__ = 'utopya.workermanager'#
- __weakref__#
list of weak references to the object (if defined)
- exception utopya.workermanager.WorkerManagerError[source]#
Bases:
BaseException
The base exception class for WorkerManager errors
- __module__ = 'utopya.workermanager'#
- __weakref__#
list of weak references to the object (if defined)
- exception utopya.workermanager.WorkerManagerTotalTimeout[source]#
Bases:
utopya.workermanager.WorkerManagerError
Raised when a total timeout occurred
- __module__ = 'utopya.workermanager'#
- exception utopya.workermanager.WorkerTaskError[source]#
Bases:
utopya.workermanager.WorkerManagerError
Raised when there was an error in a WorkerTask
- __module__ = 'utopya.workermanager'#
- exception utopya.workermanager.WorkerTaskNonZeroExit(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Bases:
utopya.workermanager.WorkerTaskError
Can be raised when a WorkerTask exited with a non-zero exit code.
- __module__ = 'utopya.workermanager'#
- __init__(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
- exception utopya.workermanager.WorkerTaskStopConditionFulfilled(task: utopya.task.WorkerTask, *args, **kwargs)[source]#
Bases:
utopya.workermanager.WorkerTaskNonZeroExit
An exception that is raised when a worker-specific stop condition was fulfilled. This allows it to be handled separately from other non-zero exits.
- __module__ = 'utopya.workermanager'#
utopya.yaml module#
Takes care of the YAML setup for Utopya