Source code for stdpopsim.selection

import attr
import tskit


@attr.s
class GenerationAfter(object):
    """
    We often want to do things one generation after another event. This class
    is essentially just a marker so that the SLiM engine can calculate the next
    generation by taking into account the scaling parameter Q.
    We call float(generation_after_obj) to get the time value.

    XXX: Rename this? GenerationAfter means the generation after something
         in a forwards-time convention, but times are specified with units of
         generations before the present!
    """

    time = attr.ib(type=float)

    def __attrs_post_init__(self):
        validate_time(self)

    def __float__(self):
        return float(self.time)


def validate_time(time):
    if float(time) < 0:
        raise ValueError("Negative times are invalid.")
    if float(time) == 0 and isinstance(time, GenerationAfter):
        raise ValueError("The generation after time=0 is in the future.")


def validate_time_range(start_time, end_time):
    validate_time(start_time)
    validate_time(end_time)
    if float(start_time) < float(end_time):
        raise ValueError(f"start_time={start_time} < end_time={end_time})")
    if (
        float(start_time) == float(end_time)
        and isinstance(start_time, GenerationAfter)
        and not isinstance(end_time, GenerationAfter)
    ):
        raise ValueError(
            f"start_time=GenerationAfter({start_time}) < end_time={end_time}"
        )


class ExtendedEvent(object):
    """
    ExtendedEvent is analogous to msprime.DemographicEvent, but is used here
    for explicitly manipulating mutations. Currently, these are only used by
    the SLiM engine. Times are all in units of generations before the present,
    just like for msprime.DemographicEvent.
    """

    pass


@attr.s(kw_only=True)
class DrawMutation(ExtendedEvent):
    """
    Introduce a new mutation with the given mutation type in the given
    population at a given single site. The mutation will be added to one
    randomly chosen chromosome in the given population. Only one mutation
    may be drawn per single site.

    :ivar time: The number of generations in the past at which to introduce the
        mutation.
    :vartype start_time: float
    :ivar single_site_id: The string ID of the single site at which to
        introduce the mutation, see :meth:`Contig.add_single_site`.
    :vartype single_site_id: str
    :ivar population: The name of the population in which to introduce
        the mutation.
    :vartype population: str
    """

    time = attr.ib(type=float)
    single_site_id = attr.ib(type=str)
    population = attr.ib(type=str)

    def __attrs_post_init__(self):
        validate_time(self.time)


@attr.s(kw_only=True)
class ChangeMutationFitness(ExtendedEvent):
    """
    Change the selection and/or dominance coefficients for the given mutation
    type in the given population. The mutation this applies to need not exist
    in the population for all generations between start_time and end_time.
    E.g. if the mutation could be transferred to the population via migration.
    This requires that a `DrawMutation` event with the same `single_site_id` is
    also supplied to the simulation engine.

    See SLiM documentation for registerFitnessCallback().

    :ivar start_time: The number of generations in the past at which the change
        in mutation fitness begins.
    :vartype start_time: float
    :ivar end_time: The number of generations in the past at which the change
        in mutation fitness ends.
    :vartype end_time: float
    :ivar single_site_id: The string ID of the single site at which to change
        the mutation fitness, see :meth:`Contig.add_single_site`.
    :vartype single_site_id: str
    :ivar population: The name of the population in which to change
        the mutation fitness. If `None` (the default), then all populations are
        affected.
    :vartype population: str
    :ivar selection_coeff: The new selection coefficient of the mutation.
    :vartype selection_coeff: float
    :ivar dominance_coeff: The new dominance coefficient of the mutation.
    :vartype dominance_coeff: float
    """

    start_time = attr.ib(type=float)
    end_time = attr.ib(type=float)
    single_site_id = attr.ib(type=str)
    population = attr.ib(type=str, default=None)
    selection_coeff = attr.ib(type=float)
    dominance_coeff = attr.ib(type=float)

    def __attrs_post_init__(self):
        validate_time_range(self.start_time, self.end_time)


@attr.s(kw_only=True)
class ConditionOnAlleleFrequency(ExtendedEvent):
    """
    Condition on the allele frequency of a drawn mutation with the given
    mutation type in the given population. The mutation need not have been
    drawn in the population being conditioned upon.  This requires that a
    `DrawMutation` event for the same single site is also supplied to the
    simulation engine.

    :ivar start_time: The number of generations in the past at which to start
        checking the allele frequency condition.
    :vartype start_time: float
    :ivar end_time: The number of generations in the past at which to stop
        checking the allele frequency condition.
    :vartype end_time: float
    :ivar single_site_id: The string ID of the single site at which to check
        the allele frequency condition, see :meth:`Contig.add_single_site`.
    :vartype single_site_id: str
    :ivar population: The name of the population in which to check
        the allele frequency condition.
    :vartype population: str
    :ivar op: The comparison operator used for the allele frequency condition,
        one of `(">", ">=", "<", "<=")`.
    :vartype op: str
    :ivar allele_frequency: The allele frequency to compare against.
    :vartype op: float
    """

    start_time = attr.ib(type=float)
    end_time = attr.ib(type=float)
    single_site_id = attr.ib(type=str)
    population = attr.ib(type=str, default=None)
    op = attr.ib(type=str, default=None)
    allele_frequency = attr.ib(type=float, default=None)

    op_types = ("<", "<=", ">", ">=")

    def __attrs_post_init__(self):
        if self.op not in self.op_types:
            raise ValueError(f"Invalid conditioning op `{self.op}`.")
        if not (0 <= self.allele_frequency <= 1):
            raise ValueError("Must have 0 <= allele_frequency <= 1.")
        if (self.op == "<" and self.allele_frequency == 0) or (
            self.op == ">" and self.allele_frequency == 1
        ):
            raise ValueError(
                f"allele_frequency {self.op} {self.allele_frequency}: "
                "condition is always false."
            )
        if (self.op == ">=" and self.allele_frequency == 0) or (
            self.op == "<=" and self.allele_frequency == 1
        ):
            raise ValueError(
                f"allele_frequency {self.op} {self.allele_frequency}: "
                "condition is always true."
            )
        validate_time_range(self.start_time, self.end_time)

    @classmethod
    def op_id(cls, op):
        return cls.op_types.index(op)


[docs] def selective_sweep( single_site_id, population, mutation_generation_ago, selection_coeff, dominance_coeff=0.5, start_generation_ago=None, end_generation_ago=None, min_freq_at_start=None, min_freq_at_end=None, globally_adaptive=True, ): """ Creates a list of extended events corresponding to a selective sweep at a single site (see :meth:`Contig.add_single_site`) that may subsequently be passed to the simulation engine. The sequence of events is: (1) In generation `mutation_generation_ago`, a **neutral** mutation is introduced into `population`. (2) In generation `start_generation_ago`, the mutation becomes beneficial (whether globally or locally is controlled by the `globally_adaptive` option). If the frequency of the mutation in `population` is below `min_freq_at_start` in this generation, the simulation will restart from `mutation_generation_ago`. (3) In the generation after `end_generation_ago`, the mutation reverts to neutrality in `population`. If the frequency of the mutation in `population` is below `min_freq_at_end` in this generation, the simulation will restart from `mutation_generation_ago`. During the sweep, the fitnesses of individuals that are homozygous or heterozygous for the beneficial mutation are `1 + selection_coeff` and `1 + selection_coeff * dominance_coeff`, respectively. If `globally_adaptive` is `True` (the default), then all individuals carrying the mutation are impacted regardless of the population they belong to. Otherwise, the mutation only influences fitness within `population`. If the mutation is lost in `population` at any point between `mutation_generation_ago` and `end_generation_ago`, then the simulation will restart from `mutation_generation_ago`. The effect of "restarting" (aka "rejection sampling") is that the resulting simulations are conditioned on the mutation being extant after time `mutation_generation_ago` with frequency above `min_freq_at_start` at `start_generation_ago` and `min_freq_at_end` at `end_generation_ago`. Because the simulation will restart until the allele frequency conditions are satisfied, care must be taken to ensure that these are reasonable. For example, if `start_generation_ago` is very close to `mutation_generation_ago` and `min_freq_at_start` is substantially greater than zero, the vast majority of proposed allele frequency trajectories will be rejected and the simulation may take an infeasibly long time to complete. :param str single_site_id: The string ID of the single site at which to introduce the mutation, see :meth:`Contig.add_single_site`. :param int population: The name of the population in which the mutation is introduced and the sweep occurs. :param float mutation_generation_ago: The number of generations in the past at which to introduce the mutation. :param float selection_coeff: The selection coefficient of the beneficial mutation during the sweep. Must be greater than or equal to zero. :param float dominance_coeff: The dominance coefficient of the beneficial mutation during the sweep. Defaults to `0.5`. :param float start_generation_ago: The number of generations in the past at which the sweep starts and the mutation switches from neutral to beneficial. Defaults to `mutation_generation_ago` (i.e. a "hard" sweep). :param float end_generation_ago: The number of generations in the past at which the sweep ends and the mutation switches from beneficial to neutral. Defaults to `0` (i.e. the end of the simulation). :param float min_freq_at_start: The minimum allowed frequency for the mutation in `population` at the start of the sweep (optional). :param float min_freq_at_end: The minimum allowed frequency for the mutation in `population` at the end of the sweep (optional). :param bool globally_adaptive: If `True` (default), the mutation is beneficial in all populations. If `False`, the mutation is beneficial only in the population of origin. """ if start_generation_ago is None: start_generation_ago = mutation_generation_ago if end_generation_ago is None: end_generation_ago = 0 validate_time_range(mutation_generation_ago, start_generation_ago) validate_time_range(start_generation_ago, end_generation_ago) if selection_coeff < 0.0: raise ValueError("Selection coefficient must be non-negative.") extended_events = [ DrawMutation( single_site_id=single_site_id, time=mutation_generation_ago, population=population, ), ConditionOnAlleleFrequency( single_site_id=single_site_id, start_time=GenerationAfter(mutation_generation_ago), end_time=end_generation_ago, population=population, allele_frequency=0.0, op=">", ), ] # Sweep period is [start_generation_ago, end_generation_ago] (endpoints inclusive). sweep_restricted_to = None if globally_adaptive else population extended_events.extend( [ ChangeMutationFitness( single_site_id=single_site_id, start_time=start_generation_ago, end_time=end_generation_ago, population=sweep_restricted_to, selection_coeff=selection_coeff, dominance_coeff=dominance_coeff, ), ] ) # Only check sweep AF conditions in start/end generations, so that allele # trajectories are otherwise unconstrained. if min_freq_at_start is not None: if not 0 < min_freq_at_start <= 1: raise ValueError( "If specified, the minimum allele frequency at the start of the sweep " "must be in (0, 1]." ) if start_generation_ago == mutation_generation_ago: raise ValueError( "If the start time of the sweep coincides with the introduction " "of the mutation, then `min_freq_at_start` cannot be set." ) extended_events.extend( [ ConditionOnAlleleFrequency( single_site_id=single_site_id, start_time=start_generation_ago, end_time=start_generation_ago, population=population, allele_frequency=min_freq_at_start, op=">=", ), ] ) if min_freq_at_end is not None: if not 0 < min_freq_at_end <= 1: raise ValueError( "If specified, the minimum allele frequency at the end of the sweep " "must be in (0, 1]." ) extended_events.extend( [ ConditionOnAlleleFrequency( single_site_id=single_site_id, start_time=end_generation_ago, end_time=end_generation_ago, population=population, allele_frequency=min_freq_at_end, op=">=", ), ] ) return extended_events
def selection_coeff_from_mutation(ts, mutation): """ Returns the selection coefficient of the given mutation. In nearly all cases, this will just be "the selection coefficient" and users do not need to worry about the details. However: mutations produced by the SLiM engine have metadata which includes each mutation's selection coefficient. However, mutations may "stack", meaning that one mutation-in-the-tree-sequence may in fact represent a superposition of several SLiM-produced-mutations. This method adds up all selection coefficients in the metadata of this mutation, and subtracts the sum of the selection coefficients of the parent mutation, if there is one. For more information, see the SLiM manual and `the pyslim manual <https://tskit.dev/pyslim/docs/stable/tutorial.html#extracting-information-about-selected-mutations>`__. :param ts: A ``tskit.TreeSequence`` containing the mutation. :param mutation: A ``tskit.Mutation`` for which to extract the selection coefficient. """ if not isinstance(ts, tskit.TreeSequence): raise ValueError("`ts` must be a `tskit.TreeSequence` object") if not isinstance(mutation, tskit.Mutation): raise ValueError("`mutation` must be a `tskit.Mutation` object") if not isinstance(mutation.metadata, dict): return 0.0 selection_coeff = sum( [m.get("selection_coeff") for m in mutation.metadata["mutation_list"]] ) if mutation.parent != tskit.NULL: parent = ts.mutation(mutation.parent) selection_coeff -= sum( [m.get("selection_coeff") for m in parent.metadata["mutation_list"]] ) return selection_coeff