Source code for packing_defect.core.analyzers.radius

# packing_defect/core/analyzers/radius.py

import os
import subprocess
import numpy as np
import MDAnalysis as mda

from packing_defect.core.analyzers.base import BaseDefectAnalyzer
from packing_defect.core.grid import DefectGrid
from packing_defect.utils import apply_pbc, compute_pairwise_distances


[docs] class RadiusDefectAnalyzer(BaseDefectAnalyzer): """ Defect analysis based on radius-cutoff GRO filtering. """ def __init__( self, universe, base_directory, output_dir, lipid_types, frame_start, frame_end, protein_atom_count, apply_protein_cutoff=True, cutoff_distance=1.5, ): super().__init__(universe, output_dir, lipid_types) self.base_directory = base_directory self.frame_start = frame_start self.frame_end = frame_end self.protein_atom_count = protein_atom_count self.apply_protein_cutoff = apply_protein_cutoff self.cutoff_distance = cutoff_distance self.defects_up = {} self.defects_down = {} self.defects_combined = {} # ------------------- Internal helpers ------------------- def _write_filtered_gro(self, input_file, output_file): """Filter defect atoms by protein cutoff and write new GRO.""" with open(input_file, "r", encoding="utf-8") as f: lines = f.readlines() header, footer = lines[0], lines[-1] prot = lines[2 : 2 + self.protein_atom_count] defect = lines[2 + self.protein_atom_count : -1] if self.apply_protein_cutoff: ppos = np.array([list(map(float, l[20:44].split())) for l in prot]) dpos = np.array([list(map(float, l[20:44].split())) for l in defect]) mins = np.min(compute_pairwise_distances(dpos, ppos), axis=1) defect = [d for i, d in enumerate(defect) if mins[i] > self.cutoff_distance] with open(output_file, "w", encoding="utf-8") as f: f.write(header) f.write(f"{len(prot)+len(defect)}\n") f.writelines(prot + defect) f.write(footer) def _renumber_gro(self, input_file, output_file): """Renumber atoms using GROMACS genconf.""" try: subprocess.run( ["gmx", "genconf", "-f", input_file, "-o", output_file, "-renumber"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) except subprocess.CalledProcessError: print(f" Failed renumbering {input_file}") def _calculate_defects(self, universe, combine=False): """Calculate defect cluster sizes for one Universe.""" up_sizes, dw_sizes = [], [] for ts in universe.trajectory: box = ts.dimensions[:3] apply_pbc(universe.atoms.positions, box) half = universe.select_atoms("prop z > 0") hz = np.mean(half.positions[:, 2]) grid = DefectGrid(box_xy=(box[0], box[1]), dx=1.0, dy=1.0, hz=hz) for atom in universe.select_atoms(f"prop z > {hz}"): grid.update(*atom.position, r=0.0, code=1, leaflet="up") for atom in universe.select_atoms(f"prop z < {hz}"): grid.update(*atom.position, r=0.0, code=1, leaflet="dw") up_sizes.extend(grid.cluster_sizes("up")) dw_sizes.extend(grid.cluster_sizes("dw")) return (up_sizes + dw_sizes) if combine else (up_sizes, dw_sizes) # ------------------- Required overrides -------------------
[docs] def run(self): """Main loop: filter, renumber, analyze defects for each lipid type.""" for lipid_type in self.lipid_types: print(f"\n▶ Processing {lipid_type}...") input_dir = os.path.join(self.base_directory, lipid_type) output_dir = os.path.join(self.output_dir, lipid_type) os.makedirs(output_dir, exist_ok=True) # Step 1: filter frames filtered_files = [] for i in range(self.frame_start, self.frame_end + 1): inp = os.path.join(input_dir, f"{lipid_type}_frame_{i}.gro") out = os.path.join(output_dir, f"{lipid_type}_corrected_frame_{i}.gro") if os.path.exists(inp): self._write_filtered_gro(inp, out) filtered_files.append(out) # Step 2: renumber renumbered_files = [] for f in filtered_files: out = os.path.join(output_dir, "renumbered_" + os.path.basename(f)) self._renumber_gro(f, out) renumbered_files.append(out) # Step 3: calculate defects up, down, combined = [], [], [] for f in renumbered_files: u = mda.Universe(f) up_frame, down_frame = self._calculate_defects(u, combine=False) up.extend(up_frame) down.extend(down_frame) combined.extend(up_frame + down_frame) # store self.results[lipid_type] = {"up": up, "down": down, "combined": combined} self.defects_up[lipid_type] = up self.defects_down[lipid_type] = down self.defects_combined[lipid_type] = combined print(f"Done: {lipid_type}, Frames processed: {len(renumbered_files)}")
[docs] def plot(self): """Plot histograms of defect sizes.""" import matplotlib.pyplot as plt def plot_hist(defects, label, color): h, bins = np.histogram(defects, bins=np.linspace(0, 150, 600)) h[0] = 0 total = h.sum() if total == 0: return centers = 0.5 * (bins[1:] + bins[:-1]) plt.scatter(centers, h / total, label=label, color=color) plt.yscale("log") # Up/Down plt.figure(figsize=(8, 6)) for lipid, color in zip(self.lipid_types, ["red", "blue", "green"]): plot_hist(self.defects_up[lipid], f"{lipid} Up", color) plot_hist(self.defects_down[lipid], f"{lipid} Down", color) plt.legend() plt.title("Top and Bottom Defects") plt.xlabel("Defect Size") plt.ylabel("Probability (log)") plt.savefig(os.path.join(self.output_dir, "top_bottom_defects.png"), dpi=300) plt.close() # Combined plt.figure(figsize=(8, 6)) for lipid, color in zip(self.lipid_types, ["red", "blue", "green"]): plot_hist(self.defects_combined[lipid], f"{lipid} Combined", color) plt.legend() plt.title("Combined Defects") plt.xlabel("Defect Size") plt.ylabel("Probability (log)") plt.savefig(os.path.join(self.output_dir, "combined_defects.png"), dpi=300) plt.close()