# src/summary/utils.py
import re
from typing import List

from tiktoken import encoding_for_model

# Candidate markers that introduce a scene heading in a script
# (regex patterns are raw strings so the escapes survive).
SCENE_INDICATORS = ['씬/', '씬', 'SS##', 'S#', 's#', 'S', 's', r'#\d+.', r'\d+.']

def delete_special(pre_text, character_list):
    # Remove every occurrence of each string in character_list from pre_text.
    for c in character_list:
        pre_text = pre_text.replace(c, "")
    return pre_text

def preprocess_script(script: str) -> str:
    lines = script.split("\n")

    new_text = ""
    for line in lines:
        # Strip control characters and stray glyphs that show up in raw script dumps.
        line = delete_special(line, ["\n", "\t", "\xa0", '၀', 'ᝰ', 'ศ', 'ನ', 'tุ', '\x00Ā\x00\x00\x00'])
        # Keep Hangul, alphanumerics, whitespace, and basic punctuation/scene markers.
        cleaned = re.sub(r'[^가-힣a-zA-Z0-9\s,.!?/#]', ' ', line).strip()
        cleaned = delete_special(cleaned, ["  "]).strip()
        cleaned = cleaned.replace("<|start|>", "").replace("<|end|>", "")
        if len(cleaned) > 0:
            # Append the cleaned line, not the raw one, so the filtering takes effect.
            new_text += f"{cleaned}\n"

    return new_text.strip()
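
# Example (hypothetical input), illustrating the cleaning behavior: the colon
# and "★" fall outside the allowed character class and are dropped.
#   preprocess_script("s#1\n철수: 안녕하세요★")
#   -> "s#1\n철수 안녕하세요"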


def preprocess_scripts(scripts: List[str]) -> List[str]:
    return [preprocess_script(s) for s in scripts]

def break_down2scenes(text: str) -> List[dict]:
    # Split the text on "s#<number>" scene markers, keeping the markers.
    scenes = re.split(r'(s#\d+)', text)

    # Remove empty elements from the split results
    scenes = [scene for scene in scenes if scene.strip()]

    scenes_list = []
    current_scene_number = None

    # Text before the first marker would break the marker/text pairing below,
    # so skip such a preamble if present.
    start = 0 if scenes and re.fullmatch(r's#\d+', scenes[0].strip()) else 1

    for i in range(start, len(scenes), 2):  # Process the "s#" marker and corresponding text as pairs
        scene_marker = scenes[i].strip()
        scene_number = int(scene_marker.split('#')[1])  # Extract only the number
        scene_text = scenes[i + 1].strip() if i + 1 < len(scenes) else ""

        # Verify that the scene numbers are consecutive; track the last one seen
        # so this check actually fires after the first scene.
        if current_scene_number is not None:
            expected_scene_number = current_scene_number + 1
            if scene_number != expected_scene_number:
                raise ValueError(f"Unexpected scene number: {scene_number}, expected {expected_scene_number}")
        current_scene_number = scene_number

        # Save the scene number and text together
        scenes_list.append({
            'detected_scene_number': scene_number,
            'text': f"{scene_marker}\n{scene_text}".strip()
        })
    return scenes_list
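
# Example (hypothetical input), showing the returned structure:
#   break_down2scenes("s#1\n거리. 낮.\ns#2\n학교. 밤.")
#   -> [{'detected_scene_number': 1, 'text': 's#1\n거리. 낮.'},
#       {'detected_scene_number': 2, 'text': 's#2\n학교. 밤.'}]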


def chunk_script_gpt(script: str,
                     model_id: str,
                     chunk_size: int = -1) -> List[str]:
    # chunk_size == -1 disables chunking: the whole script is one chunk.
    if chunk_size == -1:
        print("Single Inference Mode")
        return [script]

    encoding = encoding_for_model(model_id)

    scenes = break_down2scenes(script)
    len_scenes = len(scenes)

    chunks = []
    if len_scenes > 10:
        # Scene-aware chunking: pack whole scenes into chunks of at most
        # chunk_size tokens; a single scene longer than chunk_size stays intact.
        print(f"Num of detected scenes : {len_scenes}")

        chunk = ""
        token_len_chunk = 0
        for i, scene_data in enumerate(scenes):
            scene = scene_data["text"].strip()
            token_len_scene = len(encoding.encode_ordinary(scene))
            if token_len_chunk + token_len_scene > chunk_size:
                if token_len_chunk == 0:
                    # This scene alone exceeds chunk_size; keep it whole anyway.
                    chunk = scene
                    token_len_chunk = token_len_scene
                else:
                    chunks.append(chunk)
                    chunk = scene
                    token_len_chunk = token_len_scene
            else:
                # Separate scenes with a newline so they are not glued together.
                chunk += ("\n" if chunk else "") + scene
                token_len_chunk += token_len_scene

            if i == len_scenes - 1:
                chunks.append(chunk)
    else:
        # Too few scene markers were detected; fall back to fixed-size token windows.
        print(f"Too few detected scenes ({len_scenes})")
        tokenized_script = encoding.encode_ordinary(script)
        token_len_script = len(tokenized_script)
        for start in range(0, token_len_script, chunk_size):
            end = min(start + chunk_size, token_len_script)
            chunks.append(encoding.decode(tokenized_script[start:end]))
    print(f"Num of chunks : {len(chunks)}")
    return chunks
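
# Minimal usage sketch (hypothetical sample text and model id; any model id
# that tiktoken's encoding_for_model recognizes will work here).
if __name__ == "__main__":
    sample_script = "s#1\n철수: 안녕하세요★\ns#2\n영희: 잘 가!"
    cleaned = preprocess_script(sample_script)
    chunks = chunk_script_gpt(cleaned, model_id="gpt-4o-mini", chunk_size=512)
    for i, chunk in enumerate(chunks):
        print(f"--- chunk {i} ---\n{chunk}")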