Source code for stdpopsim.engines

import logging
import warnings

import attr
import msprime
import stdpopsim
import numpy as np
import math

logger = logging.getLogger(__name__)

_registered_engines = {}


[docs]def register_engine(engine): """ Registers the specified simulation engine. :param engine: The simulation engine object to register. :type engine: :class:`.Engine` """ if engine.id in _registered_engines: raise ValueError(f"Simulation engine '{engine.id}' already registered.") logger.debug(f"Registering simulation engine '{engine.id}'") _registered_engines[engine.id] = engine
[docs]def get_engine(id): """ Returns the simulation engine with the specified ``id``. :param str id: The string identifier for the requested engine. The currently supported engines are "msprime" and "slim". :return: A simulation engine object with a ``simulate()`` method. :rtype: :class:`.Engine` """ if id not in _registered_engines: raise ValueError(f"Simulation engine '{id}' not registered") return _registered_engines[id]
def all_engines(): """ Returns an iterator over all registered simulation engines. """ for sim_engine in _registered_engines.values(): yield sim_engine
[docs]@attr.s class Engine: """ Abstract class representing a simulation engine. To implement a new simulation engine, one should inherit from this class. At a minimum, the ``id``, ``description`` and ``citations`` attributes must be set, and the :func:`simulate()` and :func:`get_version()` methods must be implemented. See msprime example in ``engines.py``. :ivar ~.id: The unique identifier for the simulation engine. :vartype ~.id: str :ivar ~.description: A short description of this engine. :vartype ~.description: str :ivar citations: A list of citations for the simulation engine. :vartype citations: list of :class:`.Citation` """
[docs] def simulate( self, demographic_model, contig, samples, *, seed=None, dry_run=False, ): """ Simulates the model for the specified contig and samples. ``demographic_model``, ``contig``, and ``samples`` must be specified. :param demographic_model: The demographic model to simulate. :type demographic_model: :class:`.DemographicModel` :param contig: The contig, defining the length, mutation rate, and recombination rate(s). :type contig: :class:`.Contig` :param samples: The samples to be obtained from the simulation. :type samples: dict containing number of individuals per population :param seed: The seed for the random number generator. :type seed: int :param dry_run: If True, the simulation engine will return None without running the simulation. :type dry_run: bool :return: A succinct tree sequence. :rtype: :class:`tskit.trees.TreeSequence` or None """ raise NotImplementedError()
[docs] def get_version(self): """ Returns the version of the engine. :rtype: str """ raise NotImplementedError()
def _warn_mutation_rate_mismatch(self, contig, demographic_model): if demographic_model.mutation_rate is not None and not math.isclose( demographic_model.mutation_rate, contig.mutation_rate ): warnings.warn( "The demographic model has mutation rate " f"{demographic_model.mutation_rate}, but this simulation used the " "contig's mutation rate " f"{contig.mutation_rate}. " "Diversity levels may be different than expected for this species. " "For details see documentation at " "https://popsim-consortium.github.io/stdpopsim-docs/stable/tutorial.html" ) def _warn_recombination_rate_mismatch(self, contig, demographic_model): if demographic_model.recombination_rate is not None and not math.isclose( demographic_model.recombination_rate, contig.recombination_map.mean_rate ): warnings.warn( "The demographic model has recombination rate " f"{demographic_model.recombination_rate}, but this simulation used the " "contig's recombination rate " f"{contig.recombination_map.mean_rate}. " "Patterns of linkage may be different than expected for this species. " "For details see documentation at " "https://popsim-consortium.github.io/stdpopsim-docs/stable/tutorial.html" )
[docs]class _MsprimeEngine(Engine): id = "msprime" #: description = "Msprime coalescent simulator" #: citations = [ stdpopsim.Citation( doi="https://doi.org/10.1371/journal.pcbi.1004842", year="2016", author="Kelleher et al.", reasons={stdpopsim.CiteReason.ENGINE}, ) ] # We default to the first model in the list. model_class_map = { "hudson": msprime.StandardCoalescent, "dtwf": msprime.DiscreteTimeWrightFisher, "smc": msprime.SmcApproxCoalescent, "smc_prime": msprime.SmcPrimeApproxCoalescent, } model_citations = { "dtwf": [ stdpopsim.Citation( doi="https://doi.org/10.1371/journal.pgen.1008619", year="2020", author="Nelson et al.", reasons={stdpopsim.CiteReason.ENGINE}, ) ] } @property def supported_models(self): return list(self.model_class_map.keys()) def _convert_model_spec(self, model_str, model_changes): """ Convert the specified model specification into a form suitable for sim_ancestry. The model param is a string or None. The model_changes is either None or list of (time, model_str) tuples. Also return the appropriate extra citations. """ citations = [] if model_str is None: model_str = "hudson" else: if model_str not in self.model_class_map: raise ValueError(f"Unrecognised model '{model_str}'") if model_str in self.model_citations: citations.extend(self.model_citations[model_str]) if model_changes is None: model = model_str else: model_list = [] last_t = 0 last_model = model_str for t, model in model_changes: if model not in self.supported_models: raise ValueError(f"Unrecognised model '{model}'") if model in self.model_citations: citations.extend(self.model_citations[model]) duration = t - last_t model_list.append(self.model_class_map[last_model](duration=duration)) last_model = model last_t = t model_list.append(self.model_class_map[last_model](duration=None)) model = model_list return model, citations
[docs] def simulate( self, demographic_model, contig, samples, *, seed=None, msprime_model=None, msprime_change_model=None, dry_run=False, **kwargs, ): """ Simulate the demographic model using msprime. See :meth:`.Engine.simulate()` for definitions of parameters defined for all engines. :param msprime_model: The msprime simulation model to be used. One of ``hudson``, ``dtwf``, ``smc``, or ``smc_prime``. See msprime API documentation for details. :type msprime_model: str :param msprime_change_model: A list of (time, model) tuples, which changes the simulation model to the new model at the time specified. :type msprime_change_model: list of (float, str) tuples :param dry_run: If True, ``end_time=0`` is passed to :meth:`msprime.simulate()` to initialise the simulation and then immediately return. :type dry_run: bool :param \\**kwargs: Further arguments passed to :meth:`msprime.sim_ancestry()` """ model, citations = self._convert_model_spec(msprime_model, msprime_change_model) self.citations.extend(citations) if "random_seed" in kwargs.keys(): if seed is None: seed = kwargs["random_seed"] del kwargs["random_seed"] else: raise ValueError("Cannot set both seed and random_seed") # test to make sure contig is fully neutral if not contig.is_neutral: raise ValueError( "Contig had non neutral mutation types " "but you are using the msprime engine" ) # handle deprecated samples=[msprime.SampleSet] input if isinstance(samples, dict): sample_sets = demographic_model.get_sample_sets( samples, ploidy=contig.ploidy ) elif all([isinstance(x, msprime.SampleSet) for x in samples]): sample_sets = samples else: raise ValueError( "Samples must be a dict of the form {population_name:num_samples}." ) self._warn_mutation_rate_mismatch(contig, demographic_model) self._warn_recombination_rate_mismatch(contig, demographic_model) rng = np.random.default_rng(seed) seeds = rng.integers(1, 2**31 - 1, size=2) # if bacterial_recombination=True then "recombination_rate" # is implemented by msprime as gene conversion rate # (which must be constant) recombination_map = contig.recombination_map gc_rate = None gc_frac = contig.gene_conversion_fraction if contig.bacterial_recombination: gc_rate = contig.recombination_map.mean_rate recombination_map = msprime.RateMap( position=[0.0, contig.length], rate=[0.0] ) elif (gc_frac is not None) and (gc_frac > 0.0): # the recombination map is a map of double-stranded breaks, # so we need to separate out gene conversion from crossing-over gc_rate = contig.recombination_map.mean_rate * gc_frac / (1 - gc_frac) recombination_map = msprime.RateMap( position=recombination_map.position, rate=recombination_map.rate * (1 - gc_frac), ) ts = msprime.sim_ancestry( samples=sample_sets, recombination_rate=recombination_map, gene_conversion_rate=gc_rate, gene_conversion_tract_length=contig.gene_conversion_length, demography=demographic_model.model, ploidy=contig.ploidy, random_seed=seeds[0], model=model, end_time=0 if dry_run else None, **kwargs, ) ts = msprime.sim_mutations( ts, end_time=0 if dry_run else None, random_seed=seeds[1], rate=contig.mutation_rate, ) if contig.inclusion_mask is not None: ts = stdpopsim.utils.mask_tree_sequence(ts, contig.inclusion_mask, False) if contig.exclusion_mask is not None: ts = stdpopsim.utils.mask_tree_sequence(ts, contig.exclusion_mask, True) if dry_run: ts = None return ts
def get_version(self): return msprime.__version__
register_engine(_MsprimeEngine())
[docs]def get_default_engine(): """ Returns the default simulation engine (msprime). :rtype: :class:`.Engine` """ return get_engine("msprime")