r"""
For computing moments of particle motions.
When dealing with trajectory data, most of the common readouts can simply be
thought of as moments of the trajectory with different time lags or discrete
derivatives applied.
More precisely, suppose you have trajectory data :math:`X_i(t_k)`, then
we define the discrete velocity (the first discrete derivative of the
particle) as :math:`V_i^\delta(t_k) = X_i(t_k + \delta) - X_i(t_k)`
and the discrete relative velocities
:math:`V_{ij}^\delta(t_k) = X_{ij}(t_k + \delta) - X_{ij}(t_k)`
where :math:`X_{ij}` is the distance between the points :math:`i` and :math:`j`.
This module simply helps quickly calculate moments of the :math:`V^\delta(t)`.
For example, the ensemble MSD is simply :math:`E_i[(V_i^\delta(0))^2]` as a
function of :math:`\delta`, where :math:`E_\chi[\cdot]` indicates the average is
taken over the variable :math:`\chi`. The time-averaged MSD for a single
particle on the other hand is given by :math:`E_t[(V_i^\delta(t))^2]`.
This module provides a simple set of functions for easily extracting the moment
of any product of any derivatives with any combination of time lags of a set of
trajectories, :math:`X_i(t_k)`.
"""
import numpy as np
import pandas as pd
import scipy
from scipy.io import loadmat
import gc
from pathlib import Path
###############{{{
# Call on DataFrame directly
def re_agg_mean(df, group_cols, mean_col='mean', count_col='count'):
    """Recombine pre-aggregated means into per-group overall means.

    Given a DataFrame of per-subset statistics (as from
    ``.agg(['mean', 'count'])``), compute the count-weighted mean of
    `mean_col` within each group defined by `group_cols`.
    """
    def weighted_mean(sub):
        return np.average(sub[mean_col], weights=sub[count_col])
    return df.groupby(group_cols).apply(weighted_mean)
def autocorr_1(df, xcol, tcol=None, **kwargs):
    """Time-averaged autocorrelation of one column, aggregated per lag.

    Thin wrapper around :func:`corr_prod` that reduces the product table
    to mean/std/count for each time lag ``t``.
    """
    products = corr_prod(df, xcol=xcol, tcol=tcol, **kwargs)
    return products.groupby(['t'])['XX'].agg(['mean', 'std', 'count'])
def corr_prod(df, xcol, tcol=None, max_dt=None, max_t_over_delta=4,
              allowed_dts=None, t_step=None):
    r"""Calculate the time-averaged autocorrelation of a column,
    with an optional time column.

    The time average can be written

    .. math::

        C_X(t) = E_\tau[X_{\tau + t}X_\tau]

    Parameters
    ----------
    df : pd.DataFrame
        Data containing `xcol` (and optionally `tcol`).
    xcol : str
        Column whose autocorrelation products are computed.
    tcol : str, optional
        Time column; defaults to the integer positions ``0..n-1``.
    max_dt : float, optional
        Keep only lags strictly less than this value.
    max_t_over_delta : float, optional
        If `df` has a 'delta' column, keep only lags with
        ``dt/delta <= max_t_over_delta``.
    allowed_dts : array-like, optional
        Explicit whitelist of lags to keep.
    t_step : unused
        Kept for backward compatibility.

    Returns
    -------
    pd.DataFrame
        Indexed by ``(t, tau)`` with one column ``'XX'`` holding the
        products :math:`X_{\tau+t} X_\tau`.
    """
    # use raw ndarrays: the x[None, :] broadcasting below is not supported
    # on pandas Series in modern pandas
    x = df[xcol].to_numpy()
    if tcol is not None:
        t = df[tcol].to_numpy()
    else:
        t = np.arange(len(x))
    # C_x(i-j) = dx[i]*dx[j], so form all n^2 products
    XX = x[None, :]*x[:, None]
    dts = t[None, :] - t[:, None]
    tau = t[None, :] - np.zeros_like(t[:, None])
    good_ix = dts >= 0
    if max_dt:
        good_ix &= dts < max_dt
    if max_t_over_delta and 'delta' in df:
        good_ix &= dts/df['delta'].iloc[0] <= max_t_over_delta
    # "is not None" + len() supports numpy arrays (whose truthiness is
    # ambiguous) while preserving the old no-op behavior for empty lists
    if allowed_dts is not None and len(allowed_dts) > 0:
        good_ix &= np.isin(dts, allowed_dts)
    arrs = [pd.Series(arr[good_ix]) for arr in [dts, XX, tau]]
    XX = pd.concat(arrs, axis=1)
    XX.columns = ['t', 'XX', 'tau']
    XX.set_index(['t', 'tau'], inplace=True)
    return XX
# end Call on DataFrame directly
###############}}}
###############{{{
# Apply to groupby'd DataFrame
def traj_to_msds(traj, xcol='x', ycol='y', framecol='frame id'):
    """Compute all squared displacements of a single trajectory.

    Data should be a groupby'd trajectory. Takes x,y coordinate columns
    and the "time" column; expects an integer-valued time column.

    Returns a DataFrame with one row per ordered pair of frames (later
    minus earlier) holding the squared displacement, the original indices
    of the two frames, and the time lag 'delta'.
    """
    # use raw ndarrays: the [None, :] broadcasting below is not supported
    # on pandas Series in modern pandas
    x = traj[xcol].to_numpy()
    y = traj[ycol].to_numpy()
    ft = traj[framecol].to_numpy()
    dfts = ft[None, :] - ft[:, None]
    dxs = x[None, :] - x[:, None]
    dys = y[None, :] - y[:, None]
    # squared displacement for every pair of frames
    dX = np.power(dxs, 2) + np.power(dys, 2)
    old_index = np.array(traj.index)
    n = len(traj.index)
    # broadcast the original index across rows/columns so each
    # (start, end) pair remembers which rows it came from
    old_index_start = old_index[:, None] - np.zeros((n,))[None, :].astype(int)
    old_index_end = old_index[None, :] - np.zeros((n,))[:, None].astype(int)
    pos_ix = dfts > 0
    displacements = pd.DataFrame({'displacements': dX[pos_ix],
                                  'old_index_end': old_index_end[pos_ix],
                                  'old_index_start': old_index_start[pos_ix],
                                  'delta': dfts[pos_ix]})
    return displacements
def pos_to_all_vel(trace, xcol='x', ycol='y', zcol=None, framecol='i',
                   delta=None, delta_max=None, deltas=None,
                   fixed_dt=None, force_independence=False):
    """For getting displacement distributions for all possible deltas
    simultaneously.

    >>> all_vel = df.groupby(bp.track_columns).apply(lambda df:
        mla.pos_to_all_vel(df, xcol='dX', ycol='dY', framecol='tp.n'
        ))

    `fixed_dt` should be the value of the fixed dt between consecutive
    frames, if such a value exists; deltas are then snapped to exact
    multiples of it so values meant to be equal compare equal.

    Returns a DataFrame indexed by ``(ti, delta)`` with displacement
    components ('vx', 'vy', and 'vz' if `zcol` is given) and the final
    time 'tf' as columns.
    """
    # framecol could feasibly be an index column of the dataframe, so we
    # should reset the index just in case
    trace = trace.reset_index()
    t = trace[framecol].to_numpy()
    # ti[i, j] = t[i] (start time), tf[i, j] = t[j] (end time)
    ti = t[:, None] - np.zeros_like(t)[None, :]
    tf = t[None, :] - np.zeros_like(t)[:, None]
    dt = tf - ti
    # always call output x, y, z, regardless of input column names
    cols = {'x': xcol, 'y': ycol, 'z': zcol}
    xs = {c: trace[col].to_numpy() for c, col in cols.items() if col is not None}
    vs = {'v' + c: x[None, :] - x[:, None] for c, x in xs.items()}
    good_dt = dt >= 0
    if force_independence:
        # keep only pairs whose end index is a multiple of the index
        # difference (non-overlapping displacements)
        i = np.arange(len(ti))
        diff = i[None, :] - i[:, None]
        with np.errstate(divide='ignore', invalid='ignore'):
            # numpy defines i % 0 == 0 (with a warning we suppress), so
            # the diagonal (dt == 0) stays, matching the original behavior
            overlap_id = i[None, :] % diff
        good_dt &= overlap_id == 0
    if delta:
        good_dt &= dt == delta
    if delta_max:
        good_dt &= dt < delta_max
    if deltas is not None:
        good_dt &= np.isin(dt, deltas)
    vs = {c: v[good_dt] for c, v in vs.items()}
    vs.update({'ti': ti[good_dt], 'delta': dt[good_dt], 'tf': tf[good_dt]})
    all_vel = pd.DataFrame(vs)
    # if non-integer times are used, round so that values of delta that
    # are supposed to be equal actually are
    if fixed_dt:
        all_vel['delta'] /= fixed_dt
        all_vel['delta'] = np.round(all_vel['delta'])
        all_vel['delta'] *= fixed_dt
    all_vel.set_index(['ti', 'delta'], inplace=True)
    return all_vel
def vel_to_pos(vel, dxcol='vx', dycol='vy', framecol='tf'):
    """Integrate velocities back into positions.

    Sorts by the time column, then cumulatively sums the velocity
    components. Returns a DataFrame with 'x' and 'y' columns indexed by
    time 't'.
    """
    # .as_matrix() was removed from pandas; .to_numpy() is the replacement
    t = vel[framecol].to_numpy()
    vx = vel[dxcol].to_numpy()
    vy = vel[dycol].to_numpy()
    sorted_ix = np.argsort(t)
    df = pd.DataFrame({'x': np.cumsum(vx[sorted_ix]),
                       'y': np.cumsum(vy[sorted_ix])},
                      index=t[sorted_ix])
    df.index.name = 't'
    return df
def all_vel_to_corr(vel, dxcol='vx', dycol='vy', dzcol=None, framecol='tf',
                    max_dt=None, max_t_over_delta=4, allowed_dts=None):
    """Velocity autocorrelation products from a table of velocities.

    `vel` should hold one row per velocity (as from `pos_to_all_vel`); if
    'ti' ended up in the index, reset it first:

    >>> all_vel.reset_index(level=pandas_util.multiindex_col_ix(all_vel, 'ti'), inplace=True)

    Returns a DataFrame indexed by ``(t, tau)`` with the dot products in
    column 'cvv'.
    """
    # use raw ndarrays: the [None, :] broadcasting below is not supported
    # on pandas Series in modern pandas
    t = vel[framecol].to_numpy()
    dx = vel[dxcol].to_numpy()
    # cvv[i,j] = dx[i]*dx[j] + dy[i]*dy[j] (+ dz[i]*dz[j]): all n^2 dot
    # products
    cvv = dx[None, :]*dx[:, None]
    if dycol is not None:
        dy = vel[dycol].to_numpy()
        cvv = cvv + dy[None, :]*dy[:, None]
    if dzcol is not None:
        dz = vel[dzcol].to_numpy()
        cvv = cvv + dz[None, :]*dz[:, None]
    tau = t[None, :] - np.zeros_like(t[:, None])
    dts = t[None, :] - t[:, None]
    good_ix = dts >= 0
    if max_dt:
        good_ix &= dts < max_dt
    if max_t_over_delta and 'delta' in vel:
        good_ix &= dts/vel['delta'].iloc[0] <= max_t_over_delta
    # "is not None" + len() supports numpy arrays (whose truthiness is
    # ambiguous) while preserving the old no-op behavior for empty lists
    if allowed_dts is not None and len(allowed_dts) > 0:
        good_ix &= np.isin(dts, allowed_dts)
    arrs = [pd.Series(arr[good_ix]) for arr in [dts, cvv, tau]]
    cvvs = pd.concat(arrs, axis=1)
    cvvs.columns = ['t', 'cvv', 'tau']
    cvvs.set_index(['t', 'tau'], inplace=True)
    return cvvs
def moments(df, cols=None, ns=(0, 1, 2)):
    """Take a DataFrame and optionally a list of columns of interest and
    for each n in `ns`, calculate the nth moment of each column, where the
    0th moment is defined for convenience to be the count (of finite
    values).

    Returns a Series with a two-level multi-index: level 0 contains the
    requested columns and level 1 the requested moment numbers (`ns`).

    Raises
    ------
    ValueError
        If any requested moment number is negative.
    """
    if cols:
        df = df[cols]
    def _moment(n):
        if n == 0:
            # 0th "moment" == count of finite entries
            return lambda x: np.nansum(np.isfinite(x))
        elif n > 0:
            return lambda x: np.nanmean(np.power(x, n))
        # previously fell through and returned None, crashing obscurely
        raise ValueError(f"Moment number must be non-negative, got {n}.")
    # rows have cols, columns have ns
    out = pd.DataFrame({n: df.apply(_moment(n), axis=0) for n in ns})
    out = out.rename_axis('Moment', axis='columns')
    return out.T.unstack()
# end Apply to groupby'd DataFrame
###############}}}
###############{{{
# "combining" functions, post groupby/apply
def combine_moments(data):
    """Takes a column from output of df.groupby([...]).apply(moments) and
    combines the values to get overall moments for the full dataset.

    Example:

    .. code-block:: python

        grouped_stats = df.groupby(['experiment']).apply(lp.moments)
        overall_stats = grouped_stats.groupby(level=0, axis=1).apply(lp.combine_moments).unstack()

    Example:

    .. code-block:: python

        def grouper(g):
            return g.groupby(level=0, axis=1).apply(lp.combine_moments).unstack()
        grouped_stats[new_col] = make_interesting_value(grouped_stats)
        overall_stats = df.groupby(new_col).apply(grouper)
    """
    # strip the top level of the column multi-index if present;
    # get_level_values replaces the removed Index.labels attribute
    # (data.columns.levels[0][data.columns.labels[0][0]] in old pandas)
    if data.columns.names[0] != 'Moment':
        name = data.columns.get_level_values(0)[0]
        data = data[name]
    total = np.nansum(data[0])
    # each group's moments contribute in proportion to its count
    weights = data[0]/total
    combined_vals = data.apply(lambda x: np.nansum(weights*x))
    combined_vals[0] = total
    return combined_vals
def cvv_by_hand_make_usable(cvv_stats, group_cols):
    """Add the columns we typically want to vvc_stats_by_hand output (cvv,
    std, ste, cvv_normed, ste_normed). Requires the group-by columns used
    to do the normalization via msds using groupby."""
    counts = cvv_stats['cnt']
    cvv_stats['cvv'] = cvv_stats['sum']/counts
    second_moment = cvv_stats['sqs']/counts
    cvv_stats['std'] = np.sqrt(second_moment - np.power(cvv_stats['cvv'], 2))
    cvv_stats['ste'] = cvv_stats['std']/np.sqrt(counts)
    def _normalize_by_t0(group):
        # each (group, delta) block is normalized by its own t == 0 value
        at_t0 = group.loc[group['t'] == 0, 'cvv']
        if len(at_t0) > 1:
            raise ValueError('Ambiguous t0, did you groupby the wrong thing?')
        t0_val = at_t0.iloc[0]
        group['cvv_normed'] = group['cvv']/t0_val
        group['ste_normed'] = group['ste']/t0_val
        return group
    return cvv_stats.groupby(group_cols + ['delta']).apply(_normalize_by_t0)
# "combining" functions, post groupby/apply
###############}}}
###############{{{
# Doing Pandas stuff manually cause memory leaks
def groupby_apply_efficient(df, group_cols, apply_fun, n_between_gc=50,
                            print_progress=False, apply_to_cols=None):
    """An attempt to make groupby not leak memory. Kinda worked at one
    point (late 2016)."""
    pieces = []
    for count, (group_vals, group) in enumerate(df.groupby(group_cols)):
        if apply_to_cols:
            group = group[apply_to_cols]
        # periodically force collection to keep the leak in check
        if count % n_between_gc == 0:
            gc.collect()
            if print_progress:
                print('.')
        piece = apply_fun(group).reset_index()
        # tag each piece with the group it came from
        for j, col in enumerate(group_cols):
            piece[col] = group_vals[j]
        pieces.append(piece)
    return pd.concat(pieces, ignore_index=True)
def vels_to_cvvs_by_hand(vels, groups, file_name, framecol='ti', dxcol='vx',
                         dycol='vy', dzcol='vz', max_t_over_delta=None,
                         max_dt=None, allowed_dts=None,
                         deltas_of_interest=None, include_group=True):
    """Should be passed a velocities DataFrame (as from pos_to_all_vel).

    Groups vels by `groups`, optionally ignores group labels to make a
    flat file with all vvc information with repeats of (t, delta) pairs
    as appropriate. Writes a csv with columns ``tau,t,cvv`` followed by
    the group columns (always including 'delta') to `file_name`.
    """
    # plausibly, vels has some of groups in its index, so we will want to
    # make them columns, especially delta and framecol
    vels = vels.reset_index()
    # copy so appending 'delta' below doesn't mutate the caller's list
    groups = list(groups)
    if deltas_of_interest is None:
        deltas_of_interest = vels.delta.unique()
    deltas_of_interest = np.asarray(deltas_of_interest)
    # at delta == 0 all velocities equal zero, wasting computation time
    deltas_of_interest = deltas_of_interest[deltas_of_interest > 0]
    if 'delta' not in groups:
        groups.append('delta')
    with open(file_name, 'w') as f:
        f.write('tau,t,cvv')
        for group in groups:
            f.write(',' + str(group))
        f.write('\n')
        for label, vel in vels[
                np.isin(vels.delta, deltas_of_interest)
                ].groupby(groups):
            # use raw ndarrays: the [None, :] broadcasting below is not
            # supported on pandas Series in modern pandas
            t = vel[framecol].to_numpy()
            dx = vel[dxcol].to_numpy()
            dy = vel[dycol].to_numpy()
            # cvv[i,j] = dx[i]*dx[j] + dy[i]*dy[j] (+ dz[i]*dz[j]), so all
            # n^2 dot products
            cvv = dx[None, :]*dx[:, None] + dy[None, :]*dy[:, None]
            if dzcol is not None:
                dz = vel[dzcol].to_numpy()
                cvv = cvv + dz[None, :]*dz[:, None]
            tau = t[None, :] - np.zeros_like(t[:, None])
            dts = t[None, :] - t[:, None]
            good_ix = dts >= 0
            if max_dt:
                good_ix &= dts < max_dt
            if max_t_over_delta and 'delta' in vel:
                good_ix &= dts/vel['delta'].iloc[0] <= max_t_over_delta
            # "is not None" + len() supports numpy arrays (whose truthiness
            # is ambiguous) while keeping the old no-op for empty lists
            if allowed_dts is not None and len(allowed_dts) > 0:
                good_ix &= np.isin(dts, allowed_dts)
            arrs = [pd.Series(arr[good_ix]) for arr in [tau, dts, cvv]]
            cvvs = pd.concat(arrs, axis=1)
            cvvs.columns = ['tau', 't', 'cvv']
            cvvs.dropna(inplace=True)
            if include_group:
                for i, group in enumerate(groups):
                    cvvs[group] = label[i]
            cvvs.to_csv(f, index=False, header=False)
def vvc_stats_by_hand(file_name, groups, print_interval=None, skip_lines=None):
    """Calculates moments using the output of vels_to_cvvs_by_hand.

    Streams the flat csv line-by-line, accumulating count, sum, and sum of
    squares of 'cvv' for each unique combination of the grouping columns
    (always including 't' and 'delta').

    Returns a DataFrame with one row per group: the group identifier
    columns plus 'sqs', 'sum', and 'cnt'; 't' and 'delta' are converted
    back to numeric types.
    """
    grouped_sqs = {}
    grouped_sum = {}
    grouped_cnt = {}
    group_ixs = []
    group_names = []
    cvv_ix = None
    with open(file_name) as f:
        for i, line in enumerate(f):
            # get column positions from the header on the first go round
            if i == 0:
                columns = line.rstrip().split(',')
                for j, column in enumerate(columns):
                    if column == 'cvv':
                        cvv_ix = j
                    if column in groups or column == 'delta' or column == 't':
                        group_ixs.append(j)
                        group_names.append(column)
                group_ixs = np.array(group_ixs)
            # <= accounts for first line being header
            if skip_lines and i <= skip_lines:
                continue
            if print_interval and i % print_interval == 0:
                print(i)
            vals = np.array(line.rstrip().split(','))
            try:
                cvv = float(vals[cvv_ix])
            except ValueError:
                # non-numeric cvv; this also skips the header line itself
                continue
            group = tuple(vals[group_ixs])
            if group not in grouped_sqs:
                grouped_sqs[group] = 0
                grouped_sum[group] = 0
                grouped_cnt[group] = 0
            grouped_sqs[group] += cvv*cvv
            grouped_sum[group] += cvv
            grouped_cnt[group] += 1
    # the dict keys are tuples of group values; expand them into columns
    cvvsqs = pd.DataFrame.from_dict(grouped_sqs, orient='index')
    cvv_stats = cvvsqs.reset_index()['index'].apply(pd.Series)
    cvv_stats.columns = group_names
    cvv_stats['sqs'] = cvvsqs[0].values
    cvvsum = pd.DataFrame.from_dict(grouped_sum, orient='index')
    cvv_stats['sum'] = cvvsum[0].values
    cvvcnt = pd.DataFrame.from_dict(grouped_cnt, orient='index')
    cvv_stats['cnt'] = cvvcnt[0].values
    cvv_stats['t'] = pd.to_numeric(cvv_stats['t'])
    cvv_stats['delta'] = pd.to_numeric(cvv_stats['delta'])
    return cvv_stats
# end Doing Pandas stuff manually cause memory leaks
###############}}}