| """ |
| 扫描 repos_filtered/ 目录,统计仓库元画像 |
| 文件数/大小/扩展名分布/工程化信号等 |
| """ |
| import os |
| from pathlib import Path |
| from collections import Counter, defaultdict |
| from tqdm import tqdm |
| import json |
| import statistics |
|
|
|
|
| class RepoMetaScan: |
| def __init__(self, repos_dir, output_dir, top_n=None): |
| self.repos_dir = Path(repos_dir) |
| self.output_dir = Path(output_dir) |
| self.output_dir.mkdir(parents=True, exist_ok=True) |
| self.top_n = top_n |
| |
| |
| self.eng_signals = { |
| 'files': [ |
| 'Dockerfile', 'docker-compose.yml', 'requirements.txt', 'setup.py', |
| 'pyproject.toml', 'package.json', 'pom.xml', 'Makefile', 'LICENSE', |
| 'CITATION.cff', 'CMakeLists.txt', 'Cargo.toml', 'go.mod', 'go.sum' |
| ], |
| 'dirs': [ |
| '.github/workflows', 'tests', 'test', 'docs', 'doc', 'examples', |
| 'example', 'data', 'notebooks', 'notebook', 'scripts', 'script' |
| ] |
| } |
| |
| self.stats = [] |
| |
| def get_repo_full_name(self, dir_name): |
| """将目录名转换为full_name (owner___repo -> owner/repo)""" |
| return dir_name.replace('___', '/') |
| |
| def scan_repo(self, repo_path): |
| """扫描单个仓库""" |
| repo_name = repo_path.name |
| full_name = self.get_repo_full_name(repo_name) |
| |
| stats = { |
| 'repo_name': repo_name, |
| 'full_name': full_name, |
| 'total_files': 0, |
| 'total_size_bytes': 0, |
| 'max_file_size_bytes': 0, |
| 'extensions': Counter(), |
| 'has_ipynb': False, |
| 'ipynb_count': 0, |
| 'eng_signals': defaultdict(bool), |
| 'has_git': os.path.isdir(repo_path / '.git'), |
| } |
| |
| |
| skip_dirs = {'.git', 'node_modules', 'vendor', 'dist', 'build', '__pycache__', '.pytest_cache'} |
| |
| for root, dirs, files in os.walk(repo_path): |
| |
| dirs[:] = [d for d in dirs if d not in skip_dirs] |
| |
| |
| rel_path = Path(root).relative_to(repo_path) |
| for signal_dir in self.eng_signals['dirs']: |
| if signal_dir in str(rel_path): |
| stats['eng_signals'][signal_dir] = True |
| |
| for file in files: |
| file_path = Path(root) / file |
| |
| |
| try: |
| file_size = file_path.stat().st_size |
| stats['total_size_bytes'] += file_size |
| stats['max_file_size_bytes'] = max(stats['max_file_size_bytes'], file_size) |
| except: |
| continue |
| |
| stats['total_files'] += 1 |
| |
| |
| ext = file_path.suffix.lower() |
| if not ext: |
| ext = '<noext>' |
| stats['extensions'][ext] += 1 |
| |
| |
| if ext == '.ipynb': |
| stats['has_ipynb'] = True |
| stats['ipynb_count'] += 1 |
| |
| |
| for signal_file in self.eng_signals['files']: |
| if file == signal_file: |
| stats['eng_signals'][signal_file] = True |
| |
| |
| stats['extensions'] = dict(stats['extensions']) |
| stats['eng_signals'] = dict(stats['eng_signals']) |
| |
| return stats |
| |
| def scan_all_repos(self): |
| """扫描所有仓库(字典序前top_n,如果top_n为None则扫描所有)""" |
| print(f"Scanning repos in {self.repos_dir}...") |
| |
| |
| all_repos = sorted([d for d in self.repos_dir.iterdir() if d.is_dir()]) |
| if self.top_n is None: |
| selected_repos = all_repos |
| print(f"Selected {len(selected_repos)} repos (all repos)") |
| else: |
| selected_repos = all_repos[:self.top_n] |
| print(f"Selected {len(selected_repos)} repos (top {self.top_n} by alphabetical order)") |
| |
| for repo_path in tqdm(selected_repos, desc="Scanning repos"): |
| try: |
| stats = self.scan_repo(repo_path) |
| self.stats.append(stats) |
| except Exception as e: |
| print(f"Error scanning {repo_path}: {e}") |
| continue |
| |
| def save_results(self): |
| """保存结果""" |
| import pandas as pd |
| |
| |
| df = pd.DataFrame(self.stats) |
| top_n_suffix = f"_top{self.top_n}" if self.top_n else "" |
| df.to_csv(self.output_dir / f'repo_meta_scan{top_n_suffix}.csv', index=False) |
| |
| |
| summary = { |
| 'total_repos': len(self.stats), |
| 'total_files': sum(s['total_files'] for s in self.stats), |
| 'total_size_gb': sum(s['total_size_bytes'] for s in self.stats) / (1024**3), |
| 'avg_files_per_repo': statistics.mean([s['total_files'] for s in self.stats]) if self.stats else 0, |
| 'avg_size_mb_per_repo': statistics.mean([s['total_size_bytes'] for s in self.stats]) / (1024**2) if self.stats else 0, |
| 'repos_with_ipynb': sum(1 for s in self.stats if s['has_ipynb']), |
| 'total_ipynb_files': sum(s['ipynb_count'] for s in self.stats), |
| } |
| |
| |
| all_extensions = Counter() |
| for s in self.stats: |
| all_extensions.update(s['extensions']) |
| |
| summary['top_extensions'] = dict(all_extensions.most_common(30)) |
| |
| |
| eng_counts = defaultdict(int) |
| for s in self.stats: |
| for signal, present in s['eng_signals'].items(): |
| if present: |
| eng_counts[signal] += 1 |
| |
| summary['engineering_signals'] = dict(eng_counts) |
| |
| with open(self.output_dir / 'repo_meta_summary.json', 'w', encoding='utf-8') as f: |
| json.dump(summary, f, indent=2, ensure_ascii=False) |
| |
| |
| ext_df = pd.DataFrame([ |
| {'extension': ext, 'count': count} |
| for ext, count in all_extensions.most_common(50) |
| ]) |
| ext_df.to_csv(self.output_dir / 'extension_distribution.csv', index=False) |
| |
| |
| eng_df = pd.DataFrame([ |
| {'signal': signal, 'count': count, 'percentage': count / len(self.stats) * 100} |
| for signal, count in sorted(eng_counts.items(), key=lambda x: -x[1]) |
| ]) |
| eng_df.to_csv(self.output_dir / 'engineering_signals.csv', index=False) |
| |
| def run(self): |
| """执行完整流程""" |
| print("Scanning repository metadata...") |
| self.scan_all_repos() |
| print("Saving results...") |
| self.save_results() |
| print(f"Repo meta scan complete! Results saved to {self.output_dir}") |
|
|
|
|
| if __name__ == "__main__": |
| repos_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/repos_filtered" |
| output_dir = "/home/weifengsun/tangou1/domain_code/src/workdir/reporting/repo_meta" |
| scanner = RepoMetaScan(repos_dir, output_dir, top_n=None) |
| scanner.run() |
|
|
|
|