跳转至

细菌运动数据统计分析脚本

适用语言:Python 3.9+, R 4.0+
依赖库:pandas, numpy, matplotlib, seaborn, scipy
数据来源:Bactrak 服务器输出的 tracks.csv 文件

本教程提供常用的细菌运动轨迹数据分析脚本,包括速度分析、方向性分析、群体行为统计等。

1. 环境配置

1.1 Python 环境

pip install pandas numpy matplotlib seaborn scipy scikit-learn
pip install trackpy  # 轨迹分析专用库

1.2 R 环境(可选)

install.packages(c("tidyverse", "ggplot2", "trajr", "circular"))

2. 基础数据处理

2.1 数据加载与预处理

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def load_tracks(file_path):
    """加载轨迹数据并添加时间信息"""
    df = pd.read_csv(file_path)

    # 假设帧率为 1 fps,可根据实际调整
    frame_rate = 1.0  # frames per second
    df['time'] = df['frame'] / frame_rate

    # 计算位移和速度
    df = df.sort_values(['track_id', 'frame'])
    df['dx'] = df.groupby('track_id')['x'].diff()
    df['dy'] = df.groupby('track_id')['y'].diff()
    df['displacement'] = np.sqrt(df['dx']**2 + df['dy']**2)
    df['speed'] = df['displacement'] * frame_rate

    return df

# 使用示例
tracks = load_tracks('tracks.csv')
print(f"总轨迹数: {tracks['track_id'].nunique()}")
print(f"总帧数: {tracks['frame'].nunique()}")

3. 运动特征分析

3.1 速度分布分析

def analyze_speed_distribution(tracks, output_dir='./'):
    """分析并可视化速度分布"""
    # 过滤掉 NaN 值
    speeds = tracks['speed'].dropna()

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # 速度直方图
    axes[0].hist(speeds, bins=50, density=True, alpha=0.7, edgecolor='black')
    axes[0].set_xlabel('速度 (μm/s)')
    axes[0].set_ylabel('概率密度')
    axes[0].set_title('速度分布直方图')

    # 速度箱线图(按轨迹)
    speed_by_track = tracks.groupby('track_id')['speed'].mean()
    axes[1].boxplot(speed_by_track, vert=True)
    axes[1].set_ylabel('平均速度 (μm/s)')
    axes[1].set_title('各轨迹平均速度分布')

    plt.tight_layout()
    plt.savefig(f'{output_dir}/speed_distribution.png', dpi=300)

    # 统计信息
    stats = {
        '平均速度': speeds.mean(),
        '中位数速度': speeds.median(),
        '标准差': speeds.std(),
        '最大速度': speeds.max(),
        '最小速度': speeds.min()
    }

    return stats

3.2 轨迹持续性分析

def calculate_persistence(tracks):
    """计算轨迹持续性(方向一致性)"""
    persistence_data = []

    for track_id, group in tracks.groupby('track_id'):
        if len(group) < 5:  # 轨迹太短,跳过
            continue

        # 计算运动方向
        angles = np.arctan2(group['dy'].values, group['dx'].values)

        # 计算角度变化
        angle_changes = np.diff(angles)
        # 处理角度周期性
        angle_changes = np.arctan2(np.sin(angle_changes), np.cos(angle_changes))

        # 持续性指标:方向变化的标准差倒数
        if len(angle_changes) > 0:
            persistence = 1 / (np.std(angle_changes) + 1e-6)
        else:
            persistence = 0

        persistence_data.append({
            'track_id': track_id,
            'persistence': persistence,
            'track_length': len(group),
            'mean_speed': group['speed'].mean()
        })

    return pd.DataFrame(persistence_data)

3.3 均方位移(MSD)分析

def calculate_msd(tracks, max_lag=100):
    """计算均方位移用于判断运动类型"""
    msd_data = []

    for track_id, group in tracks.groupby('track_id'):
        group = group.sort_values('frame')
        positions = group[['x', 'y']].values
        times = group['time'].values

        track_length = len(positions)
        if track_length < 10:
            continue

        # 计算不同时间延迟的 MSD
        for lag in range(1, min(max_lag, track_length)):
            displacements = positions[lag:] - positions[:-lag]
            squared_displacements = np.sum(displacements**2, axis=1)
            msd = np.mean(squared_displacements)
            delta_t = times[lag] - times[0]

            msd_data.append({
                'track_id': track_id,
                'lag': lag,
                'delta_t': delta_t,
                'msd': msd
            })

    msd_df = pd.DataFrame(msd_data)

    # 绘制 MSD 曲线
    plt.figure(figsize=(8, 6))

    # 绘制所有轨迹的平均 MSD
    avg_msd = msd_df.groupby('delta_t')['msd'].agg(['mean', 'std'])
    plt.errorbar(avg_msd.index, avg_msd['mean'], 
                yerr=avg_msd['std'], alpha=0.7, label='实验数据')

    # 添加理论参考线
    t = avg_msd.index
    plt.plot(t, 4 * 0.1 * t, '--', label='布朗运动 (α=1)')
    plt.plot(t, 4 * 0.1 * t**2, '-.', label='定向运动 (α=2)')

    plt.xlabel('时间延迟 Δt (s)')
    plt.ylabel('MSD (μm²)')
    plt.xscale('log')
    plt.yscale('log')
    plt.legend()
    plt.title('均方位移分析')
    plt.savefig('msd_analysis.png', dpi=300)

    return msd_df

4. 群体行为分析

4.1 密度分布热图

def create_density_heatmap(tracks, frame_range=None):
    """创建细菌密度分布热图"""
    if frame_range:
        tracks_subset = tracks[tracks['frame'].between(*frame_range)]
    else:
        tracks_subset = tracks

    plt.figure(figsize=(10, 8))

    # 创建2D直方图
    h = plt.hist2d(tracks_subset['x'], tracks_subset['y'], 
                   bins=50, cmap='hot', cmin=1)
    plt.colorbar(label='细胞数量')
    plt.xlabel('X 位置 (像素)')
    plt.ylabel('Y 位置 (像素)')
    plt.title('细菌空间密度分布')
    plt.axis('equal')
    plt.savefig('density_heatmap.png', dpi=300)

4.2 速度场分析

def analyze_velocity_field(tracks, grid_size=20, frame=None):
    """分析并可视化速度场"""
    if frame is not None:
        frame_data = tracks[tracks['frame'] == frame]
    else:
        # 使用所有帧的平均
        frame_data = tracks

    # 创建网格
    x_bins = np.linspace(frame_data['x'].min(), frame_data['x'].max(), grid_size)
    y_bins = np.linspace(frame_data['y'].min(), frame_data['y'].max(), grid_size)

    # 计算每个网格的平均速度矢量
    velocity_field = np.zeros((grid_size-1, grid_size-1, 2))
    counts = np.zeros((grid_size-1, grid_size-1))

    for i in range(grid_size-1):
        for j in range(grid_size-1):
            mask = (frame_data['x'] >= x_bins[i]) & (frame_data['x'] < x_bins[i+1]) & \
                   (frame_data['y'] >= y_bins[j]) & (frame_data['y'] < y_bins[j+1])

            if mask.sum() > 0:
                velocity_field[j, i, 0] = frame_data.loc[mask, 'dx'].mean()
                velocity_field[j, i, 1] = frame_data.loc[mask, 'dy'].mean()
                counts[j, i] = mask.sum()

    # 可视化
    plt.figure(figsize=(10, 8))

    # 背景:细胞密度
    plt.imshow(counts.T, origin='lower', cmap='Blues', alpha=0.5,
              extent=[x_bins[0], x_bins[-1], y_bins[0], y_bins[-1]])

    # 速度矢量
    X, Y = np.meshgrid(x_bins[:-1] + np.diff(x_bins)/2,
                       y_bins[:-1] + np.diff(y_bins)/2)

    # 只在有细胞的地方画箭头
    mask = counts > 2
    plt.quiver(X[mask], Y[mask], 
              velocity_field[:,:,0][mask], 
              velocity_field[:,:,1][mask],
              scale=50, color='red', alpha=0.7)

    plt.xlabel('X 位置 (像素)')
    plt.ylabel('Y 位置 (像素)')
    plt.title('细菌群体速度场')
    plt.colorbar(label='细胞数量')
    plt.savefig('velocity_field.png', dpi=300)

5. 统计报告生成

def generate_report(tracks, output_file='analysis_report.txt'):
    """生成综合统计报告"""
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("===== 细菌运动轨迹分析报告 =====\n\n")

        # 基本统计
        f.write("1. 基本统计信息\n")
        f.write(f"   - 总轨迹数: {tracks['track_id'].nunique()}\n")
        f.write(f"   - 总帧数: {tracks['frame'].nunique()}\n")
        f.write(f"   - 时间跨度: {tracks['time'].max():.1f}\n\n")

        # 轨迹长度统计
        track_lengths = tracks.groupby('track_id').size()
        f.write("2. 轨迹长度统计\n")
        f.write(f"   - 平均长度: {track_lengths.mean():.1f}\n")
        f.write(f"   - 最长轨迹: {track_lengths.max()}\n")
        f.write(f"   - 最短轨迹: {track_lengths.min()}\n\n")

        # 速度统计
        speeds = tracks['speed'].dropna()
        f.write("3. 速度统计 (μm/s)\n")
        f.write(f"   - 平均速度: {speeds.mean():.2f}\n")
        f.write(f"   - 速度标准差: {speeds.std():.2f}\n")
        f.write(f"   - 最大速度: {speeds.max():.2f}\n")
        f.write(f"   - 中位数速度: {speeds.median():.2f}\n\n")

        # 运动类型分类
        persistence_df = calculate_persistence(tracks)
        high_persistence = (persistence_df['persistence'] > 5).sum()
        f.write("4. 运动类型分析\n")
        f.write(f"   - 高持续性轨迹: {high_persistence} ({high_persistence/len(persistence_df)*100:.1f}%)\n")
        f.write(f"   - 低持续性轨迹: {len(persistence_df)-high_persistence} ({(1-high_persistence/len(persistence_df))*100:.1f}%)\n")

    print(f"报告已生成: {output_file}")

6. 批量分析脚本

# batch_analysis.py
import os
import glob
from pathlib import Path

def batch_analyze(input_dir, output_dir):
    """批量分析多个实验数据"""
    os.makedirs(output_dir, exist_ok=True)

    # 查找所有 CSV 文件
    csv_files = glob.glob(os.path.join(input_dir, '*.csv'))

    results_summary = []

    for csv_file in csv_files:
        print(f"\n处理: {csv_file}")
        exp_name = Path(csv_file).stem
        exp_output_dir = os.path.join(output_dir, exp_name)
        os.makedirs(exp_output_dir, exist_ok=True)

        # 加载数据
        tracks = load_tracks(csv_file)

        # 执行各项分析
        os.chdir(exp_output_dir)  # 切换到输出目录

        # 速度分析
        speed_stats = analyze_speed_distribution(tracks)

        # MSD 分析
        msd_df = calculate_msd(tracks)

        # 密度热图
        create_density_heatmap(tracks)

        # 速度场
        analyze_velocity_field(tracks)

        # 生成报告
        generate_report(tracks)

        # 收集结果摘要
        results_summary.append({
            'experiment': exp_name,
            'n_tracks': tracks['track_id'].nunique(),
            'mean_speed': speed_stats['平均速度'],
            'std_speed': speed_stats['标准差']
        })

    # 生成总结报告
    summary_df = pd.DataFrame(results_summary)
    summary_df.to_csv(os.path.join(output_dir, 'batch_summary.csv'), index=False)

    print("\n批量分析完成!")

# 使用示例
if __name__ == "__main__":
    batch_analyze('./data/', './results/')

7. 高级可视化

7.1 3D 轨迹可视化(时空轨迹)

from mpl_toolkits.mplot3d import Axes3D

def plot_3d_trajectories(tracks, sample_tracks=10):
    """绘制3D时空轨迹"""
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')

    # 随机采样轨迹
    track_ids = tracks['track_id'].unique()
    if len(track_ids) > sample_tracks:
        track_ids = np.random.choice(track_ids, sample_tracks, replace=False)

    # 绘制每条轨迹
    for track_id in track_ids:
        track = tracks[tracks['track_id'] == track_id].sort_values('frame')
        ax.plot(track['x'], track['y'], track['time'], alpha=0.7)

    ax.set_xlabel('X 位置 (像素)')
    ax.set_ylabel('Y 位置 (像素)')
    ax.set_zlabel('时间 (s)')
    ax.set_title('细菌运动时空轨迹')

    plt.savefig('3d_trajectories.png', dpi=300)

8. 导出与整合

def export_for_further_analysis(tracks, output_prefix='processed'):
    """导出处理后的数据供其他软件使用"""
    # 导出为 MATLAB 格式
    import scipy.io
    scipy.io.savemat(f'{output_prefix}.mat', {
        'tracks': tracks.to_dict('list')
    })

    # 导出为 R 友好格式
    tracks.to_csv(f'{output_prefix}_for_R.csv', index=False)

    # 导出轨迹摘要
    summary = tracks.groupby('track_id').agg({
        'x': ['min', 'max', 'mean'],
        'y': ['min', 'max', 'mean'],
        'speed': ['mean', 'std', 'max'],
        'frame': 'count'
    })
    summary.columns = ['_'.join(col).strip() for col in summary.columns]
    summary.to_csv(f'{output_prefix}_summary.csv')

9. 版本记录

日期 版本 说明
2025-07-04 v1.0 初版发布,包含基础分析脚本

注意事项: - 所有脚本中的像素到微米转换系数需根据实际显微镜标定 - 帧率参数需根据实际拍摄设置调整 - 大数据集建议使用 HDF5 格式存储和处理

如需更多分析功能或遇到问题,请联系 @yipeng