Source code for maicos.core.base

#!/usr/bin/env python
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
#
# Copyright (c) 2024 Authors and contributors
# (see the AUTHORS.rst file for the full list of names)
#
# Released under the GNU Public Licence, v3 or any higher version
# SPDX-License-Identifier: GPL-3.0-or-later
"""Base class for building Analysis classes."""
import inspect
import logging
import warnings
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple, Union

import MDAnalysis as mda
import MDAnalysis.analysis.base
import numpy as np
from MDAnalysis.analysis.base import Results
from MDAnalysis.lib.log import ProgressBar
from tqdm.contrib.logging import logging_redirect_tqdm
from typing_extensions import Self

from .._version import get_versions
from ..lib.math import center_cluster, new_mean, new_variance
from ..lib.util import (
    atomgroup_header,
    correlation_analysis,
    get_center,
    get_cli_input,
    maicos_banner,
    render_docs,
)


__version__ = get_versions()["version"]
del get_versions

logger = logging.getLogger(__name__)


class _Runner:
    """Private Runner class that provides a common ``run`` method.

    The class is used inside ``AnalysisBase`` as well as in ``AnalysisCollection``.
    """

    def _run(
        self,
        analysis_instances: Tuple["AnalysisBase", ...],
        start: Optional[int] = None,
        stop: Optional[int] = None,
        step: Optional[int] = None,
        frames: Optional[int] = None,
        verbose: Optional[bool] = None,
        progressbar_kwargs: Optional[dict] = None,
    ) -> Self:
        self._run_locals = locals()

        logger.info("Choosing frames to analyze")
        for analysis_object in analysis_instances:
            analysis_object._setup_frames(
                analysis_object._trajectory,
                start=start,
                stop=stop,
                step=step,
                frames=frames,
            )

        logger.info(maicos_banner(frame_char="#", version=f"v{__version__}"))
        logger.info("Starting preparation")

        for analysis_object in analysis_instances:
            analysis_object._call_prepare()

        if progressbar_kwargs is None:
            progressbar_kwargs = {}

        for i, ts in enumerate(
            ProgressBar(
                analysis_instances[0]._sliced_trajectory,
                verbose=verbose,
                **progressbar_kwargs,
            )
        ):
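            # The first instance receives the timestep as read from the trajectory;
            # keeping a copy ensures every following instance starts from the same
            # original timestep and not one altered by a previous instance.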
            ts_original = ts.copy()

            for analysis_object in analysis_instances:
                analysis_object._call_single_frame(ts=ts, current_frame_index=i)
                ts = ts_original

        logger.info("Finishing up")
        for analysis_object in analysis_instances:
            analysis_object._call_conclude()

        return self


@render_docs
class AnalysisBase(_Runner, MDAnalysis.analysis.base.AnalysisBase):
    """Base class derived from MDAnalysis for defining multi-frame analysis.

    The class is designed as a template for creating multi-frame analyses. This class
    will automatically take care of setting up the trajectory reader for iterating, and
    it offers to show a progress meter. Computed results are stored inside the
    :attr:`results` attribute.

    To define a new analysis, `AnalysisBase` needs to be subclassed and
    :meth:`_single_frame` must be defined. It is also possible to define
    :meth:`_prepare` and :meth:`_conclude` for pre- and post-processing. All results
    should be stored as attributes of the :class:`MDAnalysis.analysis.base.Results`
    container.

    During the analysis, the correlation time of an observable can be estimated to
    ensure that calculated errors are reasonable. For this, the :meth:`_single_frame`
    method has to return a single :obj:`float`. For details on the computation of the
    correlation and its further analysis refer to
    :func:`maicos.lib.util.correlation_analysis`.

    Parameters
    ----------
    ${ATOMGROUP_PARAMETER}
    ${BASE_CLASS_PARAMETERS}
    ${WRAP_COMPOUND_PARAMETER}

    Attributes
    ----------
    ${ATOMGROUP_PARAMETER}
    _universe : MDAnalysis.core.universe.Universe
        The Universe the AtomGroup belongs to.
    _trajectory : MDAnalysis.coordinates.base.ReaderBase
        The trajectory the AtomGroup belongs to.
    times : numpy.ndarray
        Array of Timestep times. Only exists after calling :meth:`AnalysisBase.run`.
    frames : numpy.ndarray
        Array of Timestep frame indices. Only exists after calling
        :meth:`AnalysisBase.run`.
    _frame_index : int
        Index of the frame currently analysed.
    _index : int
        Number of frames already analysed (same as ``_frame_index + 1``).
    results : MDAnalysis.analysis.base.Results
        Results of the calculation are stored after the call to
        :meth:`AnalysisBase.run`.
    _obs : MDAnalysis.analysis.base.Results
        Observables of the current frame.
    _obs.box_center : numpy.ndarray
        Center of the simulation cell of the current frame.
    sums : MDAnalysis.analysis.base.Results
        Sum of the observables across frames. Keys are the same as :attr:`_obs`.
    means : MDAnalysis.analysis.base.Results
        Means of the observables. Keys are the same as :attr:`_obs`.
    sems : MDAnalysis.analysis.base.Results
        Standard errors of the mean of the observables. Keys are the same as
        :attr:`_obs`.
    corrtime : float
        The correlation time of the analysed data. For details on how this is
        calculated see :func:`maicos.lib.util.correlation_analysis`.

    Raises
    ------
    ValueError
        If any of the provided AtomGroups (`atomgroup` or `refgroup`) does not contain
        any atoms.

    Example
    -------
    To write your own analysis module you can use the example given below. As with all
    MAICoS modules, the example inherits from the
    :class:`maicos.core.base.AnalysisBase` class. It calculates the average box volume
    and stores the result within the ``results`` object of the class.

    >>> import logging
    >>> from typing import Optional

    >>> import MDAnalysis as mda
    >>> import numpy as np

    >>> from maicos.core import AnalysisBase
    >>> from maicos.lib.util import render_docs

    Creating a logger makes debugging easier.

    >>> logger = logging.getLogger(__name__)

    The following is the analysis module itself. Due to the similar structure of all
    MAICoS modules you can render the parameters using the
    :func:`maicos.lib.util.render_docs` decorator. The decorator will replace special
    keywords with a leading ``$`` with the actual docstring as defined in
    :attr:`maicos.lib.util.DOC_DICT`.

    >>> @render_docs
    ... class NewAnalysis(AnalysisBase):
    ...     '''Analysis class calculating the average box volume.'''
    ...
    ...     def __init__(
    ...         self,
    ...         atomgroup: mda.AtomGroup,
    ...         concfreq: int = 0,
    ...         temperature: float = 300,
    ...         output: str = "outfile.dat",
    ...     ):
    ...         super().__init__(
    ...             atomgroup=atomgroup,
    ...             refgroup=None,
    ...             unwrap=False,
    ...             jitter=0.0,
    ...             wrap_compound="atoms",
    ...             concfreq=concfreq,
    ...         )
    ...
    ...         self.temperature = temperature
    ...         self.output = output
    ...
    ...     def _prepare(self):
    ...         '''Set things up before the analysis loop begins.'''
    ...         # self.atomgroup refers to the provided `atomgroup`
    ...         # self._universe refers to the full universe of the given `atomgroup`
    ...         self.volume = 0
    ...
    ...     def _single_frame(self):
    ...         '''Calculate data from a single frame of trajectory.
    ...
    ...         Don't worry about normalising, just deal with a single frame.
    ...         '''
    ...         # Current frame index: self._frame_index
    ...         # Current timestep object: self._ts
    ...
    ...         volume = self._ts.volume
    ...         self.volume += volume
    ...
    ...         # Each module should return a characteristic scalar which is used
    ...         # by MAICoS to estimate correlations of an analysis.
    ...         return volume
    ...
    ...     def _conclude(self):
    ...         '''Finalise the results you've gathered.
    ...
    ...         Called at the end of the run() method to finish everything up.
    ...         '''
    ...         self.results.volume = self.volume / self.n_frames
    ...         logger.info(
    ...             "Average volume of the simulation box "
    ...             f"{self.results.volume:.2f} ų"
    ...         )
    ...
    ...     def save(self) -> None:
    ...         '''Save results of analysis to file specified by ``output``.
    ...
    ...         Called at the end of the run() method after _conclude.
    ...         '''
    ...         self.savetxt(
    ...             self.output, np.array([self.results.volume]), columns="volume / ų"
    ...         )
    ...

    Afterwards the new analysis can be run like this

    >>> import MDAnalysis as mda
    >>> from MDAnalysisTests.datafiles import TPR, XTC

    >>> u = mda.Universe(TPR, XTC)

    >>> na = NewAnalysis(u.atoms)
    >>> _ = na.run(start=0, stop=10)
    >>> round(na.results.volume, 2)
    362631.65

    Results can also be accessed by key

    >>> round(na.results["volume"], 2)
    362631.65
    """

    def __init__(
        self,
        atomgroup: mda.AtomGroup,
        unwrap: bool,
        refgroup: Union[None, mda.AtomGroup],
        jitter: float,
        concfreq: int,
        wrap_compound: str,
    ) -> None:
        self.atomgroup = atomgroup

        if self.atomgroup.n_atoms == 0:
            raise ValueError("The provided `atomgroup` does not contain any atoms.")

        self._universe = atomgroup.universe
        self._trajectory = self._universe.trajectory
        self.refgroup = refgroup
        self.unwrap = unwrap
        self.jitter = jitter
        self.concfreq = concfreq

        if wrap_compound not in [
            "atoms",
            "group",
            "residues",
            "segments",
            "molecules",
            "fragments",
        ]:
            raise ValueError(
                "Unrecognized `wrap_compound` definition "
                f"{wrap_compound}: \nPlease use "
                "one of 'atoms', 'group', 'residues', "
                "'segments', 'molecules', or 'fragments'."
            )
        self.wrap_compound = wrap_compound

        if self.unwrap and self._universe.dimensions is None:
            raise ValueError(
                "Universe does not have `dimensions` and can't be unwrapped!"
            )

        if self.unwrap and self.wrap_compound == "atoms":
            logger.warning(
                "Unwrapping in combination with `wrap_compound='atoms'` is "
                "superfluous. `unwrap` will be set to `False`."
            )
            self.unwrap = False

        if self.refgroup is not None and self.refgroup.n_atoms == 0:
            raise ValueError("The provided `refgroup` does not contain any atoms.")

        self.module_has_save = callable(getattr(self.__class__, "save", None))

        super().__init__(trajectory=self._trajectory)

    @property
    def box_center(self) -> np.ndarray:
        """Center of the simulation cell."""
        return self._universe.dimensions[:3] / 2

    def _prepare(self) -> None:
        """Set things up before the analysis loop begins."""
        pass  # pylint: disable=unnecessary-pass

    def _call_prepare(self) -> None:
        """Base method wrapping all _prepare logic into a single call."""
        if self.refgroup is not None:
            if (
                not hasattr(self.refgroup, "masses")
                or np.sum(self.refgroup.masses) == 0
            ):
                logger.warning(
                    "No masses available in refgroup, falling back "
                    "to center of geometry"
                )
                self.ref_weights = np.ones_like(self.refgroup.atoms)
            else:
                self.ref_weights = self.refgroup.masses

        self._prepare()

        # Log bin information if a spatial analysis is run.
        if hasattr(self, "n_bins"):
            logger.info(f"Using {self.n_bins} bins.")

        self.timeseries = np.zeros(self.n_frames)

        logger.info(f"Starting analysis loop over {self.n_frames} trajectory frames.")

    def _single_frame(self) -> Union[None, float]:
        """Calculate data from a single frame of trajectory.

        Don't worry about normalising, just deal with a single frame.
        """
        raise NotImplementedError("Only implemented in child classes")

    def _call_single_frame(self, ts, current_frame_index) -> None:
        """Base method wrapping all single_frame logic into a single call."""
        compatible_types = [np.ndarray, float, int, list, np.float_, np.int_]

        self._frame_index = current_frame_index
        self._index = self._frame_index + 1

        self._ts = ts
        self.frames[current_frame_index] = ts.frame
        self.times[current_frame_index] = ts.time

        # Before we do any coordinate transformation we first unwrap the system to
        # avoid artifacts of later wrapping.
        if self.unwrap:
            self._universe.atoms.unwrap(compound=self.wrap_compound)
        if self.refgroup is not None:
            com_refgroup = center_cluster(self.refgroup, self.ref_weights)
            t = self.box_center - com_refgroup
            self._universe.atoms.translate(t)

        # If the universe has a cell we wrap the compounds into the primary unit cell
        # to use all compounds for the analysis.
        if self._universe.dimensions is not None:
            self._universe.atoms.wrap(compound=self.wrap_compound)

        if self.jitter != 0.0:
            ts.positions += np.random.random(size=(len(ts.positions), 3)) * self.jitter

        self._obs = Results()

        self.timeseries[current_frame_index] = self._single_frame()

        # This try/except block is used because it will fail only once and is
        # therefore not a performance issue like an if statement would be.
        try:
            for key in self._obs.keys():
                if type(self._obs[key]) is list:
                    self._obs[key] = np.array(self._obs[key])

                old_mean = self.means[key]  # type: ignore
                old_var = self.sems[key] ** 2 * (self._index - 1)  # type: ignore
                self.means[key] = new_mean(  # type: ignore
                    self.means[key], self._obs[key], self._index  # type: ignore
                )  # type: ignore
                self.sems[key] = np.sqrt(  # type: ignore
                    new_variance(
                        old_var,
                        old_mean,
                        self.means[key],  # type: ignore
                        self._obs[key],
                        self._index,
                    )
                    / self._index
                )
                self.sums[key] += self._obs[key]  # type: ignore

        except AttributeError:
            with logging_redirect_tqdm():
                logger.info("Preparing error estimation.")

            # The means and sems are not yet defined. We initialize the means with
            # the data from the first frame and set the sems to zero (with the
            # correct shape).
            self.sums = self._obs.copy()
            self.means = self._obs.copy()
            self.sems = Results()
            for key in self._obs.keys():
                if type(self._obs[key]) not in compatible_types:
                    raise TypeError(f"Observable {key} has an incompatible type.")
                self.sems[key] = np.zeros(np.shape(self._obs[key]))

        if (
            self.concfreq
            and self._index % self.concfreq == 0
            and self._frame_index > 0
        ):
            self._conclude()
            if self.module_has_save:
                self.save()

    def _conclude(self) -> None:
        """Finalize the results you've gathered.

        Called at the end of the :meth:`run` method to finish everything up.
        """
        pass  # pylint: disable=unnecessary-pass

    def _call_conclude(self) -> None:
        """Base method wrapping all _conclude logic into a single call."""
        self.corrtime = correlation_analysis(self.timeseries)
        self._conclude()
        if self.concfreq and self.module_has_save:
            self.save()

    @render_docs
    def run(
        self,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        step: Optional[int] = None,
        frames: Optional[int] = None,
        verbose: Optional[bool] = None,
        progressbar_kwargs: Optional[dict] = None,
    ) -> Self:
        """Iterate over the trajectory."""
        return _Runner._run(
            self,
            analysis_instances=(self,),
            start=start,
            stop=stop,
            step=step,
            frames=frames,
            verbose=verbose,
            progressbar_kwargs=progressbar_kwargs,
        )

    def savetxt(
        self, fname: str, X: np.ndarray, columns: Optional[List[str]] = None
    ) -> None:
        """Save to text.

        An extension of the numpy savetxt function. Adds the command line input to the
        header and checks for a doubly defined file suffix.

        Return a header for the text file to save the data to. This method builds a
        generic header that can be used by any MAICoS module. It is called by the save
        method of each module.

        The information it collects is:

        - timestamp of the analysis
        - name of the module
        - version of MAICoS that was used
        - command line arguments that were used to run the module
        - module call including the default arguments
        - number of frames that were analyzed
        - atomgroup that was analyzed
        - output messages from modules and base classes (if they exist)
        """
        # This method breaks if fname is a Path object. We therefore convert it to a
        # str.
        fname = str(fname)
        # Get the required information first.
        current_time = datetime.now().strftime("%a, %b %d %Y at %H:%M:%S ")
        module_name = self.__class__.__name__

        # Here the specific output messages of the modules are collected. We only take
        # into account maicos modules and start at the top of the module tree.
        # Submodules without their own OUTPUT inherit from the parent class, so we
        # want to remove those duplicates.
        messages_list = []
        for cls in self.__class__.mro()[-3::-1]:
            if hasattr(cls, "OUTPUT"):
                if cls.OUTPUT not in messages_list:
                    messages_list.append(cls.OUTPUT)
        messages = "\n".join(messages_list)

        # Get information on the analyzed atomgroup.
        atomgroups = f" (grp) {atomgroup_header(self.atomgroup)}\n"
        if hasattr(self, "refgroup") and self.refgroup is not None:
            atomgroups += f" (ref) {atomgroup_header(self.refgroup)}\n"

        # We have to check this since only the modules have the _locals attribute, not
        # the base classes. Yet we still want to test the output behaviour of the base
        # classes.
        if hasattr(self, "_locals") and hasattr(self, "_run_locals"):
            sig = inspect.getfullargspec(self.__class__)
            sig.args.remove("self")
            strings = []
            for param in sig.args:
                if type(self._locals[param]) is str:
                    string = f"{param}='{self._locals[param]}'"
                elif param == "atomgroup":
                    string = f"{param}=<AtomGroup>"
                elif param == "refgroup" and self._locals[param] is not None:
                    string = f"{param}=<AtomGroup>"
                else:
                    string = f"{param}={self._locals[param]}"
                strings.append(string)
            init_signature = ", ".join(strings)

            sig = inspect.getfullargspec(self.run)
            sig.args.remove("self")
            run_signature = ", ".join(
                [
                    (
                        f"{param}='{self._run_locals[param]}'"
                        if type(self._run_locals[param]) is str
                        else f"{param}={self._run_locals[param]}"
                    )
                    for param in sig.args
                ]
            )

            module_input = f"{module_name}({init_signature}).run({run_signature})"
        else:
            module_input = f"{module_name}(*args).run(*args)"

        header = (
            f"This file was generated by {module_name} "
            f"on {current_time}\n\n"
            f"{module_name} is part of MAICoS v{__version__}\n\n"
            f"Command line: {get_cli_input()}\n"
            f"Module input: {module_input}\n\n"
            f"Statistics over {self._index} frames\n\n"
            f"Considered atomgroups:\n"
            f"{atomgroups}\n"
            f"{messages}\n\n"
        )

        if columns is not None:
            header += "|".join([f"{i:^26}" for i in columns])[2:]

        fname = "{}{}".format(fname, (not fname.endswith(".dat")) * ".dat")
        np.savetxt(fname, X, header=header, fmt="% .18e ", encoding="utf8")
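

# NOTE: The following helper is an illustrative sketch and not part of the MAICoS API.
# It mirrors how ``AnalysisBase._call_single_frame`` accumulates per-frame observables
# into running means and standard errors of the mean without storing the full time
# series. The Welford-style recurrence used here is only assumed to be equivalent to
# ``maicos.lib.math.new_mean`` and ``new_variance``.
def _example_running_statistics(samples: np.ndarray) -> Tuple[float, float]:
    """Return the running mean and standard error of the mean of ``samples``."""
    mean, m2 = 0.0, 0.0
    for index, value in enumerate(samples, start=1):
        old_mean = mean
        mean += (value - old_mean) / index  # running mean update
        m2 += (value - old_mean) * (value - mean)  # running sum of squared deviations
    n_samples = len(samples)
    sem = float(np.sqrt(m2 / (n_samples - 1) / n_samples)) if n_samples > 1 else 0.0
    return float(mean), sem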


@render_docs
class AnalysisCollection(_Runner):
    """Running a collection of analysis classes on the same single trajectory.

    .. warning::

        ``AnalysisCollection`` is still experimental. You should not use it for
        anything important.

    An analysis with ``AnalysisCollection`` can lead to a speedup compared to running
    the individual analyses, since the trajectory loop is performed only once.

    The class requires that each analysis is a child of :class:`AnalysisBase`.
    Additionally, the trajectory of all ``analysis_instances`` must be the same. It is
    ensured that all analysis instances use the *same original* timestep and not an
    altered one from a previous analysis instance.

    Parameters
    ----------
    *analysis_instances : AnalysisBase
        Arbitrary number of analysis instances to be run on the same trajectory.

    Raises
    ------
    ValueError
        If the provided ``analysis_instances`` do not work on the same trajectory.
    AttributeError
        If one of the ``analysis_instances`` is not a child of :class:`AnalysisBase`.

    Example
    -------
    >>> import MDAnalysis as mda
    >>> from maicos import DensityPlanar
    >>> from maicos.core import AnalysisCollection
    >>> from MDAnalysisTests.datafiles import TPR, XTC

    >>> u = mda.Universe(TPR, XTC)

    Select atoms

    >>> ag_O = u.select_atoms("name O")
    >>> ag_H = u.select_atoms("name H")

    Create the individual analysis instances

    >>> dplan_O = DensityPlanar(ag_O)
    >>> dplan_H = DensityPlanar(ag_H)

    Create a collection for the common trajectory

    >>> collection = AnalysisCollection(dplan_O, dplan_H)

    Run the collected analysis

    >>> _ = collection.run(start=0, stop=100, step=10)

    Results are stored in the individual instances; see :class:`AnalysisBase` on how
    to access them. You can also save all results of the analysis within one call:

    >>> collection.save()
    """

    def __init__(self, *analysis_instances: AnalysisBase) -> None:
        warnings.warn(
            "`AnalysisCollection` is still experimental. You should not use it for "
            "anything important.",
            stacklevel=2,
        )

        for analysis_object in analysis_instances:
            if analysis_instances[0]._trajectory != analysis_object._trajectory:
                raise ValueError(
                    "`analysis_instances` do not have the same trajectory."
                )
            if not isinstance(analysis_object, AnalysisBase):
                raise AttributeError(
                    f"Analysis object {analysis_object} is "
                    "not a child of `AnalysisBase`."
                )

        self._analysis_instances = analysis_instances

    @render_docs
    def run(
        self,
        start: Optional[int] = None,
        stop: Optional[int] = None,
        step: Optional[int] = None,
        frames: Optional[int] = None,
        verbose: Optional[bool] = None,
        progressbar_kwargs: Optional[dict] = None,
    ) -> Self:
        """${RUN_METHOD_DESCRIPTION}"""
        return _Runner._run(
            self,
            analysis_instances=self._analysis_instances,
            start=start,
            stop=stop,
            step=step,
            frames=frames,
            verbose=verbose,
            progressbar_kwargs=progressbar_kwargs,
        )

    def save(self) -> None:
        """Save results of all ``analysis_instances`` to disk.

        The method calls the :meth:`save` method of all ``analysis_instances`` if
        available. If an instance has no :meth:`save` method, a warning for this
        instance is issued.
        """
        for analysis_object in self._analysis_instances:
            if analysis_object.module_has_save:
                analysis_object.save()
            else:
                warnings.warn(
                    f"`{analysis_object}` has no save() method. Analysis results of "
                    "this instance cannot be written to disk.",
                    stacklevel=2,
                )


@render_docs
class ProfileBase:
    """Base class for computing profiles.

    Parameters
    ----------
    ${ATOMGROUP_PARAMETER}
    ${PROFILE_CLASS_PARAMETERS}
    ${PROFILE_CLASS_PARAMETERS_PRIVATE}

    Attributes
    ----------
    ${PROFILE_CLASS_ATTRIBUTES}
    """

    def __init__(
        self,
        atomgroup: mda.AtomGroup,
        grouping: str,
        bin_method: str,
        output: str,
        weighting_function: Callable,
        weighting_function_kwargs: Union[None, Dict],
        normalization: str,
    ) -> None:
        self.atomgroup = atomgroup
        self.grouping = grouping.lower()
        self.bin_method = bin_method.lower()
        self.output = output
        self.normalization = normalization.lower()

        if weighting_function_kwargs is None:
            weighting_function_kwargs = {}

        self.weighting_function = lambda ag: weighting_function(
            ag, grouping, **weighting_function_kwargs
        )

        # We need to set the following dictionaries here because ProfileBase is not a
        # subclass of AnalysisBase (only needed for tests).
        self.results = Results()
        self._obs = Results()

    def _prepare(self):
        normalizations = ["none", "volume", "number"]
        if self.normalization not in normalizations:
            raise ValueError(
                f"Normalization {self.normalization!r} not supported. "
                f"Use {', '.join(normalizations)}."
            )

        groupings = ["atoms", "segments", "residues", "molecules", "fragments"]
        if self.grouping not in groupings:
            raise ValueError(
                f"{self.grouping!r} is not a valid option for "
                f"grouping. Use {', '.join(groupings)}."
            )

        # If unwrap has not been set we define it here.
        if not hasattr(self, "unwrap"):
            self.unwrap = True

    def _compute_histogram(
        self, positions: np.ndarray, weights: Optional[np.ndarray] = None
    ) -> np.ndarray:
        """Calculate histogram based on positions.

        Parameters
        ----------
        positions : numpy.ndarray
            positions
        weights : numpy.ndarray
            weights for the histogram.

        Returns
        -------
        hist : numpy.ndarray
            histogram
        """
        raise NotImplementedError("Only implemented in child classes.")

    def _single_frame(self) -> Union[None, float]:
        self._obs.profile = np.zeros(self.n_bins)  # type: ignore
        self._obs.bincount = np.zeros(self.n_bins)  # type: ignore

        if self.grouping == "atoms":  # type: ignore
            positions = self.atomgroup.positions
        else:
            positions = get_center(
                self.atomgroup, bin_method=self.bin_method, compound=self.grouping
            )

        weights = self.weighting_function(self.atomgroup)
        profile = self._compute_histogram(positions, weights)
        self._obs.bincount = self._compute_histogram(positions, weights=None)

        if self.normalization == "volume":
            profile /= self._obs.bin_volume

        self._obs.profile = profile

        return None

    def _conclude(self) -> None:
        if self.normalization == "number":
            with np.errstate(divide="ignore", invalid="ignore"):
                self.results.profile = (
                    self.sums.profile / self.sums.bincount  # type: ignore
                )
        else:
            self.results.profile = self.means.profile  # type: ignore
        self.results.dprofile = self.sems.profile  # type: ignore

    @render_docs
    def save(self) -> None:
        """${SAVE_METHOD_DESCRIPTION}"""
        columns = ["positions [Å]"]
        columns.append("profile")
        columns.append("error")

        # Required attribute to use the method from `AnalysisBase`.
        AnalysisBase.savetxt(
            self,  # type: ignore
            self.output,
            np.vstack(
                (
                    self.results.bin_pos,
                    self.results.profile,
                    self.results.dprofile,
                )
            ).T,
            columns=columns,
        )
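

# NOTE: Illustrative sketch, not part of the MAICoS API. Concrete profile classes
# built on ``ProfileBase`` must provide ``_compute_histogram``; the helper below
# shows one plausible implementation that bins positions along the z axis into
# ``n_bins`` equally spaced bins. The choice of the z axis and of the box height as
# the histogram range are assumptions made for this example only.
def _example_compute_histogram(
    positions: np.ndarray,
    n_bins: int,
    box_height: float,
    weights: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Bin ``positions`` along z into ``n_bins`` bins, optionally weighted."""
    hist, _ = np.histogram(
        positions[:, 2], bins=n_bins, range=(0.0, box_height), weights=weights
    )
    return hist.astype(float)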