utopya package

The utopya package implements the frontend of Utopia.

Submodules

utopya._cluster module

This module holds functions used in the Multiverse cluster mode.

utopya._cluster.parse_node_list(node_list_str: str, *, mode: str, rcps: dict) List[str][source]

Parses the node list to a list of node names and checks against the given resolved cluster parameters.

Depending on mode, different forms of the node list are parsable. For condensed mode:

node042
node[002,004-011,016]
m05s[0204,0402,0504]
m05s[0204,0402,0504],m08s[0504,0604,0701],m13s0603,m14s[0501-0502]
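Expanding such a condensed specification can be pictured with a short helper. This is a hypothetical sketch only, not the actual parse_node_list implementation: it handles a single prefix[...] group (or a plain name) and performs none of the checks against the resolved cluster parameters.

```python
import re

def expand_condensed(spec: str) -> list:
    """Expand a condensed node specification like 'node[002,004-006,016]'
    into a sorted list of node names. Hypothetical sketch; the real
    parse_node_list also validates against resolved cluster parameters."""
    m = re.fullmatch(r"(\w+)\[([\d,-]+)\]", spec)
    if m is None:
        return [spec]  # plain form, e.g. 'node042'
    prefix, body = m.groups()
    names = []
    for part in body.split(","):
        if "-" in part:
            start, end = part.split("-")
            width = len(start)  # preserve zero-padding of the range start
            names += [f"{prefix}{i:0{width}d}"
                      for i in range(int(start), int(end) + 1)]
        else:
            names.append(prefix + part)
    return sorted(names)
```

The compound forms shown above (several bracketed groups separated by commas) would additionally require splitting at top-level commas first.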

utopya._path_setup module

Helper module that can manipulate the system path, e.g. to make additional utopya-related modules available.

class utopya._path_setup.temporary_sys_path(path: str)[source]

Bases: object

A sys.path context manager, temporarily adding a path and removing it again upon exiting. If the given path already exists in the sys.path, it is neither added nor removed and the sys.path remains unchanged.
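The documented behavior can be sketched as a minimal context manager. This is an illustrative reimplementation under the stated semantics, not the utopya source:

```python
import sys

class TempSysPath:
    """Minimal sketch of a temporary sys.path entry: add the path on
    entry and remove it again on exit, unless it was already present,
    in which case sys.path is left unchanged."""

    def __init__(self, path: str):
        self.path = path
        self._added = False

    def __enter__(self):
        if self.path not in sys.path:
            sys.path.append(self.path)
            self._added = True
        return self

    def __exit__(self, *_):
        if self._added:
            sys.path.remove(self.path)
```

Typical usage would be `with TempSysPath("/some/module/dir"): import some_module`.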

__init__(path: str)[source]
__enter__()[source]
__exit__(*_)[source]

class utopya._path_setup.temporary_sys_modules[source]

Bases: object

A context manager for the sys.modules cache, ensuring that it is in the same state after exiting as it was before entering the context manager.

__init__()[source]
__enter__()[source]
__exit__(*_)[source]

utopya._path_setup.add_modules_to_path(*modules, cfg_name: str = 'external_module_paths', ignore_missing: bool = False)[source]

Reads the modules configuration file from the configuration directory and adds the paths stored under the keys given in modules to sys.path.

utopya._yaml module

Supplies basic YAML interface, inherited from dantro and paramspace

utopya.batch module

Implements batch running and evaluation of simulations

class utopya.batch.BatchTaskManager(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]

Bases: object

A manager for batch tasks

Sets up a BatchTaskManager.

Parameters
  • batch_cfg_path (str, optional) – The batch file with all the task definitions.

  • **update_batch_cfg – Additional arguments that are used to update the batch configuration.

Raises

NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.

RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'
__init__(*, batch_cfg_path: Optional[str] = None, **update_batch_cfg)[source]

Sets up a BatchTaskManager.

Parameters
  • batch_cfg_path (str, optional) – The batch file with all the task definitions.

  • **update_batch_cfg – Additional arguments that are used to update the batch configuration.

Raises

NotImplementedError – If run_tasks or cluster_mode were set in the batch configuration.

__str__() str[source]

Return str(self).

property debug: bool

Whether debug mode was enabled.

property parallelization_level: str
property run_defaults: dict

A deepcopy of the run task defaults

property eval_defaults: dict

A deepcopy of the eval task defaults

property dirs: dict

The directories associated with this BatchTaskManager

perform_tasks()[source]

Perform all run and eval tasks.


utopya.cfg module

Module that coordinates the Utopia Configuration Directory

utopya.cfg.get_cfg_path(cfg_name: str) str[source]

Returns the absolute path to the specified configuration file

utopya.cfg.load_from_cfg_dir(cfg_name: str) dict[source]

Load a configuration file; returns empty dict if no file exists.

Parameters

cfg_name (str) – The name of the configuration to read

Returns

The configuration as read from the config directory; if no file is available, an empty dict is returned.

Return type

dict

utopya.cfg.write_to_cfg_dir(cfg_name: str, obj: dict)[source]

Writes a YAML representation of the given object to the configuration directory. Always overwrites a possibly existing file.

Parameters
  • cfg_name (str) – The configuration name

  • obj (dict) – The yaml-representable object that is to be written; usually a dict.

utopya.cltools module

Methods needed to implement the utopia command line interface

class utopya.cltools.ANSIesc[source]

Bases: object

Some selected ANSI escape codes; usable in format strings

RESET = '\x1b[0m'
BOLD = '\x1b[1m'
DIM = '\x1b[2m'
UNDERLINE = '\x1b[4m'
BLACK = '\x1b[30m'
RED = '\x1b[31m'
GREEN = '\x1b[32m'
YELLOW = '\x1b[33m'
BLUE = '\x1b[34m'
MAGENTA = '\x1b[35m'
CYAN = '\x1b[36m'
WHITE = '\x1b[37m'

utopya.cltools.add_from_kv_pairs(*pairs, add_to: dict, attempt_conversion: bool = True, allow_eval: bool = False, allow_deletion: bool = True) None[source]

Parses the key=value pairs and adds them to the given dict.

Note that this happens directly on the object, i.e. making use of the mutability of the given dict. This function has no return value!

Parameters
  • *pairs – Sequence of key=value strings

  • add_to (dict) – The dict to add the pairs to

  • attempt_conversion (bool, optional) – Whether to attempt converting the strings to bool, float, int types

  • allow_eval (bool, optional) – Whether to try calling eval() on the value strings during conversion

  • allow_deletion (bool, optional) – If set, passing the string DELETE as a value removes the corresponding entry.
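The conversion behavior described above can be sketched roughly like this. This is a simplified, hypothetical version: the real function additionally supports allow_eval and richer key handling.

```python
def add_kv_pairs(*pairs, add_to: dict, attempt_conversion: bool = True,
                 allow_deletion: bool = True):
    """Sketch of key=value parsing with optional type conversion.
    Mutates add_to in place and returns None, as documented."""
    def convert(s: str):
        # Try bool, then int, then float; fall back to the raw string
        if s.lower() in ("true", "false"):
            return s.lower() == "true"
        for typ in (int, float):
            try:
                return typ(s)
            except ValueError:
                pass
        return s

    for pair in pairs:
        key, value = pair.split("=", 1)
        if allow_deletion and value == "DELETE":
            add_to.pop(key, None)
            continue
        add_to[key] = convert(value) if attempt_conversion else value
```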

utopya.cltools.register_models(args, *, registry)[source]

Handles registration of multiple models given argparse args

utopya.cltools.register_project(args: list, *, arg_prefix: str = '') dict[source]

Register or update information of a Utopia project, i.e. a repository that implements models.

Parameters
  • args (list) – The CLI arguments object

  • arg_prefix (str, optional) – The prefix to use when using attribute access to these arguments. Useful if the names as defined in the CLI are different depending on the invocation

Returns

Information on the newly added or updated project

Return type

dict

utopya.cltools.deploy_user_cfg(user_cfg_path: str = '/root/.config/utopia/user_cfg.yml') None[source]

Deploys a copy of the full config to the specified location (usually the user config search path of the Multiverse class)

Instead of just copying the full config, it is written line by line, commenting out lines that are not already commented out, and changing the header.

Parameters

user_cfg_path (str, optional) – The path the file is expected at. Is an argument in order to make testing easier.

Returns

None
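The line-by-line commenting described above can be sketched as a small text transformation. This is a hypothetical simplification: the real deployment also handles the YAML header replacement in more detail.

```python
def comment_out(cfg_text: str, header: str = "# User configuration") -> str:
    """Sketch of the documented deployment: prepend a new header and
    comment out every line that is not already a comment or blank."""
    out = [header]
    for line in cfg_text.splitlines():
        stripped = line.lstrip()
        if not stripped or stripped.startswith("#"):
            out.append(line)  # blank or already commented: keep as-is
        else:
            out.append("# " + line)
    return "\n".join(out)
```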

utopya.cltools.copy_model_files(*, model_name: str, new_name: Optional[str] = None, target_project: Optional[str] = None, add_to_cmakelists: bool = True, skip_exts: Optional[Sequence[str]] = None, use_prompts: bool = True, dry_run: bool = False) None[source]

A helper function to conveniently copy model-related files, rename them, and adjust their content to the new name as well.

Parameters
  • model_name (str) – The name of the model to copy

  • new_name (str, optional) – The new name of the model. This may not conflict with any already existing model name in the model registry.

  • target_project (str, optional) – The name of the project to copy the model to. It needs to be a registered Utopia project.

  • add_to_cmakelists (bool, optional) – Whether to add the new model to the corresponding CMakeLists.txt file.

  • use_prompts (bool, optional) – Whether to interactively prompt for confirmation or missing arguments.

  • dry_run (bool, optional) – If given, no write or copy operations will be carried out.

Raises

ValueError – Upon bad arguments

Returns

None

utopya.cltools.prompt_for_new_plot_args(*, old_argv: List[str], old_args: argparse.Namespace, parser: argparse.ArgumentParser) Tuple[dict, argparse.Namespace][source]

Given some old arguments, prompts for new ones and returns a new list of argument values and the parsed argparse namespace result.

Parameters
  • old_argv (List[str]) – The old argument value list

  • old_args (argparse.Namespace) – The old set of parsed arguments

  • parser (argparse.ArgumentParser) – The parser to use for evaluating the newly specified argument value list

Returns

The new argument value list and the parsed argument namespace.

Return type

Tuple[dict, argparse.Namespace]

Raises

ValueError – Upon error in parsing the new arguments.

utopya.datacontainer module

Implements data container classes specialised on Utopia output data.

It is based on the dantro.DataContainer classes, especially the numeric form, the NumpyDataContainer.

class utopya.datacontainer.NumpyDC(*, name: str, data: numpy.ndarray, **dc_kwargs)[source]

Bases: dantro.mixins.proxy_support.Hdf5ProxySupportMixin, dantro.containers.numeric.NumpyDataContainer

This is the base class for numpy data containers used in Utopia.

It is based on the NumpyDataContainer provided by dantro and extends it with the Hdf5ProxySupportMixin, which allows loading the data from the HDF5 file only once it becomes necessary.

Initialize a NumpyDataContainer, storing data that is ndarray-like.

Parameters
  • name (str) – The name of this container

  • data (np.ndarray) – The numpy data to store

  • **dc_kwargs – Additional arguments for container initialization, passed on to parent method

class utopya.datacontainer.XarrayDC(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], dims: Optional[Sequence[str]] = None, coords: Optional[dict] = None, extract_metadata: bool = True, apply_metadata: bool = True, **dc_kwargs)[source]

Bases: dantro.mixins.proxy_support.Hdf5ProxySupportMixin, dantro.containers.xrdatactr.XrDataContainer

This is the base class for xarray data containers used in Utopia.

It is based on the XrDataContainer provided by dantro. As of now, it has no proxy support, but will gain it once available on dantro side.

Initialize a XrDataContainer and extract dimension and coordinate labels.

Parameters
  • name (str) – which name to give to the XrDataContainer

  • data (Union[np.ndarray, xr.DataArray]) – The data to store; anything that an xr.DataArray can take

  • dims (Sequence[str], optional) – The dimension names.

  • coords (dict, optional) – The coordinates. The keys of this dict have to correspond to the dimension names.

  • extract_metadata (bool, optional) – If True, an attempt is made to populate missing dims or coords arguments from the container attributes.

  • apply_metadata (bool, optional) – Whether to apply the extracted or passed dims and coords to the underlying data. This might not be desired in cases where the given data already is a labelled xr.DataArray or where the data is a proxy and the labelling should be postponed.

  • **dc_kwargs – passed to parent

PROXY_RESOLVE_ASTYPE = None
PROXY_RETAIN = True
PROXY_REINSTATE_FAIL_ACTION = 'log_warning'
class utopya.datacontainer.XarrayYamlDC(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], dims: Optional[Sequence[str]] = None, coords: Optional[dict] = None, extract_metadata: bool = True, apply_metadata: bool = True, **dc_kwargs)[source]

Bases: utopya.datacontainer.XarrayDC

An XarrayDC specialization that assumes that each array entry is a YAML string, which is subsequently loaded. This can be done alongside the metadata application of the XarrayDC.

Initialize a XrDataContainer and extract dimension and coordinate labels.

Parameters
  • name (str) – which name to give to the XrDataContainer

  • data (Union[np.ndarray, xr.DataArray]) – The data to store; anything that an xr.DataArray can take

  • dims (Sequence[str], optional) – The dimension names.

  • coords (dict, optional) – The coordinates. The keys of this dict have to correspond to the dimension names.

  • extract_metadata (bool, optional) – If True, an attempt is made to populate missing dims or coords arguments from the container attributes.

  • apply_metadata (bool, optional) – Whether to apply the extracted or passed dims and coords to the underlying data. This might not be desired in cases where the given data already is a labelled xr.DataArray or where the data is a proxy and the labelling should be postponed.

  • **dc_kwargs – passed to parent

class utopya.datacontainer.GridDC(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], **dc_kwargs)[source]

Bases: utopya.datacontainer.XarrayDC

This is the base class for all grid data used in Utopia.

It is based on the XarrayDC and reshapes the data to the grid shape. The last dimension is assumed to be the dimension that goes along the grid cell IDs.
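The documented reshaping can be pictured with plain numpy. This is an illustrative sketch under the assumption that the grid shape comes from a container attribute; it is not the actual GridDC implementation, which also handles proxies, coordinate relabeling, and dimension ordering.

```python
import numpy as np

def reshape_grid_data(data: np.ndarray, grid_shape: tuple) -> np.ndarray:
    """Sketch: reshape flat cell-ID data to the grid shape. 1D input is
    treated as a single snapshot, 2D input as (time, cell ids)."""
    if data.ndim == 1:
        return data.reshape(grid_shape)
    if data.ndim == 2:
        # Keep the leading time dimension, unfold the cell-ID dimension
        return data.reshape((data.shape[0],) + tuple(grid_shape))
    raise ValueError(f"Expected 1D or 2D data, got {data.ndim}D")
```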

Initialize a GridDC which represents grid-like data.

Given the container attribute for _GDC_grid_shape_attr, this container takes care to reshape the underlying data such that it represents that grid, even if it is saved in another shape.

Parameters
  • name (str) – The name of the data container

  • data (np.ndarray) – The not yet reshaped data. If this is 1D, it is assumed that there is no time dimension. If it is 2D, it is assumed to be (time, cell ids).

  • **kwargs – Further initialization kwargs, e.g. attrs

__init__(*, name: str, data: Union[numpy.ndarray, xarray.core.dataarray.DataArray], **dc_kwargs)[source]

Initialize a GridDC which represents grid-like data.

Given the container attribute for _GDC_grid_shape_attr, this container takes care to reshape the underlying data such that it represents that grid, even if it is saved in another shape.

Parameters
  • name (str) – The name of the data container

  • data (np.ndarray) – The not yet reshaped data. If this is 1D, it is assumed that there is no time dimension. If it is 2D, it is assumed to be (time, cell ids).

  • **kwargs – Further initialization kwargs, e.g. attrs

property grid_shape: tuple

The shape of the grid

property space_extent: tuple

The extent of the space this grid represents, read from attrs

property shape: tuple

Returns shape, proxy-aware

This is an overload of the property in Hdf5ProxySupportMixin which ensures that not the shape of the underlying proxy data is returned, but the shape the container will have after reshaping.

property ndim: int

Returns ndim, proxy-aware

This is an overload of the property in Hdf5ProxySupportMixin which ensures that not the ndim of the underlying proxy data is returned, but the ndim the container will have after reshaping.


utopya.datagroup module

Implements data group classes specific to the Utopia output data structure.

They are based on dantro BaseDataGroup-derived implementations. In this module, they are imported and configured to suit the needs of the Utopia data structures.

class utopya.datagroup.UniverseGroup(*, name: str, containers: Optional[list] = None, attrs=None)[source]

Bases: dantro.groups.pspgrp.ParamSpaceStateGroup

This group represents the data of a single universe

Initialize a BaseDataGroup, which can store other containers and attributes.

Parameters
  • name (str) – The name of this data container

  • containers (list, optional) – The containers that are to be stored as members of this group. If given, these are added one by one using the .add method.

  • attrs (None, optional) – A mapping that is stored as attributes

class utopya.datagroup.MultiverseGroup(*, name: str, pspace: Optional[paramspace.paramspace.ParamSpace] = None, containers: Optional[list] = None, **kwargs)[source]

Bases: dantro.groups.pspgrp.ParamSpaceGroup

This group is meant to manage the uni group of the loaded data, i.e. the group in which the output of all universes is stored.

Its main aim is to provide easy access to universes. By default, universes are only identified by their ID, which is a zero-padded _string_. This group adds the ability to access them via integer indices.

Furthermore, via dantro, an easy data selector method is available, see dantro.groups.ParamSpaceGroup.select.
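The integer-index convenience can be pictured as a mapping that pads an integer to the width of its zero-padded string keys. This is an illustrative sketch only, not the MultiverseGroup implementation:

```python
class PaddedKeyDict(dict):
    """Sketch: a mapping whose keys are zero-padded strings (e.g. '007')
    that additionally accepts plain integer indices, as MultiverseGroup
    does for universe IDs."""

    def __getitem__(self, key):
        if isinstance(key, int):
            # Pad to the width of the existing string keys
            width = max((len(k) for k in self), default=1)
            key = f"{key:0{width}d}"
        return super().__getitem__(key)
```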

Initialize an OrderedDataGroup from the list of given containers.

Parameters
  • name (str) – The name of this group.

  • pspace (ParamSpace, optional) – Can already pass a ParamSpace object here.

  • containers (list, optional) – A list of containers to add, which need to be ParamSpaceStateGroup objects.

  • **kwargs – Further initialisation kwargs, e.g. attrs

class utopya.datagroup.TimeSeriesGroup(*args, dims: Optional[Tuple[str]] = None, mode: Optional[str] = None, allow_deep_selection: Optional[bool] = None, **kwargs)[source]

Bases: dantro.groups.time_series.TimeSeriesGroup

This group is meant to manage time series data, with the container names being interpreted as the time coordinate.

Initialize a LabelledDataGroup

Parameters
  • *args – Passed on to OrderedDataGroup

  • dims (TDims, optional) – The dimensions associated with this group. If not given, will use those defined in the LDG_DIMS class variable. These cannot be changed afterwards!

  • mode (str, optional) – The coordinate extraction mode by which to get the coordinates from the group members. Can be attrs, name, data or anything else specified in extract_coords().

  • allow_deep_selection (bool, optional) – Whether to allow deep selection. If not given, will use the LDG_ALLOW_DEEP_SELECTION class variable’s value. Behaviour can be changed via the property of the same name.

  • **kwargs – Passed on to OrderedDataGroup

class utopya.datagroup.HeterogeneousTimeSeriesGroup(*args, dims: Optional[Tuple[str]] = None, mode: Optional[str] = None, allow_deep_selection: Optional[bool] = None, **kwargs)[source]

Bases: dantro.groups.time_series.HeterogeneousTimeSeriesGroup

This group is meant to manage time series data, with the container names being interpreted as the time coordinate.

Initialize a LabelledDataGroup

Parameters
  • *args – Passed on to OrderedDataGroup

  • dims (TDims, optional) – The dimensions associated with this group. If not given, will use those defined in the LDG_DIMS class variable. These cannot be changed afterwards!

  • mode (str, optional) – The coordinate extraction mode by which to get the coordinates from the group members. Can be attrs, name, data or anything else specified in extract_coords().

  • allow_deep_selection (bool, optional) – Whether to allow deep selection. If not given, will use the LDG_ALLOW_DEEP_SELECTION class variable’s value. Behaviour can be changed via the property of the same name.

  • **kwargs – Passed on to OrderedDataGroup

class utopya.datagroup.GraphGroup(*args, **kwargs)[source]

Bases: dantro.groups.graph.GraphGroup

This group is meant to manage graph data and create a NetworkX graph from it.

Initialize a GraphGroup.

Parameters
  • *args – passed to dantro.base.BaseDataGroup.__init__()

  • **kwargs – passed to dantro.base.BaseDataGroup.__init__()


utopya.datamanager module

Implements a class that manages data written out by Utopia models.

It is based on the dantro.DataManager class and the containers specialised for Utopia data.

class utopya.datamanager.DataManager(data_dir: str, *, name: Optional[str] = None, load_cfg: Optional[Union[dict, str]] = None, out_dir: Union[str, bool] = '_output/{timestamp:}', out_dir_kwargs: Optional[dict] = None, create_groups: Optional[List[Union[str, dict]]] = None, condensed_tree_params: Optional[dict] = None, default_tree_cache_path: Optional[str] = None)[source]

Bases: dantro.data_loaders.AllAvailableLoadersMixin, dantro.data_mngr.DataManager

This class manages the data that is written out by Utopia simulations.

It is based on the dantro.DataManager class and adds the specific loader functions that are needed in Utopia: HDF5 and YAML.

Furthermore, to enable file caching via the DAG framework, all available data loaders are included here.

Initializes a DataManager for the specified data directory.

Parameters
  • data_dir (str) – the directory the data can be found in. If this is a relative path, it is considered relative to the current working directory.

  • name (str, optional) – which name to give to the DataManager. If no name is given, the data directory's basename will be used

  • load_cfg (Union[dict, str], optional) – The base configuration used for loading data. If a string is given, assumes it to be the path to a YAML file and loads it using the load_yml() function. If None is given, it can still be supplied to the load() method later on.

  • out_dir (Union[str, bool], optional) – where output is written to. If this is given as a relative path, it is considered relative to the data_dir. A formatting operation with the keys timestamp and name is performed on this, where the latter is the name of the data manager. If set to False, no output directory is created.

  • out_dir_kwargs (dict, optional) – Additional arguments that affect how the output directory is created.

  • create_groups (List[Union[str, dict]], optional) – If given, these groups will be created after initialization. If the list entries are strings, the default group class will be used; if they are dicts, the name key specifies the name of the group and the Cls key specifies the type. If a string is given instead of a type, the lookup happens from the _DATA_GROUP_CLASSES variable.

  • condensed_tree_params (dict, optional) – If given, will set the parameters used for the condensed tree representation. Available options: max_level and condense_thresh, where the latter may be a callable. See dantro.base.BaseDataGroup._tree_repr() for more information.

  • default_tree_cache_path (str, optional) – The path to the default tree cache file. If not given, uses the value from the class variable _DEFAULT_TREE_CACHE_PATH. Whichever value was chosen is then prepared using the _parse_file_path() method, which regards relative paths as being relative to the associated data directory.


utopya.dataprocessing module

This module holds data processing functionality that is used in the plot functions.

utopya.dataprocessing.create_mask(data: xarray.core.dataarray.DataArray, *, operator_name: str, rhs_value: float) xarray.core.dataarray.DataArray[source]

Given the data, returns a binary mask by applying the following comparison: data <operator> rhs value.

Parameters
  • data (xr.DataArray) – The data to apply the comparison to. This is the lhs of the comparison.

  • operator_name (str) – The name of the binary operator function as registered in utopya.tools.OPERATOR_MAP

  • rhs_value (float) – The right-hand-side value

Raises

KeyError – On invalid operator name

Returns

Boolean mask

Return type

xr.DataArray
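The operator lookup can be sketched with Python's operator module. The OPERATORS dict below is a hypothetical stand-in for a subset of utopya.tools.OPERATOR_MAP, and the function is an illustrative numpy sketch rather than the xarray-based create_mask itself:

```python
import operator
import numpy as np

# Hypothetical subset of an operator map like utopya.tools.OPERATOR_MAP
OPERATORS = {"lt": operator.lt, "le": operator.le,
             "gt": operator.gt, "ge": operator.ge, "eq": operator.eq}

def make_mask(data: np.ndarray, *, operator_name: str, rhs_value: float):
    """Sketch of create_mask: look up the comparison operator by name
    and apply it elementwise; an unknown name raises KeyError."""
    return OPERATORS[operator_name](data, rhs_value)
```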

utopya.dataprocessing.where(data: xarray.core.dataarray.DataArray, *, operator_name: str, rhs_value: float) xarray.core.dataarray.DataArray[source]

Filter elements from the given data according to a condition. Only those elements where the condition is fulfilled are not masked.

NOTE This leads to a dtype change to float.
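The dtype change occurs because masked elements become NaN, which has no integer representation. A numpy sketch of the effect, using a hypothetical greater-than condition:

```python
import numpy as np

def where_gt(data: np.ndarray, rhs_value: float) -> np.ndarray:
    """Sketch of the masking: keep elements fulfilling the condition
    and replace the rest with NaN, which forces a float dtype."""
    return np.where(data > rhs_value, data, np.nan)
```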

utopya.dataprocessing.count_unique(data, **kwargs)[source]

Applies np.unique to the given data

utopya.dataprocessing.show_data(data, *_)[source]

Show the data and (if given) additional properties

utopya.dataprocessing.transform(data: xarray.core.dataarray.DataArray, *operations: Union[dict, str], aux_data: Optional[Union[xarray.core.dataset.Dataset, dantro.base.BaseDataGroup]] = None, log_level: Optional[int] = None) xarray.core.dataarray.DataArray[source]

Applies transformations to the given data, e.g. reducing its dimensionality or performing calculations on it.

Parameters
  • data (xr.DataArray) – The data that is to be reduced in dimensionality

  • *operations (Union[dict, str]) – Which operations to apply and with which parameters. These should be operation names or dicts. For dicts, they should only have a single key, which is the name of the operation to perform. The available operations are defined in the TRANSFORMATIONS dict.

  • aux_data (Union[xr.Dataset, dantro.BaseDataGroup], optional) – The auxiliary data for binary operations; required whenever such operations are used.

  • log_level (int, optional) – The level at which to log the progress of the operations. If not given, 10 (DEBUG) is used.

Returns

A new object with dimensionality-reduced data.

Return type

xr.DataArray

Raises
  • TypeError – On bad operations specification

  • ValueError – On bad operation name

utopya.dataprocessing.find_endpoint(data: xarray.core.dataarray.DataArray, *, time: int = -1, **kwargs) Tuple[bool, xarray.core.dataarray.DataArray][source]

Find the endpoint of a dataset with respect to the time coordinate.

This function is compatible with the utopya.plot_funcs.attractor.bifurcation_diagram().

Parameters
  • data (xr.DataArray) – The data

  • time (int, optional) – The time index to select

  • **kwargs – Passed on to data.isel call

Returns

(endpoint found, endpoint)

Return type

Tuple[bool, xr.DataArray]

utopya.dataprocessing.find_fixpoint(data: xarray.core.dataset.Dataset, *, spin_up_time: int = 0, abs_std: Optional[float] = None, rel_std: Optional[float] = None, mean_kwargs=None, std_kwargs=None, isclose_kwargs=None, squeeze: bool = True) Tuple[bool, float][source]

Find the fixpoint(s) of a dataset and confirm them via the standard deviation. For dimensions other than 'time', the fixpoints are compared and duplicates are removed.

This function is compatible with the utopya.plot_funcs.attractor.bifurcation_diagram().

Parameters
  • data (xr.Dataset) – The data

  • spin_up_time (int, optional) – The first timestep included

  • abs_std (float, optional) – The maximal allowed absolute standard deviation

  • rel_std (float, optional) – The maximal allowed relative standard deviation

  • mean_kwargs (dict, optional) – Additional keyword arguments passed on to the appropriate array function for calculating mean on data.

  • std_kwargs (dict, optional) – Additional keyword arguments passed on to the appropriate array function for calculating std on data.

  • isclose_kwargs (dict, optional) – Additional keyword arguments passed on to the appropriate array function for calculating np.isclose for fixpoint-duplicates across dimensions other than ‘time’.

  • squeeze (bool, optional) – Use the data.squeeze method to remove dimensions of length one. Default is True.

Returns

(fixpoint found, mean)

Return type

tuple
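The core of the fixpoint check can be sketched in a few lines of numpy, assuming a 1D time series and only the abs_std criterion; the real function additionally handles rel_std, multi-dimensional datasets, and duplicate removal:

```python
import numpy as np

def confirm_fixpoint(series: np.ndarray, *, spin_up_time: int = 0,
                     abs_std=None) -> tuple:
    """Sketch: drop the spin-up phase, then accept the mean as a
    fixpoint if the standard deviation is small enough."""
    tail = series[spin_up_time:]
    mean, std = tail.mean(), tail.std()
    found = abs_std is None or std <= abs_std
    return found, mean if found else np.nan
```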

utopya.dataprocessing.find_multistability(*args, **kwargs) Tuple[bool, float][source]

Find the multistabilities of a dataset.

Performs find_fixpoint. The method is conclusive if find_fixpoint is conclusive and yields multiple distinct entries.

Returns

(multistability found, mean)

Return type

Tuple[bool, float]

utopya.dataprocessing.find_oscillation(data: xarray.core.dataset.Dataset, *, spin_up_time: int = 0, squeeze: bool = True, **find_peak_kwargs) Tuple[bool, list][source]

Find oscillations in a dataset.

This function is compatible with the utopya.plot_funcs.attractor.bifurcation_diagram().

Parameters
  • data (xr.Dataset) – The data

  • spin_up_time (int, optional) – The first timestep included

  • squeeze (bool, optional) – Use the data.squeeze method to remove dimensions of length one. Default is True.

  • **find_peak_kwargs – Passed on to scipy.signal.find_peaks. Default for kwarg height is set to -1.e+15.

Returns

(oscillation found, [min, max])

Return type

Tuple[bool, list]
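A very rough sketch of the return shape: while the real function detects peaks via scipy.signal.find_peaks, the spin-up handling and the (found, [min, max]) result can be illustrated with plain numpy. This is an assumption-laden toy, not the actual oscillation criterion:

```python
import numpy as np

def find_min_max_after_spin_up(series: np.ndarray, *,
                               spin_up_time: int = 0) -> tuple:
    """Toy sketch: discard the spin-up phase, then report whether the
    remaining signal still varies at all, plus its [min, max]."""
    tail = series[spin_up_time:]
    varies = bool(tail.max() > tail.min())
    return varies, [float(tail.min()), float(tail.max())]
```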

utopya.model module

Provides the Model class to work interactively with Utopia models

class utopya.model.Model(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]

Bases: object

A class to work with Utopia models interactively.

It attaches to a certain model and makes it easy to load config files, create a Multiverse from them, run it, and work with it further…

Initialize the Model for the given model name

Parameters
  • name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.

  • info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name.

  • base_dir (str, optional) – For convenience, can specify this path which will be seen as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.

  • sim_errors (str, optional) – Whether to raise errors from Multiverse

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here, but the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.

Raises

ValueError – Upon bad base_dir

__init__(*, name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, base_dir: Optional[str] = None, sim_errors: Optional[str] = None, use_tmpdir: bool = False)[source]

Initialize the Model instance for the given model name

Parameters
  • name (str, optional) – Name of the model to attach to. If not given, need to pass info_bundle.

  • info_bundle (ModelInfoBundle, optional) – The required information to work with this model. If not given, will attempt to find the model in the model registry via name.

  • base_dir (str, optional) – For convenience, can specify this path which will be seen as the base path for config files; if set, arguments that allow specifying configuration files can specify them relative to this directory.

  • sim_errors (str, optional) – Whether to raise errors from Multiverse

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; the flag can be overridden in the create_mv and create_run_load methods. If False, the regular model output directory is used.

Raises

ValueError – Upon bad base_dir

__str__() str[source]

Returns an informative string for this Model instance

property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle

The model info bundle

property name: str

The name of this Model object, which is at the same time the name of the attached model.

property base_dir: str

Returns the path to the base directory, if set during init.

This is the path to a directory from which config files can be loaded using relative paths.

property default_model_cfg: dict

Returns the default model configuration by loading it from the file specified in the info bundle.

property default_config_set_search_dirs: List[str]

Returns the default config set search directories for this model in the order of precedence.

Note

These may be relative paths.

property default_config_sets: Dict[str, dict]

Config sets at the default search locations.

To retrieve an individual config set, consider using get_config_set() instead of this property.

For more information, see Configuration sets.

create_mv(*, from_cfg: Optional[str] = None, run_cfg_path: Optional[str] = None, use_tmpdir: Optional[bool] = None, **update_meta_cfg) utopya.multiverse.Multiverse[source]

Creates a utopya.multiverse.Multiverse for this model, optionally loading a configuration from a file and updating it with further keys.

Parameters
  • from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.

  • run_cfg_path (str, optional) – The path of the run_cfg to use. Cannot be passed if the from_cfg argument evaluates to True.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses default value set at initialization.

  • **update_meta_cfg – Can be used to update the meta configuration

Returns

The created Multiverse object

Return type

utopya.multiverse.Multiverse

Raises

ValueError – If both from_cfg and run_cfg_path were given

create_frozen_mv(**fmv_kwargs) utopya.multiverse.FrozenMultiverse[source]

Create a utopya.multiverse.FrozenMultiverse, coupling it to a run directory.

Use this method if you want to load an existing simulation run.

Parameters

**fmv_kwargs – Passed on to FrozenMultiverse.__init__

create_run_load(*, from_cfg: Optional[str] = None, run_cfg_path: Optional[str] = None, use_tmpdir: Optional[bool] = None, print_tree: bool = True, **update_meta_cfg) Tuple[utopya.multiverse.Multiverse, utopya.datamanager.DataManager][source]

Chains the create_mv, mv.run, and mv.dm.load_from_cfg methods together and returns a (Multiverse, DataManager) tuple.

Parameters
  • from_cfg (str, optional) – The name of the config file (relative to the base directory) to be used.

  • run_cfg_path (str, optional) – The path of the run_cfg to use. Cannot be passed if the from_cfg argument evaluates to True.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write the data to. If not given, uses default value set at initialization.

  • print_tree (bool, optional) – Whether to print the loaded data tree

  • **update_meta_cfg – Arguments passed to the create_mv function

Returns

The created Multiverse and its DataManager, with data loaded from the simulation run

Return type

Tuple[Multiverse, DataManager]
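Taken together, a typical interactive session might look like the following sketch (the model name and config file are hypothetical, and a working Utopia installation with a registered model is assumed):

```python
>>> from utopya.model import Model
>>> model = Model(name="SomeModel")      # hypothetical model name
>>> mv, dm = model.create_run_load(
...     from_cfg="run.yml",              # resolved relative to base_dir, if set
...     use_tmpdir=True,                 # write output to a temporary directory
... )
>>> dm                                   # the DataManager holding the loaded data
```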

get_config_set(name: Optional[str] = None) Dict[str, str][source]

Returns a configuration set: a dict containing paths to run and/or eval configuration files. These are accessible via the keys run and eval.

Config sets are retrieved from multiple locations:

  • The cfgs directory in the model’s source directory

  • The user-specified lookup directories, specified in the utopya configuration as config_set_search_dirs

  • If name is an absolute or relative path, and a directory exists at the specified location, the parent directory is interpreted as a search path.

This uses get_config_sets() to retrieve all available configuration sets from the above paths and then selects the one with the given name. Config sets that are found later overwrite those with the same name found in previous searches and log a warning message (which can be controlled with the warn argument); sets are not merged.

For more information, see Configuration sets.

Parameters

name (str, optional) – The name of the config set to retrieve. This may also be a local path, which is looked up prior to the default search directories.

get_config_sets(*, search_dirs: Optional[List[str]] = None, warn: bool = True, cfg_sets: Optional[dict] = None) Dict[str, dict][source]

Searches for all available configuration sets in the given search directories, aggregating them into one dict.

The search is done in reverse order of the paths given in search_dirs, i.e. starting from the directories with the lowest precedence. If configuration sets with the same name are encountered, warnings are emitted, and the set from the directory appearing earlier in search_dirs (i.e. the later-searched one) takes precedence.

Note

This will not merge configuration sets from different search directories, e.g. if one contained only an eval configuration and the other contained only a run configuration, a warning will be emitted but the entry from the later-searched directory will be used.

Parameters
  • search_dirs (List[str], optional) – The directories to search sequentially for config sets. If not given, will use the default config set search directories, see default_config_set_search_dirs.

  • warn (bool, optional) – Whether to warn (via log message), if the search yields a config set with a name that already existed.

  • cfg_sets (dict, optional) – If given, aggregate newly found config sets into this dict. Otherwise, start with an empty one.
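The precedence rule can be sketched as a reverse-order dict aggregation. The helper below is hypothetical (the real method also scans the directories for config files); it only illustrates how later-searched, higher-precedence directories overwrite same-named sets without merging:

```python
import warnings

def aggregate_config_sets_sketch(found_by_dir, search_dirs, warn=True):
    """Hypothetical sketch of the get_config_sets precedence rule.

    found_by_dir maps each search directory to the config sets found
    there, e.g. {"some_dir": {"set_name": {...}}}. Directories are
    processed in reverse order, so sets from directories earlier in
    search_dirs overwrite (not merge with) same-named sets.
    """
    cfg_sets = {}
    for search_dir in reversed(search_dirs):
        for name, cfg_set in found_by_dir.get(search_dir, {}).items():
            if warn and name in cfg_sets:
                warnings.warn(f"Config set '{name}' already exists; "
                              "overwriting it (sets are not merged).")
            cfg_sets[name] = cfg_set  # higher precedence replaces
    return cfg_sets
```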

__dict__ = mappingproxy({'__module__': 'utopya.model', '__doc__': 'A class to work with Utopia models interactively.\n\n    It attaches to a certain model and makes it easy to load config files,\n    create a Multiverse from them, run it, and work with it further...\n    ', '__init__': <function Model.__init__>, '__str__': <function Model.__str__>, 'info_bundle': <property object>, 'name': <property object>, 'base_dir': <property object>, 'default_model_cfg': <property object>, 'default_config_set_search_dirs': <property object>, 'default_config_sets': <property object>, 'create_mv': <function Model.create_mv>, 'create_frozen_mv': <function Model.create_frozen_mv>, 'create_run_load': <function Model.create_run_load>, 'get_config_set': <function Model.get_config_set>, 'get_config_sets': <function Model.get_config_sets>, '_store_mv': <function Model._store_mv>, '_create_tmpdir': <function Model._create_tmpdir>, '_find_config_sets': <function Model._find_config_sets>, '__dict__': <attribute '__dict__' of 'Model' objects>, '__weakref__': <attribute '__weakref__' of 'Model' objects>, '__annotations__': {}})
__module__ = 'utopya.model'
__weakref__

list of weak references to the object (if defined)

utopya.multiverse module

Implementation of the Multiverse class.

The Multiverse supplies the main user interface of the frontend.

class utopya.multiverse.Multiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]

Bases: object

The Multiverse is where a single simulation run is orchestrated from.

It spawns multiple universes, each of which represents a single simulation of the selected model with the parameters specified by the meta configuration.

The WorkerManager takes care to perform these simulations in parallel, the DataManager allows loading the created data, and the PlotManager handles plotting of that data.

Initialize the Multiverse.

Parameters
  • model_name (str, optional) – The name of the model to run

  • info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.

  • run_cfg_path (str, optional) – The path to the run configuration.

  • user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.

  • _shared_worker_manager (WorkerManager, optional) –

    If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.

    Warning

    This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.

  • **update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels

BASE_META_CFG_PATH = '/builds/utopia-project/docs/utopia/python/utopya/utopya/cfg/base_cfg.yml'
USER_CFG_SEARCH_PATH = '/root/.config/utopia/user_cfg.yml'
RUN_DIR_TIME_FSTR = '%y%m%d-%H%M%S'
UTOPYA_BASE_PLOTS_PATH = '/builds/utopia-project/docs/utopia/python/utopya/utopya/plot_funcs/base_plots.yml'
__init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, _shared_worker_manager: Optional[utopya.workermanager.WorkerManager] = None, **update_meta_cfg)[source]

Initialize the Multiverse.

Parameters
  • model_name (str, optional) – The name of the model to run

  • info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.

  • run_cfg_path (str, optional) – The path to the run configuration.

  • user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.

  • _shared_worker_manager (WorkerManager, optional) –

    If given, this already existing WorkerManager instance (and its reporter) will be used instead of initializing new instances.

    Warning

    This argument is only exposed for internal purposes. It should not be used for production code and behavior of this argument may change at any time.

  • **update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels

property info_bundle: utopya.model_registry.info_bundle.ModelInfoBundle

The model info bundle for this Multiverse

property model_name: str

The model name associated with this Multiverse

property model_binpath: str

The path to this model’s binary

property meta_cfg: dict

The meta configuration.

property dirs: dict

Information on managed directories.

property cluster_mode: bool

Whether the Multiverse should run in cluster mode

property cluster_params: dict

Returns a copy of the cluster mode configuration parameters

property resolved_cluster_params: dict

Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.

property dm: utopya.datamanager.DataManager

The Multiverse’s DataManager.

property wm: utopya.workermanager.WorkerManager

The Multiverse’s WorkerManager.

property pm: utopya.plotting.PlotManager

The Multiverse’s PlotManager.

run(*, sweep: Optional[bool] = None)[source]

Starts a Utopia simulation run.

Specifically, this method adds simulation tasks to the associated WorkerManager, locks its task list, and then invokes the start_working() method which performs all the simulation tasks.

If cluster mode is enabled, this will split up the parameter space into (ideally) equally sized parts and only run one of these parts, depending on the cluster node this Multiverse is being invoked on.

Note

As this method locks the task list of the WorkerManager, no further tasks can be added afterwards. This means that each Multiverse instance can only perform a single simulation run.

Parameters

sweep (bool, optional) – Whether to perform a sweep or not. If None, the value will be read from the perform_sweep key of the meta-configuration.
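Schematically, the cluster-mode split can be pictured as follows. This is a hypothetical helper, not utopya's actual partitioning code (which may chunk the parameter space differently); it only illustrates dividing the universes into (ideally) equally sized, node-dependent parts:

```python
def select_node_tasks_sketch(universe_ids, *, node_index, num_nodes):
    """Hypothetical sketch of the cluster-mode split: assign universes
    round-robin to nodes, so part sizes differ by at most one, and
    return only the part this node is responsible for."""
    return [uid for i, uid in enumerate(universe_ids)
            if i % num_nodes == node_index]
```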

run_single()[source]

Runs a single simulation using the parameter space’s default value.

See run() for more information.

run_sweep()[source]

Runs a parameter sweep.

See run() for more information.

renew_plot_manager(**update_kwargs)[source]

Tries to set up a new PlotManager. If this succeeds, the old one is discarded and the new one is associated with this Multiverse.

Parameters

**update_kwargs – Passed on to PlotManager.__init__

__dict__ = mappingproxy({'__module__': 'utopya.multiverse', '__doc__': 'The Multiverse is where a single simulation run is orchestrated from.\n\n    It spawns multiple universes, each of which represents a single simulation\n    of the selected model with the parameters specified by the meta\n    configuration.\n\n    The WorkerManager takes care to perform these simulations in parallel, the\n    DataManager allows loading the created data, and the PlotManager handles\n    plotting of that data.\n    ', 'BASE_META_CFG_PATH': '/builds/utopia-project/docs/utopia/python/utopya/utopya/cfg/base_cfg.yml', 'USER_CFG_SEARCH_PATH': '/root/.config/utopia/user_cfg.yml', 'RUN_DIR_TIME_FSTR': '%y%m%d-%H%M%S', 'UTOPYA_BASE_PLOTS_PATH': '/builds/utopia-project/docs/utopia/python/utopya/utopya/plot_funcs/base_plots.yml', '__init__': <function Multiverse.__init__>, 'info_bundle': <property object>, 'model_name': <property object>, 'model_binpath': <property object>, 'meta_cfg': <property object>, 'dirs': <property object>, 'cluster_mode': <property object>, 'cluster_params': <property object>, 'resolved_cluster_params': <property object>, 'dm': <property object>, 'wm': <property object>, 'pm': <property object>, 'run': <function Multiverse.run>, 'run_single': <function Multiverse.run_single>, 'run_sweep': <function Multiverse.run_sweep>, 'renew_plot_manager': <function Multiverse.renew_plot_manager>, '_create_meta_cfg': <function Multiverse._create_meta_cfg>, '_create_run_dir': <function Multiverse._create_run_dir>, '_setup_pm': <function Multiverse._setup_pm>, '_perform_backup': <function Multiverse._perform_backup>, '_prepare_executable': <function Multiverse._prepare_executable>, '_resolve_cluster_params': <function Multiverse._resolve_cluster_params>, '_add_sim_task': <function Multiverse._add_sim_task>, '_add_sim_tasks': <function Multiverse._add_sim_tasks>, '_validate_meta_cfg': <function Multiverse._validate_meta_cfg>, '__dict__': <attribute '__dict__' of 'Multiverse' objects>, '__weakref__': <attribute '__weakref__' of 'Multiverse' objects>, '__annotations__': {}})
__module__ = 'utopya.multiverse'
__weakref__

list of weak references to the object (if defined)

class utopya.multiverse.FrozenMultiverse(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]

Bases: utopya.multiverse.Multiverse

A frozen Multiverse is like a Multiverse, but frozen.

It is initialized from a finished Multiverse run and re-creates all the attributes from that data, e.g.: the meta configuration, a DataManager, and a PlotManager.

Note that it is no longer able to perform any simulations.

Initializes the FrozenMultiverse from a model name and the name of a run directory.

Note that this also takes arguments to specify the run configuration to use.

Parameters
  • model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.

  • info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.

  • run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.

  • run_cfg_path (str, optional) – The path to the run configuration.

  • user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.

  • use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.

  • **update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels

__init__(*, model_name: Optional[str] = None, info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, run_dir: Optional[str] = None, run_cfg_path: Optional[str] = None, user_cfg_path: Optional[str] = None, use_meta_cfg_from_run_dir: bool = False, **update_meta_cfg)[source]

Initializes the FrozenMultiverse from a model name and the name of a run directory.

Note that this also takes arguments to specify the run configuration to use.

Parameters
  • model_name (str) – The name of the model to load. From this, the model output directory is determined and the run_dir will be seen as relative to that directory.

  • info_bundle (ModelInfoBundle, optional) – The model information bundle that includes information about the binary path etc. If not given, will attempt to read it from the model registry.

  • run_dir (str, optional) – The run directory to load. Can be a path relative to the current working directory, an absolute path, or the timestamp of the run directory. If not given, will use the most recent timestamp.

  • run_cfg_path (str, optional) – The path to the run configuration.

  • user_cfg_path (str, optional) – If given, this is used to update the base configuration. If None, will look for it in the default path, see Multiverse.USER_CFG_SEARCH_PATH.

  • use_meta_cfg_from_run_dir (bool, optional) – If True, will load the meta configuration from the given run directory; only works for absolute run directories.

  • **update_meta_cfg – Can be used to update the meta configuration generated from the previous configuration levels

__module__ = 'utopya.multiverse'

utopya.parameter module

This module implements the Parameter class which is used when validating model and simulation parameters.

exception utopya.parameter.ValidationError[source]

Bases: ValueError

Raised upon failure to validate a parameter

__module__ = 'utopya.parameter'
__weakref__

list of weak references to the object (if defined)

class utopya.parameter.Parameter(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Union[None, float], Union[None, float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]

Bases: object

The parameter class is used when a model parameter needs to be validated before commencing the model run. It can hold information on the parameter itself as well as its valid range and type and other meta-data.

By default, the Parameter class should be assumed to handle scalar parameters like numerical values or strings. For validating sequence-like parameters, corresponding specializing classes are to be implemented.

Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.

Parameters
  • default (Any) – the default value of the parameter.

  • name (str, optional) – the name of the parameter.

  • description (str, optional) – a description of this parameter or its effects.

  • is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, limits cannot be used.

  • limits (Tuple[Union[None, float], Union[None, float]], optional) – the upper and lower bounds of the parameter (only applicable to scalar numerals). If None, the bound is assumed to be negative or positive infinity, respectively. Whether boundary values are included into the interval is controlled by the limits_mode argument. This argument is mutually exclusive with is_any_of!

  • limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: '[]' (closed, default), '()' (open), '[)', and '(]'.

  • dtype (Union[str, type], optional) – expected data type of this parameter. Accepts all strings that are accepted by numpy.dtype, e.g. int, float, uint16, string.

Raises
  • TypeError – On a limits argument that was not tuple-like or if a limits argument was given but the default was a

  • ValueError – if an invalid limits_mode is passed, if limits and is_any_of are both passed, or if the limits argument did not have length 2.

SHORTHAND_MODES = ('is-probability', 'is-positive', 'is-int', 'is-negative', 'is-positive-int', 'is-string', 'is-negative-int', 'is-bool', 'is-unsigned')
LIMIT_COMPS = {'(': <built-in function gt>, ')': <built-in function lt>, '[': <built-in function ge>, ']': <built-in function le>}
LIMIT_MODES = ('[]', '()', '[)', '(]')
yaml_tag = '!param'
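Given the registered yaml_tag, parameters can be declared directly in YAML configuration files. A hypothetical fragment follows (the key name and values are invented for illustration, and the mapping keys are assumed to correspond to the __init__ arguments):

```yaml
# Hypothetical model configuration fragment using the !param tag
growth_rate: !param
  default: 0.5
  limits: [0.0, 1.0]
  limits_mode: "[]"    # closed interval (the default)
  dtype: float
  description: a probability-like growth rate
```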
__init__(*, default: Any, name: Optional[str] = None, description: Optional[str] = None, is_any_of: Optional[Sequence[Any]] = None, limits: Optional[Tuple[Union[None, float], Union[None, float]]] = None, limits_mode: str = '[]', dtype: Optional[Union[str, type]] = None)[source]

Creates a new Parameter object, which holds a default value as well as some constraints on the possible values this parameter can assume.

Parameters
  • default (Any) – the default value of the parameter.

  • name (str, optional) – the name of the parameter.

  • description (str, optional) – a description of this parameter or its effects.

  • is_any_of (Sequence[Any], optional) – a sequence of possible values this parameter can assume. If this parameter is given, limits cannot be used.

  • limits (Tuple[Union[None, float], Union[None, float]], optional) – the upper and lower bounds of the parameter (only applicable to scalar numerals). If None, the bound is assumed to be negative or positive infinity, respectively. Whether boundary values are included into the interval is controlled by the limits_mode argument. This argument is mutually exclusive with is_any_of!

  • limits_mode (str, optional) – whether to interpret the limits as an open, closed, or semi-closed interval. Possible values: '[]' (closed, default), '()' (open), '[)', and '(]'.

  • dtype (Union[str, type], optional) –

    expected data type of this parameter. Accepts all strings that are accepted by numpy.dtype, e.g. int, float, uint16, string.

Raises
  • TypeError – On a limits argument that was not tuple-like or if a limits argument was given but the default was a

  • ValueError – if an invalid limits_mode is passed, if limits and is_any_of are both passed, or if the limits argument did not have length 2.

__eq__(other) bool[source]

Return self==value.

__str__() str[source]

Return str(self).

property default

The default value for this parameter

property name

The name of this parameter

property description

The description of this parameter

property limits: Optional[tuple]

The limits of this parameter

property limits_mode: str

The mode used when evaluating the limits

property is_any_of: Tuple[Any]

Possible values this parameter may assume

property dtype: Optional[numpy.dtype]

The expected data type of this parameter

validate(value: Any, *, raise_exc: bool = True) bool[source]

Checks whether the given value would be a valid parameter.

The checks for the corresponding arguments are carried out in the following order:

  1. is_any_of

  2. dtype

  3. limits

The data type is checked according to the numpy type hierarchy (see the numpy documentation). To reduce strictness, the following additional compatibilities are taken into account:

  • for unsigned integer dtype, a signed integer-type value is compatible if value >= 0

  • for floating-point dtype, integer-type values are always considered compatible

  • for floating-point dtype, values of all floating-point types are considered compatible, even if they have a lower precision (note the coercion test below, though)

Additionally, it is checked whether value is representable as the target data type. This is done by coercing value to dtype and then checking for equality (using np.isclose).

Parameters
  • value (Any) – The value to test.

  • raise_exc (bool, optional) – Whether to raise an exception or not.

Returns

Whether or not the given value is a valid parameter.

Return type

bool

Raises

ValidationError – If validation failed or is impossible (for instance due to ambiguous validity parameters). This error message contains further information on why validation failed.
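The check order and the limits_mode semantics can be sketched using the documented LIMIT_COMPS mapping. This is a simplified, hypothetical stand-in: it only returns a bool, omitting the dtype and coercion checks as well as the exception handling:

```python
import operator

# Comparators as documented in Parameter.LIMIT_COMPS
LIMIT_COMPS = {"(": operator.gt, ")": operator.lt,
               "[": operator.ge, "]": operator.le}

def validate_sketch(value, *, is_any_of=None, limits=None, limits_mode="[]"):
    """Simplified stand-in for Parameter.validate: checks is_any_of
    first, then limits (dtype checks are omitted in this sketch)."""
    if is_any_of is not None:
        return value in is_any_of
    if limits is not None:
        lo, hi = limits
        lo_comp = LIMIT_COMPS[limits_mode[0]]  # '(' -> gt, '[' -> ge
        hi_comp = LIMIT_COMPS[limits_mode[1]]  # ')' -> lt, ']' -> le
        # A None bound means -inf or +inf, i.e. that side always passes
        if lo is not None and not lo_comp(value, lo):
            return False
        if hi is not None and not hi_comp(value, hi):
            return False
    return True
```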

classmethod from_shorthand(value, *, mode, **kwargs)[source]

Constructs a Parameter object from a given shorthand mode.

Parameters
  • value – A given value, typically the default argument.

  • mode – A valid shorthand mode, see SHORTHAND_MODES

  • **kwargs – any further arguments for Parameter initialization, see __init__().

Returns

a Parameter object
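Conceptually, each shorthand mode is a preset of __init__ keyword arguments, which explicitly passed kwargs can override. The presets below are invented for illustration (the actual presets are internal to utopya):

```python
# Hypothetical presets for a few of the documented shorthand modes;
# each maps a mode name to Parameter.__init__ keyword arguments.
SHORTHAND_PRESETS = {
    "is-probability": dict(limits=(0.0, 1.0), limits_mode="[]", dtype=float),
    "is-positive":    dict(limits=(0.0, None), dtype=float),
    "is-int":         dict(dtype=int),
    "is-bool":        dict(dtype=bool),
}

def from_shorthand_sketch(value, *, mode, **kwargs):
    """Sketch of Parameter.from_shorthand: merge the preset kwargs for
    the given mode with any explicitly passed kwargs."""
    preset = dict(SHORTHAND_PRESETS[mode])
    preset.update(kwargs)
    return dict(default=value, **preset)
```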

classmethod to_yaml(representer, node)[source]

Represent this Parameter object as a YAML mapping.

Parameters
  • representer (ruamel.yaml.representer) – The representer module

  • node (type(self)) – The node, i.e. an instance of this class

Returns

a yaml mapping that is able to recreate this object

classmethod from_yaml(constructor, node)[source]

The default constructor for Parameter objects, expecting a YAML node that is mapping-like.

__dict__ = mappingproxy({'__module__': 'utopya.parameter', '__doc__': 'The parameter class is used when a model parameter needs to be validated\n    before commencing the model run. It can hold information on the parameter\n    itself as well as its valid range and type and other meta-data.\n\n    Per default, the ``Parameter`` class should be assumed to handle *scalar*\n    parameters like numerical values or strings. For validating sequence-like\n    parameters, corresponding specializing classes are to be implemeted.\n    ', 'SHORTHAND_MODES': ('is-probability', 'is-positive', 'is-int', 'is-negative', 'is-positive-int', 'is-string', 'is-negative-int', 'is-bool', 'is-unsigned'), 'LIMIT_COMPS': {'[': <built-in function ge>, '(': <built-in function gt>, ']': <built-in function le>, ')': <built-in function lt>}, 'LIMIT_MODES': ('[]', '()', '[)', '(]'), 'yaml_tag': '!param', '__init__': <function Parameter.__init__>, '__eq__': <function Parameter.__eq__>, '__str__': <function Parameter.__str__>, 'default': <property object>, 'name': <property object>, 'description': <property object>, 'limits': <property object>, 'limits_mode': <property object>, 'is_any_of': <property object>, 'dtype': <property object>, 'validate': <function Parameter.validate>, 'from_shorthand': <classmethod object>, 'to_yaml': <classmethod object>, 'from_yaml': <classmethod object>, '__dict__': <attribute '__dict__' of 'Parameter' objects>, '__weakref__': <attribute '__weakref__' of 'Parameter' objects>, '__hash__': None, '__annotations__': {}})
__hash__ = None
__module__ = 'utopya.parameter'
__weakref__

list of weak references to the object (if defined)

utopya.parameter.extract_validation_objects(model_cfg: dict, *, model_name: str) Tuple[dict, dict][source]

Extracts all Parameter objects from a model configuration (a nested dict), replacing them with their default values. Returns both the modified model configuration as well as the Parameter objects (keyed by the key sequence necessary to reach them within the model configuration).

Parameters
  • model_cfg (dict) – the model configuration to inspect

  • model_name (str) – the name of the model

Returns

a tuple of (model config, parameters to validate).

The model config contains the passed config dict in which all Parameter class elements have been replaced by their default entries. The second entry is a dictionary consisting of the Parameter class objects (requiring validation) with keys being key sequences to those Parameter objects. Note that the key sequence is relative to the level above the model configuration, with model_name as a common entry for all returned values.

Return type

Tuple[dict, dict]
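The extraction can be sketched as a recursive walk over the nested dict. This is a hypothetical minimal version; any object with a default attribute stands in for a Parameter here:

```python
def extract_sketch(cfg, *, model_name):
    """Hypothetical sketch of extract_validation_objects: walk the
    nested config, replace Parameter-like objects (anything with a
    'default' attribute here) by their default value, and collect them
    keyed by their key sequence, prefixed with model_name."""
    to_validate = {}

    def _walk(d, key_seq):
        out = {}
        for key, val in d.items():
            if isinstance(val, dict):
                out[key] = _walk(val, key_seq + (key,))
            elif hasattr(val, "default"):
                to_validate[key_seq + (key,)] = val
                out[key] = val.default
            else:
                out[key] = val
        return out

    return _walk(cfg, (model_name,)), to_validate
```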

utopya.plotting module

Implements a plotting framework based on dantro

In order to make the plotting framework specific to Utopia, this module derives both from the dantro PlotManager and some PlotCreator classes.

utopya.plotting.register_operation(*, skip_existing=True, **kws) None[source]

Register an operation with the dantro data operations database.

This invokes register_operation(), but with skip_existing == True as the default in order to reduce the number of arguments that need to be specified in Utopia model plots.

Parameters
  • skip_existing (bool, optional) – Whether to skip (without an error) if an operation with the same name already exists

  • **kws – Passed to register_operation()

class utopya.plotting.PlotHelper(*, out_path: str, helper_defaults: Optional[dict] = None, update_helper_cfg: Optional[dict] = None, raise_on_error: bool = True, animation_enabled: bool = False)[source]

Bases: dantro.plot_creators._plot_helper.PlotHelper

A specialization of the dantro PlotHelper used in plot creators that are derived from ExternalPlotCreator.

This can be used to add additional helpers for use in Utopia without requiring changes on dantro-side.

Note

The helpers implemented here should try to adhere to the interface exemplified by the dantro PlotHelper class, with the aim that they can then be migrated into dantro in the long run.

Initialize a PlotHelper with a certain configuration.

This configuration is the so-called “base” configuration and is not axis-specific. There is the possibility to specify axis-specific configuration entries.

All entries in the helper configuration are deemed ‘enabled’ unless they explicitly specify enabled: false in their configuration.

Parameters
  • out_path (str) – path to store the created figure. This may be an absolute path or a relative path; the latter is regarded as relative to the current working directory. The home directory indicator ~ is expanded.

  • helper_defaults (dict, optional) – The basic configuration of the helpers.

  • update_helper_cfg (dict, optional) – A configuration used to update the existing helper defaults

  • raise_on_error (bool, optional) – Whether to raise on an exception created on helper invocation or just log the error

  • animation_enabled (bool, optional) – Whether animation mode is enabled.

__module__ = 'utopya.plotting'
class utopya.plotting.ExternalPlotCreator(name: str, *, base_module_file_dir: Optional[str] = None, style: Optional[dict] = None, **parent_kwargs)[source]

Bases: dantro.plot_creators.pcr_ext.ExternalPlotCreator

This is the Utopia-specific version of dantro’s ExternalPlotCreator.

Its main purpose is to define common settings for plotting. By adding this extra layer, it allows for future extensibility as well.

One of the common settings is that it sets as BASE_PKG the utopya utopya.plot_funcs, which is an extension of those functions supplied by dantro.

Initialize an ExternalPlotCreator.

Parameters
  • name (str) – The name of this plot

  • base_module_file_dir (str, optional) – If given, module_file arguments to the _plot method that are relative paths will be seen relative to this directory

  • style (dict, optional) – The default style context definition to enter before calling the plot function. This can be used to specify the aesthetics of a plot. It is evaluated here once, stored as attribute, and can be updated when the plot method is called.

  • **parent_kwargs – Passed to the parent __init__

Raises

ValueError – On invalid base_module_file_dir argument

EXTENSIONS = 'all'
DEFAULT_EXT = 'pdf'
BASE_PKG = 'utopya.plot_funcs'
PLOT_HELPER_CLS

alias of utopya.plotting.PlotHelper

CUSTOM_PLOT_MODULE_NAME = 'model_plots'
CUSTOM_PLOT_MODULE_PATHS = {'Utopia': '/builds/utopia-project/docs/utopia/python/model_plots'}
__abstractmethods__ = frozenset({})
__module__ = 'utopya.plotting'
class utopya.plotting.UniversePlotCreator(*args, psgrp_path: Optional[str] = None, **kwargs)[source]

Bases: dantro.plot_creators.pcr_psp.UniversePlotCreator, utopya.plotting.ExternalPlotCreator

Makes plotting with data from a single universe more convenient

Initialize a UniversePlotCreator

PSGRP_PATH = 'multiverse'
__abstractmethods__ = frozenset({})
__module__ = 'utopya.plotting'
class utopya.plotting.MultiversePlotCreator(*args, psgrp_path: Optional[str] = None, **kwargs)[source]

Bases: dantro.plot_creators.pcr_psp.MultiversePlotCreator, utopya.plotting.ExternalPlotCreator

Makes plotting with data from all universes more convenient

Initialize a MultiversePlotCreator

Parameters
  • *args – Passed on to parent

  • psgrp_path (str, optional) – The path to the associated ParamSpaceGroup that is to be used for these multiverse plots.

  • **kwargs – Passed on to parent

PSGRP_PATH = 'multiverse'
__abstractmethods__ = frozenset({})
__module__ = 'utopya.plotting'
class utopya.plotting.PlotManager(*args, _model_info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, **kwargs)[source]

Bases: dantro.plot_mngr.PlotManager

This is the Utopia-specific version of the dantro PlotManager class

It registers the Utopia-specific plot creators and allows for custom interface specifications.

Sets up a PlotManager.

This additionally stores some Utopia-specific metadata about the model this PlotManager is used with. That information is then used to load some additional model-specific information once a creator is invoked.

CREATORS = {'multiverse': <class 'utopya.plotting.MultiversePlotCreator'>, 'universe': <class 'utopya.plotting.UniversePlotCreator'>}
__init__(*args, _model_info_bundle: Optional[utopya.model_registry.info_bundle.ModelInfoBundle] = None, **kwargs)[source]

Sets up a PlotManager.

This additionally stores some Utopia-specific metadata about the model this PlotManager is used with. That information is then used to load some additional model-specific information once a creator is invoked.

property common_out_dir: str

The common output directory of all plots that were created with this plot manager instance. This uses the plot output paths stored in the plot information dict, specifically the target_dir entry.

If there was no plot information yet, the return value will be empty.

plot_from_cfg(*args, plots_cfg: Optional[Union[dict, str]] = None, **kwargs)[source]

Thin wrapper around parent method that shows which plot configuration file will be used.

__module__ = 'utopya.plotting'

utopya.reporter module

Implementation of the Reporter class.

class utopya.reporter.ReportFormat(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]

Bases: object

Initialises a ReportFormat object, which gathers callables needed to create a report in a certain format.

Parameters
  • parser (Callable) – The parser method to use

  • writers (List[Callable]) – The writer method(s) to use

  • min_report_intv (float, optional) – The minimum report interval of reports in this format.

__init__(*, parser: Callable, writers: List[Callable], min_report_intv: Optional[float] = None)[source]

Initialises a ReportFormat object, which gathers callables needed to create a report in a certain format.

Parameters
  • parser (Callable) – The parser method to use

  • writers (List[Callable]) – The writer method(s) to use

  • min_report_intv (float, optional) – The minimum report interval of reports in this format.

property min_report_intv: Optional[datetime.timedelta]

Returns the minimum report interval

property reporting_blocked: bool

Determines whether this ReportFormat is currently blocked from reporting, e.g. because the last report lies within the minimum report interval

report(*, force: bool = False, parser_kwargs: Optional[dict] = None) bool[source]

Parses and writes a report corresponding to this object’s format.

If within the minimum report interval, will return False.

Parameters
  • force (bool, optional) – If True, will ignore the min_report_intv

  • parser_kwargs (dict, optional) – Keyword arguments passed on to the parser

Returns

Whether a report was generated or not

Return type

bool
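
The interval-based throttling can be illustrated with a small stand-in (not utopya's implementation; this class only mimics the min_report_intv check described above):

```python
from datetime import datetime, timedelta

class ThrottledReport:
    """Mimics ReportFormat's minimum-report-interval check."""

    def __init__(self, min_report_intv: float):
        self.min_intv = timedelta(seconds=min_report_intv)
        self.last_report = None

    def report(self, *, force: bool = False) -> bool:
        now = datetime.now()
        if (not force and self.last_report is not None
                and now - self.last_report < self.min_intv):
            return False  # within the minimum report interval: skip
        self.last_report = now
        # ... a real ReportFormat would parse and write the report here ...
        return True
```

Calling report() twice in quick succession returns True and then False; force=True bypasses the interval check.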

__dict__ = mappingproxy({'__module__': 'utopya.reporter', '__init__': <function ReportFormat.__init__>, 'min_report_intv': <property object>, 'reporting_blocked': <property object>, 'report': <function ReportFormat.report>, '__dict__': <attribute '__dict__' of 'ReportFormat' objects>, '__weakref__': <attribute '__weakref__' of 'ReportFormat' objects>, '__doc__': None, '__annotations__': {}})
__module__ = 'utopya.reporter'
__weakref__

list of weak references to the object (if defined)

class utopya.reporter.Reporter(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]

Bases: object

The Reporter class holds general reporting capabilities.

It needs to be subclassed in order to specialise its reporting functions.

Initialize the Reporter for the WorkerManager.

Parameters
  • report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.

  • default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.

  • report_dir (str, optional) – If reporting to a file, this is the base directory the report file is written to.

  • suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.

__init__(*, report_formats: Optional[Union[List[str], Dict[str, dict]]] = None, default_format: Optional[str] = None, report_dir: Optional[str] = None, suppress_cr: bool = False)[source]

Initialize the Reporter for the WorkerManager.

Parameters
  • report_formats (Union[List[str], Dict[str, dict]], optional) – The report formats to use with this reporter. If given as list of strings, the strings are the names of the report formats as well as those of the parsers; all other parameters are the defaults. If given as dict of dicts, the keys are the names of the formats and the inner dicts are the parameters to create report formats from.

  • default_format (str, optional) – The name of the default report format; if None is given, the .report method requires the name of a report format.

  • report_dir (str, optional) – If reporting to a file, this is the base directory the report file is written to.

  • suppress_cr (bool, optional) – Whether to suppress carriage return characters in writers. This option is useful when the reporter is not the only class that writes to a stream.
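A hypothetical report_formats argument in the dict-of-dicts form described above might look like this (the format, parser, and writer names are assumptions for illustration):

```python
# Keys are the names of the report formats; the inner dicts hold the
# parameters from which the corresponding ReportFormat objects are created.
report_formats = {
    "progress": {
        "min_report_intv": 0.5,  # report at most every 0.5 seconds
        "write_to": "stdout",
    },
    "report_file": {
        "parser": "report",      # parser name differing from the format name
        "write_to": {"file": {"path": "_report.txt"}},
    },
}
```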

property report_formats: dict

Returns the dict of ReportFormat objects.

property default_format: Union[None, utopya.reporter.ReportFormat]

Returns the default report format or None, if not set.

property suppress_cr: bool

Whether to suppress a carriage return. Objects using the reporter can set this property to communicate that they will be putting content into the stdout stream as well. The writers can check this property and adjust their behaviour accordingly.

add_report_format(name: str, *, parser: Optional[str] = None, write_to: Union[str, Dict[str, dict]] = 'stdout', min_report_intv: Optional[float] = None, rf_kwargs: Optional[dict] = None, **parser_kwargs)[source]

Add a report format to this reporter.

Parameters
  • name (str) – The name of this format

  • parser (str, optional) – The name of the parser; if not given, the name of the report format is assumed

  • write_to (Union[str, Dict[str, dict]], optional) – The name of the writer. If this is a dict of dict, the keys will be interpreted as the names of the writers and the nested dict as the **kwargs to the writer function.

  • min_report_intv (float, optional) – The minimum report interval (in seconds) for this report format

  • rf_kwargs (dict, optional) – Further kwargs to ReportFormat.__init__

  • **parser_kwargs – The kwargs to the parser function

Raises

ValueError – A report format with this name already exists

report(report_format: Optional[str] = None, **kwargs) bool[source]

Create a report with the given format; if none is given, the default format is used.

Parameters
  • report_format (str, optional) – The report format to use

  • **kwargs – Passed on to the ReportFormat.report() call

Returns

Whether there was a report

Return type

bool

Raises

ValueError – If no default format was set and no report format name was given

parse_and_write(*, parser: Union[str, Callable], write_to: Union[str, Callable], **parser_kwargs)[source]

This function allows to select a parser and writer explicitly.

Parameters
  • parser (Union[str, Callable]) – The parser method to use.

  • write_to (Union[str, Callable]) – The write method to use. Can also be a sequence of names and/or callables or a Dict. For allowed specification formats, see the ._resolve_writers method.

  • **parser_kwargs – Passed to the parser, if given

__dict__ = mappingproxy({'__module__': 'utopya.reporter', '__doc__': 'The Reporter class holds general reporting capabilities.\n\n    It needs to be subclassed in order to specialise its reporting functions.\n    ', '__init__': <function Reporter.__init__>, 'report_formats': <property object>, 'default_format': <property object>, 'suppress_cr': <property object>, 'add_report_format': <function Reporter.add_report_format>, 'report': <function Reporter.report>, 'parse_and_write': <function Reporter.parse_and_write>, '_resolve_parser': <function Reporter._resolve_parser>, '_resolve_writers': <function Reporter._resolve_writers>, '_write_to_stdout': <function Reporter._write_to_stdout>, '_write_to_stdout_noreturn': <function Reporter._write_to_stdout_noreturn>, '_write_to_log': <function Reporter._write_to_log>, '_write_to_file': <function Reporter._write_to_file>, '__dict__': <attribute '__dict__' of 'Reporter' objects>, '__weakref__': <attribute '__weakref__' of 'Reporter' objects>, '__annotations__': {}})
__module__ = 'utopya.reporter'
__weakref__

list of weak references to the object (if defined)

class utopya.reporter.WorkerManagerReporter(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]

Bases: utopya.reporter.Reporter

This class reports on the state of the WorkerManager.

Initialize the Reporter for the WorkerManager.

Parameters
  • wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance

  • mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.

  • **reporter_kwargs – Passed on to parent method

TTY_MARGIN = 4
PROGRESS_BAR_SYMBOLS = {'active': '░', 'active_progress': '▒', 'finished': '▓', 'space': ' '}
__init__(wm: utopya.workermanager.WorkerManager, *, mv: utopya.multiverse.Multiverse = None, **reporter_kwargs)[source]

Initialize the Reporter for the WorkerManager.

Parameters
  • wm (utopya.workermanager.WorkerManager) – The associated WorkerManager instance

  • mv (utopya.multiverse.Multiverse, optional) – The Multiverse this reporter is used in. If this is provided, it can be used in report parsers, e.g. to provide additional information on simulations.

  • **reporter_kwargs – Passed on to parent method

property wm

Returns the associated WorkerManager.

property task_counters: collections.OrderedDict

Returns a dict of task counters:

  • total: total number of registered WorkerManager tasks

  • active: number of currently active tasks

  • finished: number of finished tasks, including tasks that were stopped via a stop condition

  • stopped: number of tasks for which stop conditions were fulfilled, see Stop Conditions

property wm_progress: float

The WorkerManager progress, between 0 and 1.

property wm_active_tasks_progress: float

The active tasks’ progress

If there are no active tasks in the worker manager, returns 0

property wm_elapsed: Optional[datetime.timedelta]

Seconds elapsed since start of working or None if not yet started

property wm_times: dict

Return the characteristic WorkerManager times. Calls get_progress_info() without any additional arguments.

register_task(task: utopya.task.WorkerTask)[source]

Given the task object, extracts and stores some information.

The information currently extracted is the run time and the exit code.

This can be used as a callback function from a WorkerTask object.

Parameters

task (utopya.task.WorkerTask) – The WorkerTask to extract information from.

calc_runtime_statistics(min_num: int = 10) collections.OrderedDict[source]

Calculates the current runtime statistics.

Returns

name of the calculated statistic and its value, i.e.

the runtime in seconds

Return type

OrderedDict

get_progress_info(**eta_options) Dict[str, float][source]

Compiles a dict containing progress information for the current work session.

Parameters

**eta_options – Passed on to method calculating est_left, _compute_est_left().

Returns

Progress information. Guaranteed to contain the

keys start, now, elapsed, est_left, est_end, and end.

Return type

Dict[str, float]

__module__ = 'utopya.reporter'

utopya.stopcond module

This module implements the StopCondition class, which is used by the WorkerManager to stop a worker process in certain situations.

class utopya.stopcond.StopCondition(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]

Bases: object

A StopCondition object holds information on the conditions in which a worker process should be stopped.

It is formulated in a general way, applying to all Workers. The attributes of this class store the information required to deduce whether the condition is fulfilled or not.

Create a new stop condition

Parameters
  • to_check (List[dict], optional) – A list of dicts that hold the functions to call and the arguments to call them with. The only requirement for each dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the utopya.stopcond_funcs module.

  • name (str, optional) – The name of this stop condition

  • description (str, optional) – A short description of this stop condition

  • enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.

  • func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one that is checked. If this argument is a string, it is also resolved from the utopya stopcond_funcs module.

  • **func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function

__init__(*, to_check: Optional[List[dict]] = None, name: Optional[str] = None, description: Optional[str] = None, enabled: bool = True, func: Optional[Union[Callable, str]] = None, **func_kwargs)[source]

Create a new stop condition

Parameters
  • to_check (List[dict], optional) – A list of dicts that hold the functions to call and the arguments to call them with. The only requirement for each dict is that the func key is available. All other keys are unpacked and passed as kwargs to the given function. The func key can be either a callable or a string corresponding to a name in the utopya.stopcond_funcs module.

  • name (str, optional) – The name of this stop condition

  • description (str, optional) – A short description of this stop condition

  • enabled (bool, optional) – Whether this stop condition should be checked; if False, it will be created but will always be unfulfilled when checked.

  • func (Union[Callable, str], optional) – (For the short syntax only!) If no to_check argument is given, a function can be given here that will be the only one that is checked. If this argument is a string, it is also resolved from the utopya stopcond_funcs module.

  • **func_kwargs – (For the short syntax) The kwargs that are passed to the single stop condition function
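Using the !stop-condition yaml tag and the short syntax, stop conditions can be specified directly in a run configuration. A hypothetical example (keys taken from the signature above; the functions are those documented in utopya.stopcond_funcs, while the concrete values, the monitor entry name, and the operator spelling are illustrative assumptions):

```yaml
- !stop-condition
  name: wall timeout
  description: stops a worker after one hour of wall time
  func: timeout_wall
  seconds: 3600
- !stop-condition
  name: density converged
  enabled: false           # created, but always unfulfilled when checked
  func: check_monitor_entry
  entry_name: density
  operator: ge
  value: 0.95
```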

property fulfilled_for: Set[utopya.task.Task]

The set of tasks this stop condition was fulfilled for

__str__() str[source]

A string representation for this StopCondition, including the name and, if given, the description.

fulfilled(task: utopya.task.Task) bool[source]

Checks if the stop condition is fulfilled for the given worker, using the information from the dict.

All given stop condition functions are evaluated; if all of them return True, this method will also return True.

Furthermore, if the stop condition is fulfilled, it is added to the task’s set of fulfilled stop conditions.

Parameters

task (utopya.task.Task) – Task object that is to be checked

Returns

If all stop condition functions returned true for the given

worker and its current information

Return type

bool

yaml_tag = '!stop-condition'
classmethod to_yaml(representer, node)[source]

Creates a yaml representation of the StopCondition object by storing the initialization kwargs as a yaml mapping.

Parameters
  • representer (ruamel.yaml.representer) – The representer module

  • node (type(self)) – The node, i.e. an instance of this class

Returns

a yaml mapping that is able to recreate this object

classmethod from_yaml(constructor, node)[source]

Creates a StopCondition object by unpacking the given mapping such that all stored arguments are available to __init__.

__dict__ = mappingproxy({'__module__': 'utopya.stopcond', '__doc__': 'A StopCondition object holds information on the conditions in which a\n    worker process should be stopped.\n\n    It is formulated in a general way, applying to all Workers. The attributes\n    of this class store the information required to deduce whether the\n    condition if fulfilled or not.\n    ', '__init__': <function StopCondition.__init__>, 'fulfilled_for': <property object>, '_resolve_sc_funcs': <staticmethod object>, '__str__': <function StopCondition.__str__>, 'fulfilled': <function StopCondition.fulfilled>, 'yaml_tag': '!stop-condition', 'to_yaml': <classmethod object>, 'from_yaml': <classmethod object>, '__dict__': <attribute '__dict__' of 'StopCondition' objects>, '__weakref__': <attribute '__weakref__' of 'StopCondition' objects>, '__annotations__': {}})
__module__ = 'utopya.stopcond'
__weakref__

list of weak references to the object (if defined)

utopya.stopcond_funcs module

Here, functions that are used in the StopCondition class are defined.

These all get passed the worker task, information and additional kwargs.

Required signature: (task: WorkerTask, **kws) -> bool

utopya.stopcond_funcs.timeout_wall(task: utopya.task.WorkerTask, *, seconds: float) bool[source]

Checks the wall timeout of the given worker

Parameters
  • task (utopya.task.WorkerTask) – The WorkerTask object to check

  • seconds (float) – After how many seconds to trigger the wall timeout

Returns

Whether the timeout is fulfilled

Return type

bool

utopya.stopcond_funcs.check_monitor_entry(task: utopya.task.WorkerTask, *, entry_name: str, operator: str, value: float) bool[source]

Checks if a monitor entry compares in a certain way to a given value

Parameters
  • task (utopya.task.WorkerTask) – The WorkerTask object to check

  • entry_name (str) – The name of the monitor entry, leading to the value to the left-hand side of the operator

  • operator (str) – The binary operator to use

  • value (float) – The right-hand side value to compare to

Returns

Result of op(entry, value)

Return type

bool
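
The comparison performed by check_monitor_entry can be sketched using Python’s operator module (a stand-in; the set of accepted operator names in utopya may differ):

```python
import operator

# Maps operator names to binary comparison functions (names assumed here)
OPERATOR_MAP = {
    "lt": operator.lt, "le": operator.le, "eq": operator.eq,
    "ne": operator.ne, "ge": operator.ge, "gt": operator.gt,
}

def compare_entry(entry: float, op_name: str, value: float) -> bool:
    """Returns op(entry, value), with entry on the left-hand side."""
    return OPERATOR_MAP[op_name](entry, value)
```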

utopya.task module

The Task class supplies a container for all information needed for a task.

The WorkerTask and ProcessTask classes specialize on tasks for the WorkerManager that work on subprocesses or multiprocessing processes.

utopya.task.enqueue_lines(*, queue: queue.Queue, stream: TextIO, follow: bool = False, parse_func: Optional[Callable] = None) None[source]

From the given text stream, read line-buffered lines and add them to the provided queue as 2-tuples, (line, parsed object).

This function is meant to be passed to an individual thread in which it can read individual lines separately from the main thread. Before exiting this function, the stream is closed.

Parameters
  • queue (queue.Queue) – The queue object to put the read line and parsed objects into.

  • stream (TextIO) – The stream identifier. If this is not a text stream, be aware that the elements added to the queue might need decoding.

  • follow (bool, optional) – If True, the _follow() function is used instead of iter(stream.readline). This should be selected if the stream is file-like instead of sys.stdout-like.

  • parse_func (Callable, optional) – A parse function that the read line is passed through. This should be a unary function that either returns a successfully parsed line or None.
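The mechanism can be sketched as follows (a simplified stand-in for enqueue_lines, run here on an in-memory stream instead of a subprocess pipe):

```python
import io
import queue
import threading

def enqueue_lines_sketch(*, q, stream, parse_func=None):
    # Read line-buffered lines, enqueue (line, parsed object) 2-tuples,
    # and close the stream before exiting -- as described above.
    try:
        for line in iter(stream.readline, ""):
            line = line.rstrip("\n")
            parsed = parse_func(line) if parse_func is not None else None
            q.put((line, parsed))
    finally:
        stream.close()

q = queue.Queue()
t = threading.Thread(
    target=enqueue_lines_sketch,
    kwargs=dict(q=q, stream=io.StringIO("hello\nworld\n")),
)
t.start()
t.join()
```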

utopya.task.parse_yaml_dict(line: str, *, start_str: str = '!!map') Union[None, dict][source]

A yaml parse function that can be passed to enqueue_lines. It only tries parsing the line if it starts with the provided start string.

It tries to decode the line and parse it as yaml. If that fails, it will still try to decode the string. If that fails yet again, the unchanged line will be returned.

Parameters
  • line (str) – The line to decode, assumed byte-string, utf8-encoded

  • start_str (str, optional) – Only lines starting with this string are passed to the yaml parser

Returns

either the parsed dict or, if parsing failed, the decoded (or unchanged) line

Return type

Union[None, dict]

class utopya.task.Task(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]

Bases: object

The Task is a container for a task handled by the WorkerManager.

It aims to provide the necessary interfaces for the WorkerManager to easily associate tasks with the corresponding workers and vice versa.

Initialize a Task object.

Parameters
  • name (str, optional) – The task’s name. If none is given, the generated uuid will be used.

  • priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.

  • callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. The function gets passed as only argument this task object.

  • progress_func (Callable, optional) – Invoked by the progress property and used to calculate the progress given the current task object as argument

__slots__ = ('_name', '_priority', '_uid', '_progress_func', '_stop_conditions', 'callbacks')
__init__(*, name: Optional[str] = None, priority: Optional[float] = None, callbacks: Optional[Dict[str, Callable]] = None, progress_func: Optional[Callable] = None)[source]

Initialize a Task object.

Parameters
  • name (str, optional) – The task’s name. If none is given, the generated uuid will be used.

  • priority (float, optional) – The priority of this task; if None, default is +np.inf, i.e. the lowest priority. If two priority values are the same, the task created earlier has a higher priority.

  • callbacks (Dict[str, Callable], optional) – A dict of callback funcs that are called at different points of the life of this task. The function gets passed as only argument this task object.

  • progress_func (Callable, optional) – Invoked by the progress property and used to calculate the progress given the current task object as argument

callbacks
property name: str

The task’s name, if given; else the uid.

property uid: int

The task’s unique ID

property priority: float

The task’s priority. Default is +inf, which is the lowest priority

property order_tuple: tuple

Returns the ordering tuple (priority, uid.time)
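
The ordering tuple makes tasks directly usable in a priority queue. A sketch of the described semantics (lower priority value first, creation order as tie-breaker; the counter stands in for the time-ordered uid):

```python
import heapq
import itertools

creation_index = itertools.count()  # stands in for uid.time
heap = []
for name, priority in [("a", 1.0), ("b", float("inf")), ("c", 1.0)]:
    # ordering tuple: (priority, creation time); lower tuples are popped first
    heapq.heappush(heap, ((priority, next(creation_index)), name))

work_order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
# "a" and "c" share priority 1.0, so "a" (created earlier) comes first;
# "b" has priority +inf, i.e. the lowest priority, and comes last
```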

property progress: float

If a progress function is given, invokes it; otherwise returns 0

This also performs checks that the progress is in [0, 1]

property fulfilled_stop_conditions: Set[StopCondition]

The set of fulfilled stop conditions for this task. Typically, this is set by the StopCondition itself as part of its evaluation in the utopya.stopcond.StopCondition.fulfilled() method.

__hash__() int[source]

Return hash(self).

__str__() str[source]

Return str(self).

__lt__(other) bool[source]

Return self<value.

__le__(other) bool[source]

Return self<=value.

__eq__(other) bool[source]

Evaluates equality of two tasks: returns true only if identical.

Note

We trust that the unique ID of each task (generated with uuid) is really unique, therefore different tasks can never be fully equivalent.

__module__ = 'utopya.task'
class utopya.task.WorkerTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]

Bases: utopya.task.Task

A specialisation of Task for use in the WorkerManager.

It is able to spawn a worker process using subprocess.Popen, executing the task in a non-blocking manner. At the same time, the worker’s stream can be read in via another non-blocking thread and stream information can be parsed. Furthermore, this class provides most of the interface for signalling the spawned process.

For an equivalent class that uses multiprocessing instead of subprocess, see the derived MPProcessTask.

Initialize a WorkerTask.

This is a specialization of Task for use in the WorkerManager.

Parameters
  • setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows dynamically handling the worker arguments. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.

  • setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.

  • worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.

  • **task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.

Raises

ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.
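
The setup_func contract can be sketched as follows (function name and keys are illustrative; the important part is that it receives worker_kwargs, is additionally passed the unpacked setup_kwargs, and returns the dict actually used to spawn the worker):

```python
def setup_worker(*, worker_kwargs: dict, output_dir: str) -> dict:
    # Dynamically extend the spawn arguments, e.g. with an output directory
    args = list(worker_kwargs.get("args", ())) + ["--output-dir", output_dir]
    return dict(worker_kwargs, args=args)

# As a WorkerTask would do: call the setup function with the stored
# worker_kwargs and setup_kwargs, then use its return value for spawning
worker_kwargs = {"args": ["./my_model"], "read_stdout": True}
worker_kwargs = setup_worker(worker_kwargs=worker_kwargs,
                             output_dir="/tmp/out")
```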

__slots__ = ('setup_func', 'setup_kwargs', 'worker_kwargs', '_worker', '_worker_pid', '_worker_status', 'streams', 'profiling')
STREAM_PARSE_FUNCS = {'default': None, 'yaml_dict': <function parse_yaml_dict>}
__init__(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]

Initialize a WorkerTask.

This is a specialization of Task for use in the WorkerManager.

Parameters
  • setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows dynamically handling the worker arguments. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.

  • setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.

  • worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.

  • **task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.

Raises

ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.

setup_func
setup_kwargs
worker_kwargs
streams
profiling
property worker: subprocess.Popen

The associated worker process object or None, if not yet created.

property worker_pid: int

The process ID of the associated worker process

property worker_status: Optional[int]

The worker process’s current status or False, if there is no worker spawned yet.

Note that this invokes a poll to the worker process if one was spawned.

Returns

Current worker status. False, if there was no

worker associated yet.

Return type

Union[int, None]

property outstream_objs: list

Returns the list of objects parsed from the ‘out’ stream

__str__() str[source]

Return basic WorkerTask information.

spawn_worker() subprocess.Popen[source]

Spawn a worker process using subprocess.Popen and manage the corresponding queue and thread for reading the stdout stream.

If there is a setup_func, this function will be called first.

Afterwards, from the worker_kwargs returned by that function or from the ones given during initialisation (if no setup_func was given), the worker process is spawned and associated with this task.

Returns

The created process object

Return type

subprocess.Popen

Raises
  • RuntimeError – If a worker was already spawned for this task.

  • TypeError – For invalid args argument

read_streams(stream_names: list = 'all', *, max_num_reads: int = 10, forward_directly: bool = False) None[source]

Read the streams associated with this task’s worker.

Parameters
  • stream_names (list, optional) – The list of stream names to read. If all (default), will read all streams.

  • max_num_reads (int, optional) –

    How many lines should be read from the buffer. For -1, reads the whole buffer.

    Warning

    Do not make this value too large as it could block the whole reader thread of this worker.

  • forward_directly (bool, optional) – Whether to call the forward_streams() method; this is done before the callback and can be useful if the callback should not happen before the streams are forwarded.

Returns

None

save_streams(stream_names: list = 'all', *, final: bool = False)[source]

For each stream, checks if it is to be saved, and if yes: saves it.

The saving location is stored in the streams dict. The relevant keys are the save flag and the save_path string.

Note that this function does not save the whole stream log, but only those parts of the stream log that have not already been saved. The position up to which the stream was saved is stored under the lines_saved key in the stream dict.

Parameters
  • stream_names (list, optional) – The list of stream names to _check_. If ‘all’ (default), will check all streams whether the save flag is set.

  • save_raw (bool, optional) – If True, stores the raw log; otherwise stores the regular log, i.e. excluding the lines that were parseable.

  • final (bool, optional) – If True, this is regarded as the final save operation for the stream, which will lead to additional information being saved to the end of the log.

  • remove_ansi (bool, optional) – If True, will remove ANSI escape characters (e.g. from colored logging) from the log before saving to file.

Returns

None

forward_streams(stream_names: list = 'all', forward_raw: bool = False) bool[source]

Forwards the streams to stdout, either via logging module or print

This function can be periodically called to forward the part of the stream logs that was not already forwarded to stdout.

The information for that is stored in the stream dict. The log_level entry is used to determine whether the logging module should be used or (in case of None) the print method.

Parameters

stream_names (list, optional) – The list of streams to print

Returns

Whether there was any output

Return type

bool

signal_worker(signal: str) tuple[source]

Sends a signal to this WorkerTask’s worker.

Parameters

signal (str) – The signal to send. Needs to be a valid signal name, i.e.: available in python signal module.

Raises

ValueError – When an invalid signal argument was given

Returns

(signal: str, signum: int) sent to the worker

Return type

tuple
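The name-to-number resolution that signal_worker performs can be sketched with Python’s signal module. The following resolve_signal helper is a hypothetical illustration, not utopya’s actual code:

```python
import signal

def resolve_signal(name: str) -> tuple:
    """Resolve a signal name like 'SIGTERM' to a (name, signum) tuple,
    raising ValueError for unknown names. Illustrative sketch only."""
    signum = getattr(signal, name, None)
    if not isinstance(signum, signal.Signals):
        raise ValueError(f"'{name}' is not a valid signal name!")
    return (name, int(signum))
```

For example, resolve_signal("SIGTERM") yields a (signal, signum) tuple of the shape that signal_worker is documented to return.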

__module__ = 'utopya.task'
class utopya.task.PopenMPProcess(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = -1, encoding: str = 'utf8')[source]

Bases: object

A wrapper around multiprocessing.Process that replicates (wide parts of) the interface of subprocess.Popen.

Creates a multiprocessing.Process and starts it.

The interface here is a subset of subprocess.Popen that makes those features available that make sense for a multiprocessing.Process, mainly: stream reading.

Consequently, the interface differs quite a bit from that of multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:

  • target will be args[0]

  • args will be args[1:]

  • kwargs is an additional keyword argument that is not typically part of the subprocess.Popen interface.

Regarding the stream arguments, the following steps are done to attach custom pipes: If any argument is a subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.

Parameters
  • args (tuple) – The target callable (args[0]) and subsequent positional arguments.

  • kwargs (dict, optional) – Keyword arguments for the target.

  • stdin (None, optional) – The stdin stream

  • stdout (None, optional) – The stdout stream

  • stderr (None, optional) – The stderr stream

  • bufsize (int, optional) – The buffersize to use.

  • encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, using other values is not encouraged!
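The args convention described above can be illustrated with a small sketch. Here, split_popen_args and my_worker are hypothetical names used only for demonstration, not part of utopya:

```python
# Sketch of how an (args, kwargs) pair maps onto multiprocessing.Process
# arguments under the convention described above.
def split_popen_args(args: tuple, kwargs: dict = None) -> dict:
    if not args or not callable(args[0]):
        raise TypeError("args[0] needs to be the target callable!")
    # target is args[0], positional arguments are args[1:]
    target, pos_args = args[0], args[1:]
    return dict(target=target, args=pos_args, kwargs=kwargs or {})

def my_worker(x, y, *, scale=1):
    return (x + y) * scale

# The resulting dict could be unpacked into multiprocessing.Process(**spec)
spec = split_popen_args((my_worker, 1, 2), kwargs=dict(scale=10))
```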

__init__(args: tuple, kwargs: dict = {}, stdin=None, stdout=None, stderr=None, bufsize: int = -1, encoding: str = 'utf8')[source]

Creates a multiprocessing.Process and starts it.

The interface here is a subset of subprocess.Popen that makes those features available that make sense for a multiprocessing.Process, mainly: stream reading.

Consequently, the interface differs quite a bit from that of multiprocessing.Process. The most important arguments of that interface are target, args, and kwargs, which can be set as follows:

  • target will be args[0]

  • args will be args[1:]

  • kwargs is an additional keyword argument that is not typically part of the subprocess.Popen interface.

Regarding the stream arguments, the following steps are done to attach custom pipes: If any argument is a subprocess.PIPE or another stream specifier that is not subprocess.DEVNULL, a new multiprocessing.Pipe and a reader thread will be established.

Parameters
  • args (tuple) – The target callable (args[0]) and subsequent positional arguments.

  • kwargs (dict, optional) – Keyword arguments for the target.

  • stdin (None, optional) – The stdin stream

  • stdout (None, optional) – The stdout stream

  • stderr (None, optional) – The stderr stream

  • bufsize (int, optional) – The buffersize to use.

  • encoding (str, optional) – The encoding to use for the streams; should typically remain utf8, using other values is not encouraged!

__del__()[source]

Custom destructor that closes the process and file descriptors

__str__() str[source]

Return str(self).

poll() Optional[int][source]

Check if child process has terminated. Set and return returncode attribute. Otherwise, returns None.

With the underlying process being a multiprocessing.Process, this method is equivalent to the returncode property.

wait(timeout=None)[source]

Wait for the process to finish; blocking call.

This method is not yet implemented, but will be!

communicate(input=None, timeout=None)[source]

Communicate with the process.

This method is not yet implemented! Not sure if it will be …

send_signal(signal: int)[source]

Send a signal to the process. Only works for SIGKILL and SIGTERM.

terminate()[source]

Sends SIGTERM to the process

kill()[source]

Sends SIGKILL to the process

property args: tuple

The args argument to this process. Note that the returned tuple includes the target callable as its first entry.

Note that these have already been passed to the process; changing them has no effect.

property kwargs

Keyword arguments passed to the target callable.

Note that these have already been passed to the process; changing them has no effect.

property stdin

The attached stdin stream

property stdout

The attached stdout stream

property stderr

The attached stderr stream

property pid

Process ID of the child process

property returncode: Optional[int]

The child return code, set by poll() and wait() (and indirectly by communicate()). A None value indicates that the process hasn’t terminated yet.

A negative value -N indicates that the child was terminated by signal N (POSIX only).

__dict__ = mappingproxy({'__module__': 'utopya.task', '__doc__': 'A wrapper around multiprocessing.Process that replicates (wide parts of)\n    the interface of subprocess.Popen.\n    ', '__init__': <function PopenMPProcess.__init__>, '_prepare_target_args': <function PopenMPProcess._prepare_target_args>, '__del__': <function PopenMPProcess.__del__>, '__str__': <function PopenMPProcess.__str__>, 'poll': <function PopenMPProcess.poll>, 'wait': <function PopenMPProcess.wait>, 'communicate': <function PopenMPProcess.communicate>, 'send_signal': <function PopenMPProcess.send_signal>, 'terminate': <function PopenMPProcess.terminate>, 'kill': <function PopenMPProcess.kill>, 'args': <property object>, 'kwargs': <property object>, 'stdin': <property object>, 'stdout': <property object>, 'stderr': <property object>, 'pid': <property object>, 'returncode': <property object>, '__dict__': <attribute '__dict__' of 'PopenMPProcess' objects>, '__weakref__': <attribute '__weakref__' of 'PopenMPProcess' objects>, '__annotations__': {}})
__module__ = 'utopya.task'
__weakref__

list of weak references to the object (if defined)

class utopya.task.MPProcessTask(*, setup_func: Optional[Callable] = None, setup_kwargs: Optional[dict] = None, worker_kwargs: Optional[dict] = None, **task_kwargs)[source]

Bases: utopya.task.WorkerTask

A WorkerTask specialization that uses multiprocessing.Process instead of subprocess.Popen.

It is mostly equivalent to WorkerTask but adjusts the private methods that take care of spawning the actual process and setting up the stream readers, such that the particularities of the PopenMPProcess wrapper are accounted for.

Initialize a WorkerTask.

This is a specialization of Task for use in the WorkerManager.

Parameters
  • setup_func (Callable, optional) – The setup function to use before this task is spawned; this allows to dynamically handle the worker arguments. It is called with the worker_kwargs keyword argument, containing the dict passed here. Additionally, setup_kwargs are unpacked into the function call. The function should return a dict that is then used as worker_kwargs for the individual task.

  • setup_kwargs (dict, optional) – The keyword arguments unpacked into the setup_func call.

  • worker_kwargs (dict, optional) – The keyword arguments needed to spawn the worker. Note that these are also passed to setup_func and, if a setup_func is given, the return value of that function will be used for the worker_kwargs.

  • **task_kwargs – Arguments to be passed to __init__(), including the callbacks dictionary among other things.

Raises

ValueError – If neither setup_func nor worker_kwargs were given, thus lacking information on how to spawn the worker.

setup_func
setup_kwargs
worker_kwargs
streams
profiling
__dict__ = mappingproxy({'__module__': 'utopya.task', '__doc__': 'A WorkerTask specialization that uses multiprocessing.Process instead\n    of subprocess.Popen.\n\n    It is mostly equivalent to :py:class:`~utopya.task.WorkerTask` but adjusts\n    the private methods that take care of spawning the actual process and\n    setting up the stream readers, such that the particularities of the\n    :py:class:`~utopya.task.PopenMPProcess` wrapper are accounted for.\n    ', '_spawn_process': <function MPProcessTask._spawn_process>, '_setup_stream_reader': <function MPProcessTask._setup_stream_reader>, '_stop_stream_reader': <function MPProcessTask._stop_stream_reader>, '__dict__': <attribute '__dict__' of 'MPProcessTask' objects>, '__weakref__': <attribute '__weakref__' of 'MPProcessTask' objects>, '__annotations__': {}})
__module__ = 'utopya.task'
__weakref__

list of weak references to the object (if defined)

class utopya.task.TaskList[source]

Bases: object

The TaskList stores Task objects in it, ensuring that none is in there twice and allows to lock it to prevent adding new tasks.

Initialize an empty TaskList.

__init__()[source]

Initialize an empty TaskList.

__len__() int[source]

The length of the TaskList.

__contains__(val: utopya.task.Task) bool[source]

Checks if the given object is contained in this TaskList.

__getitem__(idx: int) utopya.task.Task[source]

Returns the item at the given index in the TaskList.

__iter__()[source]

Iterate over the TaskList

__eq__(other) bool[source]

Tests for equality of the task list by forwarding to _l attribute

lock()[source]

If called, the TaskList becomes locked and allows no further calls to the append method.

append(val: utopya.task.Task)[source]

Append a Task object to this TaskList

Parameters

val (Task) – The task to add

Raises
  • RuntimeError – If TaskList object was locked

  • TypeError – Tried to add a non-Task type object

  • ValueError – Task already added to this TaskList

__add__(tasks: Sequence[utopya.task.Task])[source]

Appends all the tasks in the given iterable to the task list
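The locking and uniqueness semantics described above can be sketched as follows; this is an illustrative re-implementation, not utopya’s actual TaskList:

```python
class TaskList:
    """A list-like container that refuses duplicates and can be locked
    against further appends (illustrative sketch)."""

    def __init__(self):
        self._l = []
        self._locked = False

    def lock(self):
        """After this call, append raises RuntimeError."""
        self._locked = True

    def append(self, task):
        if self._locked:
            raise RuntimeError("TaskList is locked; cannot append.")
        if any(t is task for t in self._l):
            raise ValueError("Task was already added to this TaskList!")
        self._l.append(task)

    def __add__(self, tasks):
        # Appends all tasks from the given iterable, in order
        for t in tasks:
            self.append(t)
        return self

    def __len__(self):
        return len(self._l)
```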

__dict__ = mappingproxy({'__module__': 'utopya.task', '__doc__': 'The TaskList stores Task objects in it, ensuring that none is in there\n    twice and allows to lock it to prevent adding new tasks.\n    ', '__init__': <function TaskList.__init__>, '__len__': <function TaskList.__len__>, '__contains__': <function TaskList.__contains__>, '__getitem__': <function TaskList.__getitem__>, '__iter__': <function TaskList.__iter__>, '__eq__': <function TaskList.__eq__>, 'lock': <function TaskList.lock>, 'append': <function TaskList.append>, '__add__': <function TaskList.__add__>, '__dict__': <attribute '__dict__' of 'TaskList' objects>, '__weakref__': <attribute '__weakref__' of 'TaskList' objects>, '__hash__': None, '__annotations__': {}})
__hash__ = None
__module__ = 'utopya.task'
__weakref__

list of weak references to the object (if defined)

utopya.testtools module

Tools that help testing models.

This mainly supplies the ModelTest class, which is a specialization of the Model for usage in tests.

class utopya.testtools.ModelTest(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]

Bases: utopya.model.Model

A class to use for testing Utopia models.

It attaches to a certain model and makes it easy to load config files with which test should be carried out.

Initialize the ModelTest class for the given model name.

This is basically like the base class __init__, except that it sets the default value of use_tmpdir to True and renames TODO

Parameters
  • model_name (str) – Name of the model to test

  • test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; but the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.

Raises

ValueError – If the directory extracted from test_file is invalid

__init__(model_name: str, *, test_file: Optional[str] = None, use_tmpdir: bool = True, **kwargs)[source]

Initialize the ModelTest class for the given model name.

This is basically like the base class __init__, except that it sets the default value of use_tmpdir to True and renames TODO

Parameters
  • model_name (str) – Name of the model to test

  • test_file (str, optional) – The file this ModelTest is used in. If given, will look for config files relative to the folder this file is located in.

  • use_tmpdir (bool, optional) – Whether to use a temporary directory to write data to. The default value can be set here; but the flag can be overwritten in the create_mv and create_run_load methods. If False, the regular model output directory is used.

Raises

ValueError – If the directory extracted from test_file is invalid

__module__ = 'utopya.testtools'

utopya.tools module

For functions that are not bound to classes, but generally useful.

utopya.tools.recursive_update(d: dict, u: dict) dict[source]

Update dict d with values from dict u.

NOTE: This method does _not_ copy mutable entries! If you want to assure that the contents of u cannot be changed by changing its counterparts in the updated d, you need to supply a deep copy of u to this method.

Parameters
  • d (dict) – The dict to be updated

  • u (dict) – The dict used to update

Returns

updated version of d

Return type

dict
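The merging behaviour, including the caveat that mutable entries are not copied, can be sketched with a minimal re-implementation (illustrative; utopya’s actual code may differ):

```python
def recursive_update(d: dict, u: dict) -> dict:
    """Recursively update dict d with values from dict u; nested dicts
    are merged rather than replaced wholesale."""
    for key, value in u.items():
        if isinstance(value, dict) and isinstance(d.get(key), dict):
            d[key] = recursive_update(d[key], value)
        else:
            # NOTE mutable values are NOT copied, as noted above
            d[key] = value
    return d

cfg = {"a": 1, "nested": {"x": 1, "y": 2}}
updated = recursive_update(cfg, {"nested": {"y": 20, "z": 30}})
```

Note that the update happens in-place on d; the return value is the same object.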

utopya.tools.load_selected_keys(src: dict, *, add_to: dict, keys: Sequence[Tuple[str, type, bool]], err_msg_prefix: Optional[str] = None, prohibit_unexpected: bool = True) None[source]

Loads (only) selected keys from dict src into dict add_to.

Parameters
  • src (dict) – The dict to load values from

  • add_to (dict) – The dict to load values into

  • keys (Sequence[Tuple[str, type, bool]]) – Which keys to load, given as sequence of (key, allowed types, [required=False]) tuples.

  • err_msg_prefix (str, optional) – A prefix used in error messages

  • prohibit_unexpected (bool, optional) – Whether to raise on keys that were unexpected, i.e. not given in keys argument.

Raises
  • KeyError – On missing key in src

  • TypeError – On bad type of value in src

  • ValueError – On unexpected keys in src
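The checking logic implied by the parameters and raised exceptions above can be sketched like this (an illustrative re-implementation, not utopya’s actual code):

```python
def load_selected_keys(src: dict, *, add_to: dict, keys,
                       err_msg_prefix: str = None,
                       prohibit_unexpected: bool = True):
    """Load (only) selected keys from src into add_to, checking types
    and optionally prohibiting unexpected keys. Illustrative sketch."""
    prefix = (err_msg_prefix + " ") if err_msg_prefix else ""
    expected = set()
    for key, allowed_types, *rest in keys:
        required = rest[0] if rest else False
        expected.add(key)
        if key not in src:
            if required:
                raise KeyError(f"{prefix}Missing required key: {key}")
            continue
        if not isinstance(src[key], allowed_types):
            raise TypeError(f"{prefix}Bad type for key '{key}': "
                            f"{type(src[key]).__name__}")
        add_to[key] = src[key]
    if prohibit_unexpected:
        unexpected = set(src) - expected
        if unexpected:
            raise ValueError(f"{prefix}Unexpected keys: {unexpected}")
```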

utopya.tools.add_item(value, *, add_to: dict, key_path: Sequence[str], value_func: Optional[Callable] = None, is_valid: Optional[Callable] = None, ErrorMsg: Optional[Callable] = None) None[source]

Adds the given value to the add_to dict, traversing the given key path. This operation happens in-place.

Parameters
  • value – The value of what is to be stored. If this is a callable, the result of the call is stored.

  • add_to (dict) – The dict to add the entry to

  • key_path (Sequence[str]) – The path at which to add it

  • value_func (Callable, optional) – If given, calls it with value as argument and uses the return value to add to the dict

  • is_valid (Callable, optional) – Used to determine whether value is valid or not; should take single positional argument, return bool

  • ErrorMsg (Callable, optional) – A raisable object that prints an error message; gets passed value as positional argument.

Raises

Exception – type depends on specified ErrorMsg callable

utopya.tools.format_time(duration: Union[float, datetime.timedelta], *, ms_precision: int = 0, max_num_parts: Optional[int] = None) str[source]

Given a duration (in seconds), formats it into a string.

The formatting divisors are: days, hours, minutes, seconds

If ms_precision > 0 and duration < 60, decimal places will be shown for the seconds.

Parameters
  • duration (Union[float, timedelta]) – The duration in seconds to format into a duration string; it can also be a timedelta object.

  • ms_precision (int, optional) – The precision of the seconds slot

  • max_num_parts (int, optional) – How many parts to include when creating the formatted time string. For example, if the time consists of the parts seconds, minutes, and hours, and the argument is 2, only the hours and minutes parts will be shown. If None, all parts are included.

Returns

The formatted duration string

Return type

str
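A formatter with the described behaviour could look like the following sketch; the exact output format (e.g. "1h 2m 3s") and the restriction to plain seconds (no timedelta support) are assumptions for illustration:

```python
def format_time(duration, *, ms_precision=0, max_num_parts=None) -> str:
    """Format a duration in seconds into a string, dividing it into
    days, hours, minutes, and seconds. Illustrative sketch."""
    divisors = (("d", 86400), ("h", 3600), ("m", 60), ("s", 1))
    parts = []
    remainder = float(duration)
    for name, secs in divisors:
        value, remainder = divmod(remainder, secs)
        if name == "s" and ms_precision > 0 and duration < 60:
            # show decimal places for the seconds slot
            parts.append(f"{value + remainder:.{ms_precision}f}s")
        elif value or (name == "s" and not parts):
            parts.append(f"{int(value)}{name}")
    if max_num_parts is not None:
        # largest parts come first, so truncation keeps e.g. hours+minutes
        parts = parts[:max_num_parts]
    return " ".join(parts)
```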

utopya.tools.fill_line(s: str, *, num_cols: int = 79, fill_char: str = ' ', align: str = 'left') str[source]

Extends the given string such that it fills a whole line of num_cols columns.

Parameters
  • s (str) – The string to extend to a whole line

  • num_cols (int, optional) – The number of columns of the line; defaults to the number of TTY columns or – if those are not available – 79

  • fill_char (str, optional) – The fill character

  • align (str, optional) – The alignment. Can be: ‘left’, ‘right’, ‘center’ or the one-letter equivalents.

Returns

The string of length num_cols

Return type

str

Raises

ValueError – For invalid align or fill_char argument
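Such a function maps naturally onto Python’s str.ljust/rjust/center methods; the following is an illustrative sketch, not utopya’s actual implementation:

```python
def fill_line(s: str, *, num_cols: int = 79, fill_char: str = " ",
              align: str = "left") -> str:
    """Extend s to fill a line of num_cols columns. Illustrative sketch."""
    if len(fill_char) != 1:
        raise ValueError("fill_char needs to be a single character!")
    methods = {"left": s.ljust, "l": s.ljust,
               "right": s.rjust, "r": s.rjust,
               "center": s.center, "c": s.center}
    try:
        return methods[align](num_cols, fill_char)
    except KeyError:
        raise ValueError(f"Invalid align argument: {align}")
```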

utopya.tools.center_in_line(s: str, *, num_cols: int = 79, fill_char: str = '·', spacing: int = 1) str[source]

Shortcut for a common fill_line use case.

Parameters
  • s (str) – The string to center in the line

  • num_cols (int, optional) – The number of columns in the line

  • fill_char (str, optional) – The fill character

  • spacing (int, optional) – The spacing around the string s

Returns

The string centered in the line

Return type

str

utopya.tools.pprint(obj: Any, **kwargs)[source]

Prints a “pretty” string representation of the given object.

Parameters
  • obj (Any) – The object to print

  • **kwargs – Passed to print

utopya.tools.pformat(obj) str[source]

Creates a “pretty” string representation of the given object.

This is achieved by creating a yaml representation.

utopya.tools.open_folder(path: str)[source]

Opens the folder at the specified path.

Note

This refuses to open a file.

Parameters

path (str) – The absolute path to the folder that is to be opened. The home directory ~ is expanded.
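Cross-platform folder opening typically dispatches on the operating system; the following sketch illustrates the described behaviour, with the per-platform tools (open, xdg-open, os.startfile) being an assumption rather than utopya’s exact calls:

```python
import os
import platform
import subprocess

def open_folder(path: str):
    """Open a folder in the system file browser. Illustrative sketch."""
    path = os.path.expanduser(path)  # expand the home directory ~
    if not os.path.isdir(path):
        # refuses to open files or non-existing paths
        raise ValueError(f"Not a directory: {path}")
    system = platform.system()
    if system == "Darwin":
        subprocess.run(["open", path], check=True)
    elif system == "Windows":
        os.startfile(path)  # Windows-only function
    else:
        subprocess.run(["xdg-open", path], check=True)
```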

utopya.tools.parse_si_multiplier(s: str) int[source]

Parses a string like 1.23M or -2.34 k into an integer.

If it is a string, parses the SI multiplier and returns the appropriate integer for use as number of simulation steps. Supported multipliers are k, M, G and T. These need to be used as the suffix of the string.

Note

This is only intended to be used with integer values and does not support float values like 1u.

The regex used for parsing can be found in the source code.

Parameters

s (str) – A string representing an integer number, potentially including a supported SI multiplier as suffix.

Returns

The parsed number of steps as integer. If the value has decimal places, integer rounding is applied.

Return type

int

Raises

ValueError – Upon string that does not match the expected pattern
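A parser with the described behaviour can be sketched with a regex; this is a hypothetical re-implementation, and utopya’s actual regex may differ:

```python
import re

_SI_MAP = {"": 1, "k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}
_SI_RE = re.compile(r"^\s*([+-]?\d+(?:\.\d+)?)\s*([kMGT]?)\s*$")

def parse_si_multiplier(s: str) -> int:
    """Parse a string like '1.23M' or '-2.34 k' into an integer,
    supporting the k, M, G, and T suffixes. Illustrative sketch."""
    match = _SI_RE.match(s)
    if not match:
        raise ValueError(f"Cannot parse '{s}' into an integer!")
    number, suffix = match.groups()
    # integer rounding is applied for values with decimal places
    return round(float(number) * _SI_MAP[suffix])
```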

utopya.tools.parse_num_steps(N: Union[str, int], *, raise_if_negative: bool = True) int[source]

Given a string like 1.23M or an integer, prepares the num_steps argument for a single universe simulation.

For string arguments, uses parse_si_multiplier() for string parsing. If that fails, attempts to read it in float notation by calling int(float(N)).

Note

This function always applies integer rounding.

Parameters
  • N (Union[str, int]) – The num_steps argument as a string or integer.

  • raise_if_negative (bool, optional) – Whether to raise an error if the value is negative.

Returns

The parsed value for num_steps

Return type

int

Raises

ValueError – Result invalid, i.e. not parseable or of negative value.

utopya.workermanager module

The WorkerManager class.

class utopya.workermanager.WorkerManager(num_workers: Union[int, str] = 'auto', poll_delay: float = 0.05, lines_per_poll: int = 20, periodic_task_callback: Optional[int] = None, QueueCls=<class 'queue.Queue'>, reporter: Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: Optional[Dict[str, Union[str, List[str]]]] = None, save_streams_on: Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: Optional[dict] = None)[source]

Bases: object

The WorkerManager class orchestrates WorkerTask objects: setting them up, invoking them, tracking their progress, and starting new workers if previous workers finished.

pending_exceptions

A (FiFo) queue of Exception objects that will be handled by the WorkerManager during working. This is the interface that allows for other threads that have access to the WorkerManager to add an exception and let it be handled in the main thread.

Type

queue.Queue

rf_spec

The report format specifications that are used throughout the WorkerManager. These are invoked at different points of the operation of the WorkerManager: while_working, after_work, after_abort, task_spawn, task_finished.

Type

dict

times

Holds profiling information for the WorkerManager

Type

dict

Initialize the worker manager.

Parameters
  • num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, deducts abs(num_workers) from the CPU count.

  • poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.

  • lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.

  • periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.

  • QueueCls (Class, optional) – Which class to use for the Queue. Defaults to FiFo.

  • reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.

  • rf_spec (Dict[str, Union[str, List[str]]], optional) –

    The names of report formats that should be invoked at different points of the WorkerManager’s operation. Possible keys: before_working, while_working, after_work, after_abort, task_spawn, task_finished. All other keys are ignored.

    The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.

  • save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.

  • nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For ‘ignore’, nothing happens. For ‘warn’, a warning is printed and the last 5 lines of the log are shown. For ‘raise’, the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that ‘warn’ will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use ‘warn_all’ to also receive warnings in this case.

  • interrupt_params (dict, optional) –

    Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:

    send_signal: Which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.

    grace_period: How long (in seconds) to wait for the other workers to gracefully shut down. After this period, the workers will be killed via SIGKILL. Default is 5s.

    exit: Whether to sys.exit at the end of start_working. Default is True.

  • cluster_mode (bool, optional) – Whether tasks similar to those managed by this WorkerManager are, at the same time, worked on by other WorkerManager instances. This is relevant because file output might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.

  • resolved_cluster_params (dict, optional) – The corresponding cluster parameters.

Raises

ValueError – For too negative num_workers argument

__init__(num_workers: Union[int, str] = 'auto', poll_delay: float = 0.05, lines_per_poll: int = 20, periodic_task_callback: Optional[int] = None, QueueCls=<class 'queue.Queue'>, reporter: Optional[utopya.reporter.WorkerManagerReporter] = None, rf_spec: Optional[Dict[str, Union[str, List[str]]]] = None, save_streams_on: Sequence[str] = (), nonzero_exit_handling: str = 'ignore', interrupt_params: Optional[dict] = None, cluster_mode: bool = False, resolved_cluster_params: Optional[dict] = None)[source]

Initialize the worker manager.

Parameters
  • num_workers (Union[int, str], optional) – The number of workers that can work in parallel. If ‘auto’ (default), uses os.cpu_count(). If below zero, deducts abs(num_workers) from the CPU count.

  • poll_delay (float, optional) – How long (in seconds) the delay between worker polls should be. For too small delays (<0.01), the CPU load will become significant.

  • lines_per_poll (int, optional) – How many lines to read from each stream during polling of the tasks. This value should not be too large, otherwise the polling is delayed by too much. By setting it to -1, all available lines are read.

  • periodic_task_callback (int, optional) – If given, an additional task callback will be invoked after every periodic_task_callback poll events.

  • QueueCls (Class, optional) – Which class to use for the Queue. Defaults to FiFo.

  • reporter (WorkerManagerReporter, optional) – The reporter associated with this WorkerManager, reporting on the progress.

  • rf_spec (Dict[str, Union[str, List[str]]], optional) –

    The names of report formats that should be invoked at different points of the WorkerManager’s operation. Possible keys: before_working, while_working, after_work, after_abort, task_spawn, task_finished. All other keys are ignored.

    The values of the dict can be either strings or lists of strings, where the strings always refer to report formats registered with the WorkerManagerReporter. This argument updates the default report format specifications.

  • save_streams_on (Sequence[str], optional) – On which events to invoke save_streams() during work. Should be a sequence containing one or both of the keys on_monitor_update, periodic_callback.

  • nonzero_exit_handling (str, optional) – How to react if a WorkerTask exits with a non-zero exit code. For ‘ignore’, nothing happens. For ‘warn’, a warning is printed and the last 5 lines of the log are shown. For ‘raise’, the last 20 lines of the log are shown, all other tasks are terminated, and the WorkerManager exits with the same exit code as the WorkerTask exited with. Note that ‘warn’ will not lead to any messages if the worker died by SIGTERM, which presumably originated from a fulfilled stop condition. Use ‘warn_all’ to also receive warnings in this case.

  • interrupt_params (dict, optional) –

    Parameters that determine how the WorkerManager behaves when receiving KeyboardInterrupts during working. Possible keys:

    send_signal: Which signal to send to the workers. Can be SIGINT (default), SIGTERM, SIGKILL, or any valid signal as integer.

    grace_period: How long (in seconds) to wait for the other workers to gracefully shut down. After this period, the workers will be killed via SIGKILL. Default is 5s.

    exit: Whether to sys.exit at the end of start_working. Default is True.

  • cluster_mode (bool, optional) – Whether tasks similar to those managed by this WorkerManager are, at the same time, worked on by other WorkerManager instances. This is relevant because file output might be affected by whether another WorkerManager instance is currently working on the same output directory. Also, in the future, this argument might be used to communicate between nodes.

  • resolved_cluster_params (dict, optional) – The corresponding cluster parameters.

Raises

ValueError – For too negative num_workers argument
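The num_workers resolution described above can be sketched as follows; the interpretation that negative values are deducted from the CPU count is an assumption, and resolve_num_workers is a hypothetical helper name:

```python
import os

def resolve_num_workers(num_workers="auto") -> int:
    """Resolve the num_workers argument to a concrete positive integer.
    Illustrative sketch, not utopya's actual code."""
    cpu_count = os.cpu_count()
    if num_workers == "auto":
        return cpu_count
    if num_workers < 0:
        resolved = cpu_count + num_workers  # deduct abs(num_workers)
        if resolved <= 0:
            raise ValueError(
                f"num_workers={num_workers} is too negative for "
                f"{cpu_count} available CPUs!"
            )
        return resolved
    return num_workers
```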

property tasks: utopya.task.TaskList

The list of all tasks.

property task_queue: queue.Queue

The task queue.

property task_count: int

Returns the number of tasks that this manager ever took care of. Careful: This is NOT the current number of tasks in the queue!

property num_workers: int

The number of workers that may work in parallel

property active_tasks: List[utopya.task.WorkerTask]

The list of currently active tasks.

Note that this information might not be up-to-date; a process might quit just after the list has been updated.

property num_finished_tasks: int

The number of finished tasks. Incremented whenever a task leaves the active_tasks list, regardless of its exit status.

property num_free_workers: int

Returns the number of free workers.

property poll_delay: float

Returns the delay between two polls

property stop_conditions: Set[utopya.stopcond.StopCondition]

All stop conditions that were ever passed to start_working() during the life time of this WorkerManager.

property nonzero_exit_handling: str

Behavior upon a worker exiting with a non-zero exit code.

  • with ignore, nothing happens

  • with warn, a warning is printed

  • with raise, the log is shown and the WorkerManager exits with the same exit code as the corresponding WorkerTask exited with.

property reporter: Optional[utopya.reporter.WorkerManagerReporter]

The associated WorkerManagerReporter or None, if no reporter is set.

property cluster_mode: bool

Returns whether the WorkerManager is in cluster mode

property resolved_cluster_params: dict

Returns a copy of the cluster configuration with all parameters resolved. This makes some additional keys available on the top level.

add_task(*, TaskCls: type = <class 'utopya.task.WorkerTask'>, **task_kwargs) utopya.task.WorkerTask[source]

Adds a task to the WorkerManager.

Parameters
  • TaskCls (type, optional) – The WorkerTask-like type to use

  • **task_kwargs – All arguments needed for WorkerTask initialization. See utopya.task.WorkerTask for all valid arguments.

Returns

The created WorkerTask object

Return type

WorkerTask

start_working(*, detach: bool = False, timeout: Optional[float] = None, stop_conditions: Optional[Sequence[utopya.stopcond.StopCondition]] = None, post_poll_func: Optional[Callable] = None) None[source]

Upon call, all enqueued tasks will be worked on sequentially.

Parameters
  • detach (bool, optional) – If False (default), the WorkerManager will block here, as it continuously polls the workers and distributes tasks.

  • timeout (float, optional) – If given, the number of seconds this work session is allowed to take. Workers will be aborted if this duration is exceeded. Note that this is measured in the host system's wall time, not in CPU time.

  • stop_conditions (Sequence[StopCondition], optional) – These StopCondition objects will be checked during the run.

  • post_poll_func (Callable, optional) – If given, this is called after all workers have been polled. It can be used to perform custom actions during the polling loop.

Raises
  • NotImplementedError – If detach is set to True, which is not yet implemented

  • ValueError – For an invalid (i.e., negative) timeout value

  • WorkerManagerTotalTimeout – Upon a total timeout
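
The blocking work-loop semantics described above can be sketched as follows. This is an illustrative simplification, not the real implementation: tasks are plain callables here, and TimeoutError stands in for WorkerManagerTotalTimeout:

```python
import time

def work_loop_sketch(tasks, *, timeout=None, post_poll_func=None):
    """Sketch of start_working()'s documented behavior (not utopya code)."""
    # Negative timeout values are invalid, as documented
    if timeout is not None and timeout <= 0:
        raise ValueError(f"Invalid timeout value: {timeout}")
    # The timeout is measured in wall time, not CPU time
    deadline = None if timeout is None else time.monotonic() + timeout
    results = []
    for task in tasks:
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError("Total work session timeout exceeded")
        results.append(task())  # work on the enqueued tasks sequentially
        if post_poll_func is not None:
            post_poll_func()    # custom action after each polling step
    return results
```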

__dict__ = mappingproxy({'__module__': 'utopya.workermanager', '__doc__': 'The WorkerManager class orchestrates :py:class:`~utopya.task.WorkerTask`\n    objects: setting them up, invoking them, tracking their progress, and\n    starting new workers if previous workers finished.\n\n    Attributes:\n        pending_exceptions (queue.Queue): A (FiFo) queue of Exception objects\n            that will be handled by the WorkerManager during working. This is\n            the interface that allows for other threads that have access to\n            the WorkerManager to add an exception and let it be handled in the\n            main thread.\n        rf_spec (dict): The report format specifications that are used\n            throughout the WorkerManager. These are invoked at different points\n            of the operation of the WorkerManager: ``while_working``,\n            ``after_work``, ``after_abort``, ``task_spawn``, ``task_finished``.\n        times (dict): Holds profiling information for the WorkerManager\n    ', '__init__': <function WorkerManager.__init__>, 'tasks': <property object>, 'task_queue': <property object>, 'task_count': <property object>, 'num_workers': <property object>, 'active_tasks': <property object>, 'num_finished_tasks': <property object>, 'num_free_workers': <property object>, 'poll_delay': <property object>, 'stop_conditions': <property object>, 'nonzero_exit_handling': <property object>, 'reporter': <property object>, 'cluster_mode': <property object>, 'resolved_cluster_params': <property object>, 'add_task': <function WorkerManager.add_task>, 'start_working': <function WorkerManager.start_working>, '_invoke_report': <function WorkerManager._invoke_report>, '_grab_task': <function WorkerManager._grab_task>, '_poll_workers': <function WorkerManager._poll_workers>, '_check_stop_conds': <function WorkerManager._check_stop_conds>, '_invoke_periodic_callbacks': <function WorkerManager._invoke_periodic_callbacks>, '_signal_workers': <function 
WorkerManager._signal_workers>, '_handle_pending_exceptions': <function WorkerManager._handle_pending_exceptions>, '__dict__': <attribute '__dict__' of 'WorkerManager' objects>, '__weakref__': <attribute '__weakref__' of 'WorkerManager' objects>, '__annotations__': {}})
__module__ = 'utopya.workermanager'
__weakref__

list of weak references to the object (if defined)

exception utopya.workermanager.WorkerManagerError[source]

Bases: BaseException

The base exception class for WorkerManager errors

__module__ = 'utopya.workermanager'
__weakref__

list of weak references to the object (if defined)

exception utopya.workermanager.WorkerManagerTotalTimeout[source]

Bases: utopya.workermanager.WorkerManagerError

Raised when a total timeout occurred

__module__ = 'utopya.workermanager'
exception utopya.workermanager.WorkerTaskError[source]

Bases: utopya.workermanager.WorkerManagerError

Raised when there was an error in a WorkerTask

__module__ = 'utopya.workermanager'
exception utopya.workermanager.WorkerTaskNonZeroExit(task: utopya.task.WorkerTask, *args, **kwargs)[source]

Bases: utopya.workermanager.WorkerTaskError

Can be raised when a WorkerTask has exited with a non-zero exit code.

__module__ = 'utopya.workermanager'
__init__(task: utopya.task.WorkerTask, *args, **kwargs)[source]
__str__() str[source]

Returns information on the error

exception utopya.workermanager.WorkerTaskStopConditionFulfilled(task: utopya.task.WorkerTask, *args, **kwargs)[source]

Bases: utopya.workermanager.WorkerTaskNonZeroExit

An exception that is raised when a worker-specific stop condition was fulfilled. This allows it to be handled separately from other non-zero exits.

__module__ = 'utopya.workermanager'
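
The exception hierarchy above can be mirrored with minimal stand-in classes to show why the subclass relationship matters: because WorkerTaskStopConditionFulfilled subclasses WorkerTaskNonZeroExit, a handler must catch it first in order to treat stop-condition exits separately. These are illustrative stand-ins, not the utopya classes themselves:

```python
# Stand-ins mirroring the documented hierarchy (not the utopya classes):
class WorkerManagerError(BaseException):
    """Base exception class for WorkerManager errors."""

class WorkerTaskError(WorkerManagerError):
    """Error in a WorkerTask."""

class WorkerTaskNonZeroExit(WorkerTaskError):
    """A WorkerTask exited with a non-zero exit code."""

class WorkerTaskStopConditionFulfilled(WorkerTaskNonZeroExit):
    """A worker-specific stop condition was fulfilled."""

def classify(exc: BaseException) -> str:
    # The more specific stop-condition exception must be caught before
    # its parent class, so it can be handled separately:
    try:
        raise exc
    except WorkerTaskStopConditionFulfilled:
        return "stop condition fulfilled"
    except WorkerTaskNonZeroExit:
        return "non-zero exit"
```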

utopya.yaml module

Takes care of the YAML setup for Utopya