# src/summary/utils.py
import re
from typing import Callable, List
from concurrent.futures import ProcessPoolExecutor, as_completed

from transformers import AutoTokenizer
from tiktoken import Encoding, encoding_for_model

# Patterns that commonly mark the start of a scene in Korean screenplays
# ('씬' is Korean for "scene"). Regex fragments are kept as raw strings.
SCENE_INDICATORS = ['씬/', '씬', 'SS##', 'S#', 's#', 'S', 's', r'#\d+.', r'\d+.']


def delete_special(pre_text: str, character_list: List[str]) -> str:
    """Remove every occurrence of each string in character_list from pre_text."""
    for c in character_list:
        pre_text = pre_text.replace(c, "")
    return pre_text


def preprocess_script(script: str) -> str:
    """Clean a raw script: strip control characters, known mojibake artifacts,
    and special tokens, and drop lines with no meaningful content."""
    lines = script.split("\n")
    new_text = ""
    for line in lines:
        # Remove control characters and known mojibake artifacts.
        line = delete_special(line, ["\n", "\t", "\xa0", '၀', 'ᝰ', 'ศ', 'ನ', 'tุ', '\x00Ā\x00\x00\x00'])
        # Strip special tokens from the line itself so they cannot leak into the output.
        line = line.replace("<|start|>", "").replace("<|end|>", "")
        # Build a whitespace-free copy keeping only Korean, alphanumerics, and
        # basic punctuation; it is used purely to test whether the line has
        # any real content, while the original spacing is preserved.
        cleaned = re.sub(r'[^가-힣a-zA-Z0-9\s,.!?/#]', ' ', line).strip()
        cleaned = delete_special(cleaned, [" "]).strip()
        if len(cleaned) > 0:
            new_text += f"{line}\n"
    return new_text.strip()


def preprocess_scripts(scripts: List[str]) -> List[str]:
    return [preprocess_script(s) for s in scripts]


def break_down2scenes(text: str) -> List[dict]:
    # Split the text on "s#<number>" scene markers, keeping the markers.
    scenes = re.split(r'(s#\d+)', text)
    # Remove empty elements from the split results.
    scenes = [scene for scene in scenes if scene.strip()]

    scenes_list = []
    current_scene_number = None
    for i in range(0, len(scenes), 2):
        # Process each "s#" marker together with the text that follows it.
        scene_marker = scenes[i].strip()
        scene_number = int(scene_marker.split('#')[1])  # extract only the number
        scene_text = scenes[i + 1].strip() if i + 1 < len(scenes) else ""

        # Verify that the scene numbers are consecutive.
        if current_scene_number is not None:
            expected_scene_number = current_scene_number + 1
            if scene_number != expected_scene_number:
                raise ValueError(f"Unexpected scene number: {scene_number}, expected {expected_scene_number}")
        current_scene_number = scene_number

        # Save the scene number and text together.
        scenes_list.append({
            'detected_scene_number': scene_number,
            'text': f"{scene_marker}\n{scene_text}".strip()
        })
    return scenes_list


def chunk_script_gpt(script: str, model_id: str, chunk_size: int = -1) -> List[str]:
    if chunk_size == -1:
        print("Single Inference Mode")
        return [script]

    encoding = encoding_for_model(model_id)
    scenes = break_down2scenes(script)
    len_scenes = len(scenes)
    chunks = []
    if len_scenes > 10:
        # Scene-aware chunking: pack whole scenes into chunks of at most
        # chunk_size tokens; a single scene longer than chunk_size becomes
        # its own chunk.
        print(f"Num of detected scenes : {len_scenes}")
        chunk = ""
        token_len_chunk = 0
        for i, scene_data in enumerate(scenes):
            scene = scene_data["text"].strip()
            token_len_scene = len(encoding.encode_ordinary(scene))
            if token_len_chunk + token_len_scene > chunk_size:
                if token_len_chunk == 0:
                    # This scene alone exceeds chunk_size; keep it as one chunk.
                    chunk = scene
                    token_len_chunk = token_len_scene
                else:
                    # Flush the current chunk and start a new one with this scene.
                    chunks.append(chunk)
                    chunk = scene
                    token_len_chunk = token_len_scene
            else:
                # Append the scene to the current chunk, newline-separated.
                chunk = f"{chunk}\n{scene}" if chunk else scene
                token_len_chunk += token_len_scene
            if i == len_scenes - 1:
                chunks.append(chunk)
    else:
        # Too few scenes detected: fall back to fixed-size token windows.
        print(f"No Detected Scenes ({len_scenes})")
        tokenized_script = encoding.encode_ordinary(script)
        token_len_script = len(tokenized_script)
        for start in range(0, token_len_script, chunk_size):
            end = min(start + chunk_size, token_len_script)
            chunks.append(encoding.decode(tokenized_script[start:end]))
    print(f"Num of chunks : {len(chunks)}")
    return chunks
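

# Minimal usage sketch (not part of the module's API). The sample script and
# the "gpt-4o-mini" model name are illustrative assumptions; any model known
# to tiktoken.encoding_for_model works. With chunk_size=-1 the tokenizer is
# never consulted and the whole script is returned as a single chunk.
if __name__ == "__main__":
    sample = "s#1\n철수가 집에 들어온다.\ns#2\n영희가 전화를 받는다."
    cleaned = preprocess_script(sample)
    scenes = break_down2scenes(cleaned)
    print([s["detected_scene_number"] for s in scenes])  # -> [1, 2]
    chunks = chunk_script_gpt(cleaned, model_id="gpt-4o-mini", chunk_size=-1)
    print(len(chunks))  # -> 1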