#!/usr/bin/env python3
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "en-core-web-sm",
# "spacy",
# ]
#
# [tool.uv.sources]
# en-core-web-sm = { url = "https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl" }
# ///
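"""Tokenize text files with spaCy and write fixed-size token chunks to a CSV.

Example invocation (input filename and label are illustrative):

    uv run spacy-tokenize.py input.txt -n 100 -l mylabel -o output.csv
"""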
import argparse
import csv
import re
import sys
from pathlib import Path

import spacy


def main():
    parser = argparse.ArgumentParser(
        description="Tokenize text files and output chunked CSV"
    )
    parser.add_argument("files", nargs="+", help="Input text file(s) to process")
    parser.add_argument(
        "-n",
        "--tokens",
        type=int,
        default=100,
        help="Number of tokens per chunk (default: 100)",
    )
    parser.add_argument(
        "-l",
        "--label",
        type=str,
        help="Custom label for all chunks (defaults to filename)",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=str,
        default="output.csv",
        help="Output CSV filename (default: output.csv)",
    )
    parser.add_argument(
        "-c",
        "--max-chunks",
        type=int,
        help="Maximum number of chunks to output (default: unlimited)",
    )
    parser.add_argument(
        "--lemma",
        action="store_true",
        help="Use lemmatized forms of tokens instead of original text",
    )
    args = parser.parse_args()
    # Load spaCy model
    nlp = spacy.load("en_core_web_sm")
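    # (Optional speed-up) tokenization and lemmatization do not need the
    # dependency parser or NER, so the model could be loaded with
    # spacy.load("en_core_web_sm", disable=["parser", "ner"]) instead.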
    # Process files and collect chunks
    all_chunks = []
    chunks_created = 0

    for filename in args.files:
        if args.max_chunks and chunks_created >= args.max_chunks:
            break

        filepath = Path(filename)
        if not filepath.exists():
            print(
                f"Warning: File '{filename}' not found, skipping...",
                file=sys.stderr,
            )
            continue

        try:
            with open(filepath, "r", encoding="utf-8") as f:
                text = f.read()
        except Exception as e:
            print(f"Error reading '{filename}': {e}", file=sys.stderr)
            continue
        # Split on one or more newlines
        segments = re.split(r"\n+", text)

        # Remove empty segments
        segments = [seg.strip() for seg in segments if seg.strip()]

        # Process segments through spaCy pipe
        all_tokens = []
        for doc in nlp.pipe(segments):
            # Extract tokens from each processed segment
            if args.lemma:
                tokens = [token.lemma_ for token in doc]
            else:
                tokens = [token.text for token in doc]
            all_tokens.extend(tokens)

        # Determine label
        label = args.label if args.label else filepath.name
        # Create chunks of n tokens
        for i in range(0, len(all_tokens), args.tokens):
            if args.max_chunks and chunks_created >= args.max_chunks:
                break

            chunk = all_tokens[i : i + args.tokens]
            # Only include chunks with exactly n tokens
            if len(chunk) == args.tokens:
                chunk_text = " ".join(chunk)
                all_chunks.append({"text": chunk_text, "label": label})
                chunks_created += 1
    # Write to CSV
    if all_chunks:
        with open(args.output, "w", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["text", "label"])
            writer.writeheader()
            writer.writerows(all_chunks)

        print(f"Successfully wrote {len(all_chunks)} chunks to '{args.output}'")
        if args.lemma:
            print("Note: Tokens were lemmatized")
    else:
        print("No valid chunks to write.")


if __name__ == "__main__":
    main()