Source code for gunshotmatch_pipeline.excluded_peaks

#!/usr/bin/env python3
#
#  config.py
"""
Report excluded peaks from various pipeline steps.

.. versionadded:: 0.10.0
"""
#
#  Copyright © 2024 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.
#

# stdlib
import io
from collections import deque
from contextlib import redirect_stdout
from typing import Dict, List, MutableSequence, NamedTuple, Optional, Sequence

# 3rd party
import attrs
from libgunshotmatch.consolidate import ConsolidatedPeak, InvertedFilter
from libgunshotmatch.datafile import Repeat
from libgunshotmatch.peak import PeakList, QualifiedPeak, align_peaks
from libgunshotmatch.project import Project, consolidate
from pyms.DPA.Alignment import Alignment
from pyms.Peak.Class import Peak

# this package
from gunshotmatch_pipeline.nist_ms_search import PyMSNISTSearchCfg, engine_on_demand

__all__ = [
		"ExcludedPeaks",
		"PeakCounts",
		"get_excluded_peaks",
		]
"""
Filtering steps:

After Biller-Biemann  (noise and base peak, prior peaks not recorded)
> Filtered peak list saved as Repeat.peaks
During alignment (to minimum peaks)
> Alignment after filtering is saved as Project.alignment
After alignment (to top_n_peaks and min_peak_area)
> Peaks which survive this filtering get qualified and go to Repeat.qualified_peaks
Consolidate process
"""


[docs]class PeakCounts(NamedTuple): """ Counts of peaks at different steps of the pipeline. """ #: Peak list after Biller-Biemann peak detection and subsequent noise and base peak filtering. found_peaks: int #: Peaks for which partners were found during alignment (after filtering to `min_peaks`). alignment_peaks: int #: Peaks which didn't survive alignment step (no/insufficient matching peaks in other datafiles). unaligned_peaks: int #: Peaks filtered after alignment due to size (``top_n_peaks`` and ``min_peak_area``). small_peaks: int
[docs]@attrs.define class ExcludedPeaks: """ Represents the excluded peaks in a project. """ #: Partial alignment of peaks. alignment: Alignment #: Small peaks small_peaks: List[Peak] #: Peaks filtered during the consolidate process filtered_out_consolidated_peaks: List[ConsolidatedPeak] #: Stdout from alignment and peak identification process, for debugging. debug_stdout: io.StringIO
def get_padded_peak_list( qualified_peak_list: List[QualifiedPeak], alignment_peak_list: Sequence[Optional[Peak]], ) -> Sequence[Optional[QualifiedPeak]]: padded_qp_list: MutableSequence[Optional[QualifiedPeak]] = [] qps = deque(qualified_peak_list) for ap in alignment_peak_list: if ap is None: padded_qp_list.append(None) else: qp = qps.popleft() assert qp.rt == ap.rt padded_qp_list.append(qp) return padded_qp_list
[docs]def get_excluded_peaks( project: Project, pyms_nist_search_config: PyMSNISTSearchCfg, peak_filter: InvertedFilter, ) -> ExcludedPeaks: """ Determine excluded peaks in a project. :param project: :param pyms_nist_search_config: :param peak_filter: """ per_repeat_peak_counts: Dict[str, PeakCounts] = {} # df = project.alignment.get_peak_alignment(require_all_expr=False) all_unaligned_peaks: List[PeakList] = [] repeat: Repeat for repeat in project.datafile_data.values(): # Peak list after BB and subsequent noise and base peak filtering found_peaks = repeat.peaks # Peaks for which partners were found during alignment (after filktering to `min_peaks`) alignment_peaks = project.alignment.peakpos[project.alignment.expr_code.index(repeat.name)] unaligned_peaks = PeakList() unaligned_peaks.datafile_name = repeat.name assert repeat.qualified_peaks is not None qualified_peak_list = get_padded_peak_list(repeat.qualified_peaks, alignment_peaks) ap_rt_map = {ap.rt: idx for idx, ap in enumerate(alignment_peaks) if ap is not None} assert repeat.qualified_peaks is not None for p in repeat.peaks: p_rt = p.rt if p_rt in ap_rt_map: ap = qualified_peak_list[ap_rt_map[p_rt]] else: ap = None ap_rt = ap.rt if ap else None if not ap_rt: unaligned_peaks.append(p) qualified_peaks = PeakList(repeat.qualified_peaks) qp_rt_map = {qp.rt: idx for idx, qp in enumerate(qualified_peaks)} # Peaks filtered after alignment due to size (``top_n_peaks`` and ``min_peak_area``) small_peaks = PeakList(ap for ap in alignment_peaks if ap and ap.rt not in qp_rt_map) # Make sure we haven't lost a peak along the way assert len(unaligned_peaks) + len(qualified_peaks) + len(small_peaks) == len(found_peaks) all_unaligned_peaks.append(unaligned_peaks) per_repeat_peak_counts[repeat.name] = PeakCounts( found_peaks=len(found_peaks), alignment_peaks=len(alignment_peaks), unaligned_peaks=len(unaligned_peaks), small_peaks=len(small_peaks), ) debug_stdout = io.StringIO() with redirect_stdout(debug_stdout): A1 = align_peaks(all_unaligned_peaks) peaks_to_consolidate_counts = {pc.alignment_peaks - pc.small_peaks for pc in per_repeat_peak_counts.values()} assert len(peaks_to_consolidate_counts) == 1 peaks_to_consolidate_count = next(iter(peaks_to_consolidate_counts)) with redirect_stdout(debug_stdout): with engine_on_demand(pyms_nist_search_config) as search: unfiltered_consolidated_peaks, _ = consolidate(project, search.engine) filtered_out_consolidated_peaks = peak_filter.filter(unfiltered_consolidated_peaks) # print(len(unfiltered_consolidated_peaks)) # print(len(filtered_out_consolidated_peaks)) # print(len(project.consolidated_peaks)) # print(peaks_to_consolidate_count) assert len(unfiltered_consolidated_peaks) == peaks_to_consolidate_count return ExcludedPeaks( alignment=A1, debug_stdout=debug_stdout, small_peaks=small_peaks, filtered_out_consolidated_peaks=filtered_out_consolidated_peaks, )