Muhamed-Kheir committed on
Commit
30e2586
·
verified ·
1 Parent(s): 47a59ac

Delete kmer_unique.py

Browse files
Files changed (1) hide show
  1. kmer_unique.py +0 -281
kmer_unique.py DELETED
@@ -1,281 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Multi-group unique k-mer analysis.
4
-
5
- Given multiple directories (each directory = one group) containing FASTA files,
6
- computes k-mers unique to each group (relative to all other groups), filters by
7
- minimum frequency, writes per-group outputs, and saves a summary spreadsheet + plots.
8
-
9
- Example:
10
- python kmer_unique.py \
11
- --group-dirs data/groupA data/groupB data/groupC \
12
- --k-min 15 --k-max 31 --min-freq 5 \
13
- --outdir results
14
- """
15
-
16
- from __future__ import annotations
17
-
18
- import argparse
19
- import os
20
- from collections import Counter
21
- from dataclasses import dataclass
22
- from typing import Dict, Iterable, List, Sequence, Tuple
23
-
24
- import matplotlib.pyplot as plt
25
- import pandas as pd
26
- from Bio import SeqIO
27
-
28
-
29
- # -------------------------------
30
- # Data structures
31
- # -------------------------------
32
-
33
@dataclass(frozen=True)
class GroupData:
    """Immutable description of one input group (one directory of FASTA files)."""

    # Group label; `run` derives it from the base name of the directory.
    name: str
    # Directory path exactly as supplied by the caller.
    dirpath: str
    # Paths of the FASTA files discovered in `dirpath`.
    fasta_files: Tuple[str, ...]
38
-
39
-
40
- # -------------------------------
41
- # K-mer utilities
42
- # -------------------------------
43
-
44
def get_kmers(sequence: str, k: int) -> List[str]:
    """Return every length-``k`` window of ``sequence`` that contains no 'N'.

    The sequence is upper-cased before scanning, so a lower-case 'n' also
    disqualifies a window. Windows are returned left to right with duplicates
    kept. An empty list is returned when ``k`` is not positive or is larger
    than the sequence length.
    """
    seq = str(sequence).upper()
    n_windows = len(seq) - k + 1
    if k <= 0 or n_windows <= 0:
        return []

    kmers: List[str] = []
    for start in range(n_windows):
        window = seq[start:start + k]
        if "N" in window:
            continue
        kmers.append(window)
    return kmers
50
-
51
-
52
def kmers_in_records(records: Iterable, k: int) -> set:
    """Collect the distinct k-mers found across all ``records``.

    Each record must expose a ``seq`` attribute; its string form is split
    into k-mers by ``get_kmers``. Returns an empty set when there are no
    records.
    """
    per_record = (get_kmers(str(record.seq), k) for record in records)
    return set().union(*per_record)
58
-
59
-
60
def count_kmers_in_records(records: Iterable, k: int) -> Counter:
    """Count how often each k-mer occurs across all ``records``.

    Each record must expose a ``seq`` attribute; its string form is split
    into k-mers by ``get_kmers``. The tally is updated one record at a time,
    so memory use follows the number of distinct k-mers rather than the
    total number of k-mer occurrences in the whole group.

    Returns:
        A ``Counter`` mapping k-mer -> number of occurrences (empty when
        there are no records).
    """
    tally: Counter = Counter()
    for rec in records:
        tally.update(get_kmers(str(rec.seq), k))
    return tally
66
-
67
-
68
- # -------------------------------
69
- # IO helpers
70
- # -------------------------------
71
-
72
# File-name suffixes (compared case-insensitively) treated as FASTA.
FASTA_EXTS = (".fas", ".fasta", ".fa", ".fna")


def find_fasta_files(directory: str) -> List[str]:
    """Return the sorted paths of the FASTA files directly inside ``directory``.

    Only regular files whose lower-cased name ends with one of ``FASTA_EXTS``
    are kept; sub-directories are not searched.
    """
    candidates = [
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.lower().endswith(FASTA_EXTS)
    ]
    return sorted(path for path in candidates if os.path.isfile(path))
84
-
85
-
86
def parse_fasta_files(files: Sequence[str]) -> List:
    """Load every record from the given FASTA files into a single list.

    Files are read in the order given. A file that raises while being parsed
    is reported with a printed warning and contributes no records; the
    remaining files are still processed.
    """
    collected: List = []
    for path in files:
        try:
            # Materialize first so a failure part-way through adds nothing.
            parsed = list(SeqIO.parse(path, "fasta"))
        except Exception as e:
            print(f"⚠️ Warning: could not read {path}: {e}")
            continue
        collected.extend(parsed)
    return collected
95
-
96
-
97
def write_kmer_output(filepath: str, kmer_dict: Dict[str, int], total_freq: int) -> None:
    """Write a k-mer -> count table to ``filepath`` as UTF-8 tab-separated text.

    The file starts with a ``#`` comment line carrying ``total_freq``, then a
    ``kmer<TAB>count`` header, then one row per k-mer in sorted k-mer order.
    An existing file is overwritten.
    """
    with open(filepath, "w", encoding="utf-8") as handle:
        handle.writelines(
            [
                f"# Total frequency of unique k-mers: {total_freq}\n",
                "kmer\tcount\n",
            ]
        )
        handle.writelines(
            f"{kmer}\t{count}\n" for kmer, count in sorted(kmer_dict.items())
        )
104
-
105
-
106
- # -------------------------------
107
- # Plotting
108
- # -------------------------------
109
-
110
def plot_kmer_summary(summary_df: pd.DataFrame, output_dir: str) -> None:
    """Save two line plots (one line per group) of the summary table.

    Group names are taken from the columns ending in " unique k-mers". For
    every group the table must also hold a "<group> total frequency" column,
    plus a "k" column used as the x axis.

    Writes ``unique_kmers_per_group.png`` and ``total_freq_per_group.png``
    into ``output_dir`` at 300 dpi and prints a confirmation line.
    """
    suffix = " unique k-mers"
    # Strip only the trailing suffix. str.replace would also delete the same
    # text from inside a group name, and the column lookup below would then
    # fail with a KeyError.
    groups = [c[: -len(suffix)] for c in summary_df.columns if c.endswith(suffix)]

    # (column suffix, marker, y-axis label, title, output file name) per figure.
    plot_specs = (
        (
            " unique k-mers",
            "o",
            "Number of unique k-mers",
            "Unique k-mers per group across k sizes",
            "unique_kmers_per_group.png",
        ),
        (
            " total frequency",
            "s",
            "Total frequency of unique k-mers",
            "Total frequency of unique k-mers per group across k sizes",
            "total_freq_per_group.png",
        ),
    )

    for col_suffix, marker, ylabel, title, filename in plot_specs:
        plt.figure(figsize=(10, 6))
        for g in groups:
            plt.plot(summary_df["k"], summary_df[f"{g}{col_suffix}"], marker=marker, label=g)
        plt.xlabel("k-mer size (k)")
        plt.ylabel(ylabel)
        plt.title(title)
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, filename), dpi=300)
        # Close the figure so repeated calls do not accumulate open figures.
        plt.close()

    print(f"✅ Plots saved to: {output_dir}")
141
-
142
-
143
- # -------------------------------
144
- # Core logic
145
- # -------------------------------
146
-
147
def compute_unique_kmers_per_group(
    group_records: Dict[str, List],
    k: int,
    min_freq: int,
) -> Tuple[Dict[str, Dict[str, int]], Dict[str, int]]:
    """Find, for each group, the k-mers that occur in no other group.

    Args:
        group_records: Mapping of group name -> list of sequence records.
        k: K-mer length.
        min_freq: Minimum number of occurrences within the group for a
            group-specific k-mer to be kept.

    Returns:
        A pair ``(unique_counts, total_freqs)``. ``unique_counts[group]`` maps
        each kept k-mer to its count in that group, and ``total_freqs[group]``
        is the sum of those counts.
    """
    # Count once per group. A Counter's keys are exactly the group's k-mer
    # set, so a second pass over the sequences to build sets is unnecessary.
    group_counts = {g: count_kmers_in_records(recs, k) for g, recs in group_records.items()}

    # Number of groups in which each k-mer appears at least once. A k-mer is
    # group-specific exactly when this is 1, which avoids subtracting every
    # other group's set from every group's set.
    presence: Counter = Counter()
    for counts in group_counts.values():
        presence.update(counts.keys())

    unique_counts: Dict[str, Dict[str, int]] = {}
    total_freqs: Dict[str, int] = {}
    for g, counts in group_counts.items():
        kept = {
            km: n
            for km, n in counts.items()
            if presence[km] == 1 and n >= min_freq
        }
        unique_counts[g] = kept
        total_freqs[g] = sum(kept.values())

    return unique_counts, total_freqs
174
-
175
-
176
def run(
    group_dirs: Sequence[str],
    k_min: int,
    k_max: int,
    min_freq: int,
    output_dir: str,
) -> None:
    """Run the unique k-mer analysis for every k in ``k_min..k_max`` inclusive.

    For each k and each group, writes ``unique_k{k}_{group}.tsv`` into
    ``output_dir``. Afterwards it writes ``kmer_summary.xlsx`` (one row per k)
    and the summary plots. ``output_dir`` is created if it does not exist.

    Args:
        group_dirs: One directory per group; the group name is the
            directory's base name.
        k_min: Smallest k-mer size to process.
        k_max: Largest k-mer size to process.
        min_freq: Minimum in-group count for a unique k-mer to be reported.
        output_dir: Directory receiving all outputs.

    Raises:
        FileNotFoundError: A group directory is missing or holds no FASTA files.
        NotADirectoryError: A group path exists but is not a directory.
        ValueError: Two group directories share the same base name.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Validate and collect groups
    groups: List[GroupData] = []
    seen_names: Dict[str, str] = {}
    for d in group_dirs:
        if not os.path.exists(d):
            raise FileNotFoundError(f"Group directory not found: {d}")
        if not os.path.isdir(d):
            raise NotADirectoryError(f"Not a directory: {d}")

        name = os.path.basename(d.rstrip(os.sep))
        # Group names key both the record mapping and the output file names,
        # so a repeated base name would silently overwrite another group.
        if name in seen_names:
            raise ValueError(
                f"Duplicate group name '{name}' for directories "
                f"{seen_names[name]} and {d}; base names must be distinct"
            )
        seen_names[name] = d

        fasta_files = tuple(find_fasta_files(d))
        if not fasta_files:
            raise FileNotFoundError(f"No FASTA files found in: {d}")
        groups.append(GroupData(name=name, dirpath=d, fasta_files=fasta_files))

    print("📂 Groups and file counts:")
    for g in groups:
        print(f" {g.name}: {len(g.fasta_files)} files")

    # The parsed records do not depend on k, so read the files once instead
    # of re-parsing every FASTA file on each iteration of the k loop.
    group_records = {g.name: parse_fasta_files(g.fasta_files) for g in groups}

    summary_rows: List[dict] = []

    for k in range(k_min, k_max + 1):
        print(f"\n🔹 Processing k = {k}")

        unique_counts, total_freqs = compute_unique_kmers_per_group(
            group_records=group_records,
            k=k,
            min_freq=min_freq,
        )

        row = {"k": k}
        for g in groups:
            counts = unique_counts[g.name]
            total_freq = total_freqs[g.name]
            print(f" Group {g.name}: {len(counts)} unique {k}-mers | Total freq: {total_freq}")

            out_path = os.path.join(output_dir, f"unique_k{k}_{g.name}.tsv")
            write_kmer_output(out_path, counts, total_freq)

            row[f"{g.name} unique k-mers"] = len(counts)
            row[f"{g.name} total frequency"] = total_freq

        summary_rows.append(row)

    summary_df = pd.DataFrame(summary_rows)
    summary_path = os.path.join(output_dir, "kmer_summary.xlsx")
    summary_df.to_excel(summary_path, index=False)
    print(f"\n✅ Summary saved to: {summary_path}")

    plot_kmer_summary(summary_df, output_dir)
237
-
238
-
239
- # -------------------------------
240
- # CLI
241
- # -------------------------------
242
-
243
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the unique k-mer analysis.

    Options: ``--group-dirs`` (required, one or more paths), ``--k-min``,
    ``--k-max``, ``--min-freq`` (integers) and ``--outdir``. Defaults are
    shown in ``--help`` through ``ArgumentDefaultsHelpFormatter``.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Compute group-specific unique k-mers from FASTA directories.",
    )
    parser.add_argument(
        "--group-dirs",
        required=True,
        nargs="+",
        help="List of directories, one per group, containing FASTA files.",
    )

    # (flag, default, help text) for the integer-valued options.
    int_options = (
        ("--k-min", 1, "Minimum k-mer size."),
        ("--k-max", 50, "Maximum k-mer size."),
        ("--min-freq", 5, "Minimum frequency threshold for unique k-mers."),
    )
    for flag, default, help_text in int_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)

    parser.add_argument("--outdir", type=str, default="kmer_results", help="Output directory.")
    return parser
259
-
260
-
261
def main() -> None:
    """Parse the command line, validate the numeric options, and call ``run``.

    Raises:
        ValueError: If ``--k-min`` is below 1, ``--k-max`` is below
            ``--k-min``, or ``--min-freq`` is below 1.
    """
    args = build_arg_parser().parse_args()

    # (condition that must hold, message raised when it does not), checked in order.
    requirements = (
        (args.k_min >= 1, "--k-min must be >= 1"),
        (args.k_max >= args.k_min, "--k-max must be >= --k-min"),
        (args.min_freq >= 1, "--min-freq must be >= 1"),
    )
    for satisfied, message in requirements:
        if not satisfied:
            raise ValueError(message)

    run(
        group_dirs=args.group_dirs,
        k_min=args.k_min,
        k_max=args.k_max,
        min_freq=args.min_freq,
        output_dir=args.outdir,
    )


if __name__ == "__main__":
    main()