Source code for multi_locus_analysis.finite_window.munging

import warnings

import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype

censor_t = CategoricalDtype(
    categories=['interior', 'left exterior', 'right exterior',
                'full exterior'],
    ordered=False
)


[docs]def state_changes_to_wait_times(traj): """ DEPRECATED: in favor of :func:`simulations_to_observations`. Converts the output of :func:`ab_window_fast` into a :class:`pd.DataFrame` containing each wait time, with its start, end, rank order, and the state it's leaving. This function deals with "continuous" wait times, as in not measured at discrete time points (on a grid), so the wait times it returns are exact.""" warnings.warn('Use sim_to_obs instead!', DeprecationWarning) waits = traj.copy() num_waits = len(waits) waits['rank_order'] = np.arange(num_waits) + 1 waits.set_index('rank_order', inplace=True) if num_waits == 0: return waits waits['wait_time'] = waits['end_time'] - waits['start_time'] waits.loc[1, 'wait_time'] \ = waits.loc[1, 'end_time'] - waits.loc[1, 'window_start'] waits.loc[num_waits, 'wait_time'] = \ waits.loc[num_waits, 'window_end'] \ - waits.loc[num_waits, 'start_time'] waits['window_size'] = waits['window_end'] - waits['window_start'] waits['wait_type'] = 'interior' if num_waits == 1: waits['wait_type'] = 'full exterior' waits['wait_time'] = waits['window_size'].copy() else: waits.loc[1, 'wait_type'] = 'left exterior' waits.loc[num_waits, 'wait_type'] = 'right exterior' return waits
[docs]def traj_to_waits(*args, **kwargs): """Alias of :func:`state_changes_to_wait_times`""" return state_changes_to_wait_times(*args, **kwargs)
[docs]def simulations_to_observations(simulations, traj_cols=['replicate']): """ Vectorized extracting of "observed" wait times. """ sim = simulations obs = sim[traj_cols + ['state']].copy() # observed times are only within the interval obs['start_time'] = np.max(sim[['start_time', 'window_start']].to_numpy(), axis=1) obs['end_time'] = np.min(sim[['end_time', 'window_end']].to_numpy(), axis=1) # allows vectorized computation of all wait times obs['wait_time'] = obs['end_time'] - obs['start_time'] obs['window_size'] = sim['window_end'] - sim['window_start'] # vectorized-ly extract rank order/num_waits to classify each wait time obs['rank_order'] = obs.groupby(traj_cols)['start_time'].rank().astype(int) obs.set_index(traj_cols, inplace=True) obs['num_waits'] = obs.groupby(traj_cols)['rank_order'].max() # default to assuming its an interior time obs['wait_type'] = 'interior' # don't think there's a way to make this categorical at "assign" time obs['wait_type'] = obs['wait_type'].astype(censor_t) # the first and last ones are left/right exterior obs.loc[obs['rank_order'] == 1, 'wait_type'] = 'left exterior' obs.loc[ obs['rank_order'] == obs['num_waits'], 'wait_type' ] = 'right exterior' # and isolated wait times are doubly exterior obs.loc[obs['num_waits'] == 1, 'wait_type'] = 'full exterior' return obs.set_index('rank_order', append=True)
sim_to_obs = simulations_to_observations
[docs]def state_changes_to_movie_frames( traj, times, state_col='state', start_times_col='start_time', end_times_col='end_time', wait_type_col='wait_type'): """ Convert state changes into discrete-time observations of state. Takes a Series of state change times into a Series containing observations at the times requested. The times become the index. Parameters ---------- times : (N,) array_like times at which to "measure" what state we're in to make the new trajectories. traj : pd.DataFrame should have *state_col* and *start_times_col* columns. the values of *state_col* will be copied over verbatim. state_col : str, default: 'state' name of column containing the state being transitioned out of for each measurement in *traj*. start_times_col : str, default: 'start_times' name of column containing times at which *traj* changed state end_times_col : (optional) str by default, the function assumes that times after the last provided state transition time are in the same state. if passed, this column is used to determine at what time the last state "finished". Times after this will be labeled as NaN. Analagously to how start times are treated, if the end time *exactly* matches the "transition" time, assume this is an "exterior" measurement. Returns ------- movie : pd.Series Series defining the "movie" with frames taken at `times` that simply measures what state `traj` is in at each frame. index is `times`, `state_col` is used to name the Series. Notes ----- A start time means that if we observe at that time, the state transition will have already happened (right-continuity), except in the case when the transition happens at *exactly* the window end time. This is confusing in words, but simple to see in an example (see the example below). Examples -------- For the DataFrame >>> df = pd.DataFrame([['A', -1, 0.1], ['B', 0.1, 1.0]], >>> columns=['state', 'start_time', 'end_time']) the discretization into tenths of seconds would give >>> state_changes_to_movie_frames(df, times=np.linspace(0, 1, 11), >>> end_times_col='end_time') t 0.0 A 0.1 B 0.2 B 0.3 B 0.4 B 0.5 B 0.6 B 0.7 B 0.8 B 0.9 B 1.0 B Name: state, dtype: object Notice in particular how at 0.1, the state is already 'B'. However at time 1.0 the state is not already "unknown". This is what is meant by the Notes section above. If the `end_times_col` argument is omitted, then the last observed state is assumed to continue for all `times` requested from then on: >>> state_changes_to_movie_frames(df, times=np.linspace(0, 2, 11)) t 0.0 A 0.2 B 0.4 B 0.6 B 0.8 B 1.0 B 1.2 B 1.4 B 1.6 B 1.8 B 2.0 B Name: state, dtype: object """ if len(traj) <= 0: raise ValueError('Need a non-empty trajectory, otherwise how will I ' 'know what the possible states are?') times = np.sort(np.array(times)) traj.sort_values(start_times_col) if times[0] < traj[start_times_col].iloc[0]: raise ValueError('Requested a time before any measurements were made!') # initialize in a random state to get dtype correct movie = pd.Series(traj[state_col].iloc[0], index=times) i = 0 i_max = len(traj) for time in times: while i < i_max and traj[start_times_col].iloc[i] <= time: i += 1 movie.loc[time] = traj[state_col].iloc[i-1] if end_times_col is not None: last_observed_time = traj[end_times_col].iloc[-1] if last_observed_time < times[-1]: first_bad_time = np.argmax(last_observed_time < times) movie.loc[times[first_bad_time]:] = np.nan movie.name = state_col movie.index.name = 't' return movie
[docs]def simulation_to_frame_times(simulation, t, traj_cols=['replicate']): """ Vectorized conversion into frames, using np.searchsorted. Getting what "frames" each transition happened on is pretty easy. Consider the example trajectory with ``window_start=1.2``, ``window_end=2.9``, and transition times: >>> transitions = [0.9, 1.5, 1.7, 2.5, 3.1] with possible frames: >>> frames = [0, 1, 2, 3, 4, 5, 6] It may seem that by definition, we can only observe the transitions between *window_start* and *window_end*, however, this is not actually true here, because the stationary, "continuous" Markov renewal process that we've generated using the `fw.simulation` functions is valid from the "true" start of the left exterior time to the "true" end of the right exterior time, even if we normally discard these times for the purposes of simulating censored statistics. So the discrete movie's window size will just be the number of frames that are in the interval defined by ``[min(transitions), max(transitions)]``. Similarly, if instead ``window_end=3.05``, then nothing actually changes, because either way in a discrete movie we directly observe the state at ``t=3``. We've got nice strats for extracting the first and last possible frame in a vectorized way. So the algorithm is to first do that. Then, we discard transition times that are beyond our observation frames in either direction, and searchsort the remaining ones into the frames. In the above, that's: >>> t_i = [1, 2, 2, 3, 4] or equivalently: >>> start_t_i = [1, 2, 2, 3] and >>> end_t_i = [2, 2, 3, 4] Now we simply notice that the *last* start time that searchsorts to being before a given frame is going to dictate the state that we observe at that frame. So if the states after each ``start_t_i`` were: >>> states = ['A', 'B', 'A', 'B'] then we will observe states ['A', 'A', 'B'] at frames [1, 2, 3], with no observation at frames 0 or 4, 5 and 6. Finally, sometimes there will be multiple state switches between two frames, and yet by chance we end up in the same state as when we started. We must combine these. To get the correct output for *every* frame, we would simply have to notice that sometimes a waiting time observation will cover many frames, and add one last step where we simply fill frames not specifically marked by a start time with whatever the state was at the previous frame. We don't do this right now because my thesis is due tomorrow. """ sim = simulation # First, get the discrete window left_lext = sim.groupby(traj_cols)['start_time'].min() start_i = np.searchsorted(t, left_lext) # start times that searchsort past the end means that the entire trajectory # occured *after* all frames were over, so it's unobservable t_rnan = np.append(t, np.nan) leftmost_frame = pd.Series(t_rnan[start_i], index=left_lext.index) right_rext = sim.groupby(traj_cols)['end_time'].max() # normally would be " - 1", but we insert nan at start of t instead # to automagically catch the case that the rightmost wait end searchsorts # left of *all* frames, in which case the trajectory is unobservable end_i = np.searchsorted(t, right_rext) t_lnan = np.insert(t, 0, np.nan) rightmost_frame = pd.Series(t_lnan[end_i], index=right_rext.index) # Second, build the frames df # exclude useless waits sim = sim[sim['start_time'] < t[-1]] sim = sim[sim['end_time'] > t[0]] states = sim[traj_cols + ['state']].copy() start_times = sim['start_time'].values states['start_i'] = np.searchsorted(t, start_times) frames = states.groupby(traj_cols + ['start_i'])['state'].last() frames = frames.reset_index().set_index(traj_cols) frames['start_time'] = t[frames['start_i'].values] frames['window_start'] = leftmost_frame frames['window_end'] = rightmost_frame # Before adding the end times we need to remove neighboring wait # times that end up corresponding to the same state. # HACK: create a special index that tracks which unique trajectory each # wait time belongs to, then blindly diff state col, and use the new # traj_id col to tell when a lack of diff is due to a redundant wait time # (same state as prev one) and when it's just due to a new trajectory. traj_id = frames.groupby(traj_cols)['start_time'].first() * 0 traj_id.iloc[:] = np.arange(len(traj_id)) frames['traj_id'] = traj_id # Need to add a single element start to get size match. First wait time by # definition can't be redundant, so add traj_id which will never matches so # it can't get marked as _is_redundant. frames['traj_id2'] = np.insert(frames['traj_id'].values[:-1], 0, -1) states = frames['state'].unique() state_id = np.arange(len(states)) state_map = {s: state_id[i] for i, s in enumerate(states)} frames['state_id'] = frames['state'].replace(state_map) # insert, so the *second* of the redundant guys is deleted state_diff = np.insert(np.diff(frames['state_id']), 0, 1) redundant = (state_diff == 0) & (frames['traj_id'] == frames['traj_id2']) frames = frames.loc[~redundant] # Use the same hack as for the redundant guys, but we need the nan on the # other side this time, to match what we'll do to the end time col traj_id = frames.groupby(traj_cols)['start_time'].first() * 0 traj_id.iloc[:] = np.arange(len(traj_id)) frames['traj_id'] = traj_id frames['traj_id2'] = np.append(frames['traj_id'].values[1:], np.nan) frames['end_time'] = np.append(frames['start_time'].values[1:], np.nan) is_last = frames['traj_id'] != frames['traj_id2'] frames.loc[is_last, 'end_time'] = frames.loc[is_last, 'window_end'] # get rid of hacky temporary columns del frames['traj_id'] del frames['traj_id2'] return frames
[docs]def traj_to_movie(*args, **kwargs): "Alias of :func:`.state_changes_to_movie_frames`." return state_changes_to_movie_frames(*args, **kwargs)
[docs]def discrete_trajectory_to_wait_times(data, t_col='t', state_col='state'): """Converts a discrete trajectory to a dataframe containing each wait time, with its start, end, rank order, and the state it's leaving. Discrete here means that the state of the system was observed at finite time points (on a lattice in time), as opposed to a system where the exact times of transitions between states are known. Because a discrete trajectory only bounds the wait times, and does not determine their exact lengths (as a continuous trajectory might), additional columns are included that explictly bound the wait times, in addition to returning the "natural" estimate. Parameters ---------- data : pd.DataFrame should have at least states_column and time_column columns, and already be groupby'd so that there's only one "trajectory" within the DataFrame. One row should correspond to an observation at a particular time point. time_column : string the name of the column containing the time of each time point states_column : string the name of the column containing the state for each time point Returns ------- wait_df : pd.DataFrame columns are ['wait_time', 'start_time', 'end_time', 'state', 'wait_type', 'min_waits', 'max_waits'], where [wait,end,start]_time columns are self explanatory, state is the value of the states_column during that waiting time, and wait_type is one of 'interior', 'left exterior', 'right exterior', 'full exterior', depending on what kind of waiting time was observed. See the `Notes` section below for detailed explanation of these categories. The 'min/max_waits' columns contain the minimum/maximum possible value of the wait time (resp.), given the observations. The default index is named "rank_order", since it tracks the order (zero-indexed) in which the wait times occured. Notes ----- the following types of wait times are of interest to us 1) *interior* censored times: whenever you are observing a switching process for a finite amount of time, any waiting time you observe the entirety of is called "interior" censored 2) *left exterior* censored times: whenever the waiting time you observe started before you began observation (it overlaps the "left" side of your interval of observation) 3) *right exterior* censored times: same as above, but overlapping the "right" side of your interval of observation. 4) *full exterior* censored times: whenever you observe the existence of a single, particular state throughout your entire window of observation. """ states = data[state_col].values times = data[t_col].values num_measurements = len(data) # now iterate through valid part of trajectory to establish wait times start_times = [] end_times = [] earliest_st = [] # bounds on start time latest_st = [] earliest_et = [] # bounds on end time latest_et = [] wait_state = [] wait_type = [] k0 = 0 # index at which current state began state = states[k0] state_has_changed = False for k in range(num_measurements): # if no state change, continue if states[k] == state: continue # otherwise, store change start_times.append(times[k0]) end_times.append(times[k]) wait_state.append(state) # bounds on true wait time value if k0 == 0: # left exterior times have exactly determined "start" earliest_st.append(times[k0]) else: earliest_st.append(times[k0-1]) latest_st.append(times[k0]) earliest_et.append(times[k-1]) latest_et.append(times[k]) # if this is the first state change, we store it separately if not state_has_changed: wait_type.append('left exterior') state_has_changed = True # otherwise, a normal state change else: wait_type.append('interior') # either way, state has changed state = states[k] k0 = k # also store the time spent in final state start_times.append(times[k0]) end_times.append(times[k]) if k0 == 0: # full exterior times also have exactly determined "start" earliest_st.append(times[k0]) else: earliest_st.append(times[k0-1]) latest_st.append(times[k0]) # right/full exterior times have exactly determined "end" earliest_et.append(times[k]) latest_et.append(times[k]) # state type stored specially for final state wait_state.append(state) if not state_has_changed: wait_type.append('full exterior') else: wait_type.append('right exterior') start_times = np.array(start_times) end_times = np.array(end_times) wait_times = end_times - start_times min_waits = np.array(earliest_et) - np.array(latest_st) max_waits = np.array(latest_et) - np.array(earliest_st) df = pd.DataFrame({'start_time': start_times, 'end_time': end_times, 'wait_time': wait_times, 'state': wait_state, 'min_waits': min_waits, 'max_waits': max_waits, 'wait_type': wait_type}) df.index.name = 'rank_order' df['window_size'] = times[-1] - times[0] return df
[docs]def movie_to_waits(*args, **kwargs): """Alias of :func:`.discrete_trajectory_to_wait_times`""" return discrete_trajectory_to_wait_times(*args, **kwargs)