ginipick committed on
Commit
36b1d6b
·
verified ·
1 Parent(s): 4b83eaf

Update src/main.py

Browse files
Files changed (1) hide show
  1. src/main.py +164 -92
src/main.py CHANGED
@@ -6,96 +6,126 @@ import io
6
  import cv2
7
  import numpy as np
8
  import os
9
- import requests
 
 
10
  from urllib.parse import quote, unquote
11
  import tempfile
12
  import re
 
 
 
 
 
 
 
 
13
 
14
  app = Flask(__name__, static_folder='static')
15
  app.config['TITLE'] = 'Sign Language Translate'
16
 
 
17
  nlp, dict_docs_spacy = sp.load_spacy_values()
18
  dataset, list_2000_tokens = dg.load_data()
19
 
20
- def clean_quotes(text):
 
 
 
 
 
21
  """따옴표 정리 함수"""
22
  text = re.sub(r"'+", "'", text)
23
  text = re.sub(r'\s+', ' ', text).strip()
24
  return text
25
 
26
- def is_korean(text):
 
27
  """한글이 포함되어 있는지 확인"""
28
  return bool(re.search('[가-힣]', text))
29
 
30
- def is_english(text):
 
31
  """텍스트가 영어인지 확인하는 함수"""
32
  text_without_quotes = re.sub(r"'[^']*'|\s", "", text)
33
  return bool(re.match(r'^[A-Za-z.,!?-]*$', text_without_quotes))
34
 
35
- def normalize_quotes(text):
 
36
  """따옴표 형식을 정규화하는 함수"""
37
  text = re.sub(r"'+", "'", text)
38
  text = re.sub(r'\s+', ' ', text).strip()
39
 
40
- # 이미 따옴표로 묶인 단어가 있으면 그대로 반환
41
  if re.search(r"'[^']*'", text):
42
  return text
43
-
44
  return text
45
 
46
- def find_quoted_words(text):
 
47
  """작은따옴표로 묶인 단어들을 찾는 함수"""
48
  return re.findall(r"'([^']*)'", text)
49
 
50
- def spell_out_word(word):
 
51
  """단어를 개별 알파벳으로 분리하는 함수"""
52
  return ' '.join(list(word.lower()))
53
 
54
- def translate_korean_text(text):
55
- """한글 전용 번역 함수"""
56
  try:
57
- quoted_match = re.search(r"'([^']*)'", text)
58
- if not quoted_match:
59
- return text
60
-
61
- quoted_word = quoted_match.group(1)
62
-
63
  url = "https://translate.googleapis.com/translate_a/single"
64
  params = {
65
  "client": "gtx",
66
- "sl": "ko",
67
- "tl": "en",
68
  "dt": "t",
69
- "q": text.replace(f"'{quoted_word}'", "XXXXX")
70
  }
71
- response = requests.get(url, params=params)
72
- if response.status_code != 200:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  return text
74
 
75
- translated_text = ' '.join(item[0] for item in response.json()[0] if item[0])
76
 
77
- if re.match(r'^[A-Za-z]+$', quoted_word):
78
- proper_noun = quoted_word.upper()
79
- else:
80
- params["q"] = quoted_word
81
- response = requests.get(url, params=params)
82
- if response.status_code == 200:
83
- proper_noun = response.json()[0][0][0].upper()
84
- else:
85
  proper_noun = quoted_word.upper()
 
 
86
 
87
- final_text = translated_text.replace("XXXXX", f"'{proper_noun}'")
88
- final_text = re.sub(r'\bNAME\b', 'name', final_text)
89
- final_text = final_text.replace(" .", ".")
90
-
91
- return final_text
92
 
93
  except Exception as e:
94
- print(f"Korean translation error: {e}")
95
  return text
96
 
97
- def translate_korean_to_english(text):
98
- """전체 텍스트 번역 함수"""
99
  try:
100
  text = normalize_quotes(text)
101
 
@@ -107,19 +137,84 @@ def translate_korean_to_english(text):
107
  return text
108
 
109
  if is_korean(text):
110
- return translate_korean_text(text)
111
 
112
  return text
113
  except Exception as e:
114
- print(f"Translation error: {e}")
115
  return text
116
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  @app.route('/')
118
  def index():
119
  return render_template('index.html', title=app.config['TITLE'])
120
 
121
  @app.route('/translate/', methods=['POST'])
122
- def result():
123
  if request.method == 'POST':
124
  input_text = request.form['inputSentence'].strip()
125
  if not input_text:
@@ -127,16 +222,23 @@ def result():
127
 
128
  try:
129
  input_text = normalize_quotes(input_text)
130
- english_text = translate_korean_to_english(input_text)
131
  if not english_text:
132
  raise Exception("Translation failed")
133
 
134
  quoted_words = find_quoted_words(english_text)
135
 
136
- clean_english = re.sub(r"'([^']*)'", r"\1", english_text)
137
- eng_to_asl_translator = NlpSpacyBaseTranslator(sentence=clean_english)
138
- generated_gloss = eng_to_asl_translator.translate_to_gloss()
 
 
 
 
 
 
139
 
 
140
  processed_gloss = []
141
  words = generated_gloss.split()
142
 
@@ -150,6 +252,7 @@ def result():
150
 
151
  gloss_sentence_before_synonym = " ".join(processed_gloss)
152
 
 
153
  final_gloss = []
154
  i = 0
155
  while i < len(processed_gloss):
@@ -164,7 +267,17 @@ def result():
164
  i += 1
165
  else:
166
  word = processed_gloss[i]
167
- final_gloss.append(sp.find_synonyms(word, nlp, dict_docs_spacy, list_2000_tokens))
 
 
 
 
 
 
 
 
 
 
168
  i += 1
169
 
170
  gloss_sentence_after_synonym = " ".join(final_gloss)
@@ -175,52 +288,11 @@ def result():
175
  english_translation=english_text,
176
  gloss_sentence_before_synonym=gloss_sentence_before_synonym,
177
  gloss_sentence_after_synonym=gloss_sentence_after_synonym)
 
178
  except Exception as e:
 
179
  return render_template('error.html', error=f"Translation error: {str(e)}")
180
 
181
- def generate_complete_video(gloss_list, dataset, list_2000_tokens):
182
- try:
183
- frames = []
184
- is_spelling = False
185
-
186
- for gloss in gloss_list:
187
- if gloss == 'FINGERSPELL-START':
188
- is_spelling = True
189
- continue
190
- elif gloss == 'FINGERSPELL-END':
191
- is_spelling = False
192
- continue
193
-
194
- for frame in dg.generate_video([gloss], dataset, list_2000_tokens):
195
- frame_data = frame.split(b'\r\n\r\n')[1]
196
- nparr = np.frombuffer(frame_data, np.uint8)
197
- img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
198
- frames.append(img)
199
-
200
- if not frames:
201
- raise Exception("No frames generated")
202
-
203
- height, width = frames[0].shape[:2]
204
- fourcc = cv2.VideoWriter_fourcc(*'mp4v')
205
-
206
- with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
207
- temp_path = temp_file.name
208
-
209
- out = cv2.VideoWriter(temp_path, fourcc, 25, (width, height))
210
-
211
- for frame in frames:
212
- out.write(frame)
213
- out.release()
214
-
215
- with open(temp_path, 'rb') as f:
216
- video_bytes = f.read()
217
-
218
- os.remove(temp_path)
219
- return video_bytes
220
- except Exception as e:
221
- print(f"Error generating video: {str(e)}")
222
- raise
223
-
224
  @app.route('/video_feed')
225
  def video_feed():
226
  sentence = request.args.get('gloss_sentence_to_display', '')
@@ -229,7 +301,7 @@ def video_feed():
229
  mimetype='multipart/x-mixed-replace; boundary=frame')
230
 
231
  @app.route('/download_video/<path:gloss_sentence>')
232
- def download_video(gloss_sentence):
233
  try:
234
  decoded_sentence = unquote(gloss_sentence)
235
  gloss_list = decoded_sentence.split()
@@ -249,7 +321,7 @@ def download_video(gloss_sentence):
249
  download_name='sign_language.mp4'
250
  )
251
  except Exception as e:
252
- print(f"Download error: {str(e)}")
253
  return f"Error downloading video: {str(e)}", 500
254
 
255
  if __name__ == "__main__":
 
6
  import cv2
7
  import numpy as np
8
  import os
9
+ import aiohttp
10
+ import asyncio
11
+ from concurrent.futures import ThreadPoolExecutor
12
  from urllib.parse import quote, unquote
13
  import tempfile
14
  import re
15
+ from functools import lru_cache
16
+ from typing import List, Dict, Any
17
+ import logging
18
+ from contextlib import contextmanager
19
+
20
+ # 로깅 설정
21
+ logging.basicConfig(level=logging.INFO)
22
+ logger = logging.getLogger(__name__)
23
 
24
  app = Flask(__name__, static_folder='static')
25
  app.config['TITLE'] = 'Sign Language Translate'
26
 
27
+ # 전역 변수를 초기화하고 캐싱
28
  nlp, dict_docs_spacy = sp.load_spacy_values()
29
  dataset, list_2000_tokens = dg.load_data()
30
 
31
+ # 스레드 풀 생성
32
+ executor = ThreadPoolExecutor(max_workers=4)
33
+
34
+ # 메모리 캐시 데코레이터
35
@lru_cache(maxsize=1000)
def clean_quotes(text: str) -> str:
    """Collapse repeated single quotes and normalize whitespace.

    Runs of apostrophes (''foo'') become a single quote, and any run of
    whitespace is squeezed to one space with the ends trimmed. Results
    are memoized since the same sentence is cleaned repeatedly per request.
    """
    collapsed = re.sub(r"'+", "'", text)
    return re.sub(r'\s+', ' ', collapsed).strip()
41
 
42
@lru_cache(maxsize=1000)
def is_korean(text: str) -> bool:
    """Return True when *text* contains at least one Hangul syllable."""
    return re.search('[가-힣]', text) is not None
46
 
47
@lru_cache(maxsize=1000)
def is_english(text: str) -> bool:
    """Heuristically decide whether *text* is English.

    Single-quoted spans and all whitespace are stripped first, so a
    quoted proper noun in another script does not affect the verdict;
    what remains must be only ASCII letters and basic punctuation.
    """
    stripped = re.sub(r"'[^']*'|\s", "", text)
    return re.match(r'^[A-Za-z.,!?-]*$', stripped) is not None
52
 
53
@lru_cache(maxsize=1000)
def normalize_quotes(text: str) -> str:
    """Normalize quote characters and whitespace in *text*.

    Collapses runs of single quotes to one and squeezes whitespace.
    The previous version branched on whether a quoted span was present,
    but both branches returned the same value, so the dead conditional
    has been removed; behavior is unchanged.
    """
    text = re.sub(r"'+", "'", text)
    return re.sub(r'\s+', ' ', text).strip()
62
 
63
@lru_cache(maxsize=1000)
def _find_quoted_words_cached(text: str) -> tuple:
    """Memoized core of find_quoted_words; stores an immutable tuple."""
    return tuple(re.findall(r"'([^']*)'", text))

def find_quoted_words(text: str) -> List[str]:
    """Return the words wrapped in single quotes, in order of appearance.

    The original put @lru_cache directly on a list-returning function,
    so every caller received the *same* cached list object and any
    mutation corrupted later lookups. The cache now holds a tuple and a
    fresh list is handed out on every call.
    """
    return list(_find_quoted_words_cached(text))
67
 
68
@lru_cache(maxsize=1000)
def spell_out_word(word: str) -> str:
    """Return *word* lower-cased with its characters separated by spaces,
    e.g. "Kim" -> "k i m"."""
    return ' '.join(word.lower())
72
 
73
async def translate_text_chunk(session: aiohttp.ClientSession, text: str, source_lang: str, target_lang: str) -> str:
    """Translate *text* through the unofficial Google Translate endpoint.

    Issues one GET against translate.googleapis.com with the given
    source/target language codes and joins the translated segments.
    Translation is best-effort: on a non-200 status or any exception the
    original *text* is returned unchanged (errors are logged).
    """
    url = "https://translate.googleapis.com/translate_a/single"
    params = {
        "client": "gtx",
        "sl": source_lang,
        "tl": target_lang,
        "dt": "t",
        "q": text
    }
    try:
        async with session.get(url, params=params) as response:
            if response.status != 200:
                logger.error(f"Translation API error: {response.status}")
                return text

            data = await response.json()
            # Segment list lives in data[0]; item[0] is the translated piece.
            return ' '.join(item[0] for item in data[0] if item[0])

    except Exception as e:
        logger.error(f"Translation error: {e}")
        return text
96
+
97
async def translate_korean_text(text: str) -> str:
    """Translate Korean *text* to English while preserving a quoted name.

    The first single-quoted span is masked with the placeholder "XXXXX"
    before translation so the translator does not alter it, then the
    span is restored in upper case. If the text has no quoted span, or
    anything fails, *text* is returned as-is.
    """
    try:
        quoted_match = re.search(r"'([^']*)'", text)
        if not quoted_match:
            return text

        quoted_word = quoted_match.group(1)

        async with aiohttp.ClientSession() as session:
            # Translate the sentence with the quoted word masked out.
            masked = text.replace(f"'{quoted_word}'", "XXXXX")
            translated_main = await translate_text_chunk(session, masked, "ko", "en")

            # A word that is already pure Latin letters is kept (upper-cased);
            # otherwise translate the word on its own.
            if re.match(r'^[A-Za-z]+$', quoted_word):
                proper_noun = quoted_word.upper()
            else:
                proper_noun = (await translate_text_chunk(session, quoted_word, "ko", "en")).upper()

            # Restore the masked span and tidy the output: downcase a
            # literal NAME token if present and drop the space before a period.
            restored = translated_main.replace("XXXXX", f"'{proper_noun}'")
            restored = re.sub(r'\bNAME\b', 'name', restored)
            return restored.replace(" .", ".")

    except Exception as e:
        logger.error(f"Korean translation error: {e}")
        return text
126
 
127
+ async def translate_korean_to_english(text: str) -> str:
128
+ """전체 텍스트 번역 함수 - 비동기 처리"""
129
  try:
130
  text = normalize_quotes(text)
131
 
 
137
  return text
138
 
139
  if is_korean(text):
140
+ return await translate_korean_text(text)
141
 
142
  return text
143
  except Exception as e:
144
+ logger.error(f"Translation error: {e}")
145
  return text
146
 
147
def process_frame(frame_data: bytes) -> np.ndarray:
    """Decode one multipart video chunk into an OpenCV image array.

    The payload after the blank-line header separator is decoded with
    cv2.imdecode. Errors are logged and re-raised so the caller can
    abort video generation.
    """
    try:
        payload = frame_data.split(b'\r\n\r\n')[1]
        buffer = np.frombuffer(payload, np.uint8)
        return cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    except Exception as e:
        logger.error(f"Frame processing error: {e}")
        raise
156
+
157
@contextmanager
def video_writer(path: str, frame_size: tuple, fps: int = 25):
    """Yield an mp4 cv2.VideoWriter for *path*, releasing it on exit.

    The writer is guaranteed to be released even if the body raises.
    """
    writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, frame_size)
    try:
        yield writer
    finally:
        writer.release()
166
+
167
def generate_complete_video(gloss_list: List[str], dataset: Dict[str, Any], list_2000_tokens: List[str]) -> bytes:
    """Render a gloss sequence into a single mp4 and return its bytes.

    Frames for each gloss come from dg.generate_video and are decoded in
    parallel via process_frame. FINGERSPELL-START / FINGERSPELL-END
    marker tokens produce no frames of their own and are skipped (the
    unused is_spelling flag from the original has been dropped).

    Raises Exception("No frames generated") when the gloss list yields
    nothing; other errors are logged and re-raised.
    """
    try:
        frames = []

        # Decode frames in a local pool; named `pool` so it no longer
        # shadows the module-level `executor`.
        with ThreadPoolExecutor() as pool:
            for gloss in gloss_list:
                # Marker tokens delimit fingerspelled words but carry no video.
                if gloss in ('FINGERSPELL-START', 'FINGERSPELL-END'):
                    continue

                futures = [
                    pool.submit(process_frame, frame)
                    for frame in dg.generate_video([gloss], dataset, list_2000_tokens)
                ]
                frames.extend(future.result() for future in futures)

        if not frames:
            raise Exception("No frames generated")

        height, width = frames[0].shape[:2]

        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            temp_path = temp_file.name

        # The original removed the temp file only on success, leaking it
        # whenever writing or reading raised; the finally guarantees cleanup.
        try:
            with video_writer(temp_path, (width, height)) as out:
                for frame in frames:
                    out.write(frame)

            with open(temp_path, 'rb') as f:
                return f.read()
        finally:
            os.remove(temp_path)

    except Exception as e:
        logger.error(f"Video generation error: {str(e)}")
        raise
211
+
212
@app.route('/')
def index():
    """Serve the landing page with the configured application title."""
    page_title = app.config['TITLE']
    return render_template('index.html', title=page_title)
215
 
216
  @app.route('/translate/', methods=['POST'])
217
+ async def result():
218
  if request.method == 'POST':
219
  input_text = request.form['inputSentence'].strip()
220
  if not input_text:
 
222
 
223
  try:
224
  input_text = normalize_quotes(input_text)
225
+ english_text = await translate_korean_to_english(input_text)
226
  if not english_text:
227
  raise Exception("Translation failed")
228
 
229
  quoted_words = find_quoted_words(english_text)
230
 
231
+ # NLP 처리를 스레드 풀에서 실행
232
+ def process_nlp():
233
+ clean_english = re.sub(r"'([^']*)'", r"\1", english_text)
234
+ eng_to_asl_translator = NlpSpacyBaseTranslator(sentence=clean_english)
235
+ return eng_to_asl_translator.translate_to_gloss()
236
+
237
+ generated_gloss = await asyncio.get_event_loop().run_in_executor(
238
+ executor, process_nlp
239
+ )
240
 
241
+ # Gloss 처리 최적화
242
  processed_gloss = []
243
  words = generated_gloss.split()
244
 
 
252
 
253
  gloss_sentence_before_synonym = " ".join(processed_gloss)
254
 
255
+ # 동의어 처리 최적화
256
  final_gloss = []
257
  i = 0
258
  while i < len(processed_gloss):
 
267
  i += 1
268
  else:
269
  word = processed_gloss[i]
270
+ # 동의어 찾기를 스레드 풀에서 실행
271
+ final_gloss.append(
272
+ await asyncio.get_event_loop().run_in_executor(
273
+ executor,
274
+ sp.find_synonyms,
275
+ word,
276
+ nlp,
277
+ dict_docs_spacy,
278
+ list_2000_tokens
279
+ )
280
+ )
281
  i += 1
282
 
283
  gloss_sentence_after_synonym = " ".join(final_gloss)
 
288
  english_translation=english_text,
289
  gloss_sentence_before_synonym=gloss_sentence_before_synonym,
290
  gloss_sentence_after_synonym=gloss_sentence_after_synonym)
291
+
292
  except Exception as e:
293
+ logger.error(f"Translation processing error: {str(e)}")
294
  return render_template('error.html', error=f"Translation error: {str(e)}")
295
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
296
  @app.route('/video_feed')
297
  def video_feed():
298
  sentence = request.args.get('gloss_sentence_to_display', '')
 
301
  mimetype='multipart/x-mixed-replace; boundary=frame')
302
 
303
  @app.route('/download_video/<path:gloss_sentence>')
304
+ def download_video(gloss_sentence: str):
305
  try:
306
  decoded_sentence = unquote(gloss_sentence)
307
  gloss_list = decoded_sentence.split()
 
321
  download_name='sign_language.mp4'
322
  )
323
  except Exception as e:
324
+ logger.error(f"Video download error: {str(e)}")
325
  return f"Error downloading video: {str(e)}", 500
326
 
327
  if __name__ == "__main__":