# Spaces:
# Paused
# Paused
import traceback | |
import time | |
from lxml import html | |
import os | |
import re | |
import requests | |
import json | |
from modules import untils | |
from modules.sources import Sources | |
from modules.browsers import FireFoxBrowser, ChromeUndetectedBrowser | |
from modules.sources import Manga | |
from modules.websites import TruyenFull, system_status | |
from modules import g_config, logger | |
class ThichTruyen(Sources):
    """Scraper source for https://thichtruyen.vn — a text-chapter novel site."""

    def __init__(self):
        """Configure domain, scraping mode and defaults for this source."""
        super().__init__()
        self.domain = "https://thichtruyen.vn"
        self.temp_domains = ["thichtruyen.vn"]
        # Chapters on this site are plain text rather than image lists.
        self.chapter_type = "text"
        # Mirror the global selenium switch onto the instance.
        self.is_use_selenium = g_config.USE_SELENIUM
        # No special request headers are required by default.
        self.headers = {}
def get_list_manga_newest(self): | |
arr_category = [ | |
'danh-muc/truyen-ngon-tinh', 'danh-muc/ngon-tinh-viet-nam', 'danh-muc/xuyen-khong', 'danh-muc/dam-my', | |
'danh-muc/truyen-voz', 'danh-muc/tien-hiep', 'danh-muc/truyen-kiem-hiep', 'danh-muc/ky-nang-song', | |
'danh-muc/tieu-thuyet-phuong-tay', 'danh-muc/truyen-sac-hiep', 'danh-muc/truyen-ngan', 'danh-muc/do-thi', | |
'danh-muc/trinh-tham', 'danh-muc/chuyen-tham-kin', 'danh-muc/truyen-kinh-di', 'danh-muc/truyen-teen', | |
'danh-muc/don-doc'] | |
if g_config.USE_SELENIUM: | |
self.selenium_get_list_manga_newest() | |
return | |
for cate in arr_category: | |
url_main = f"{self.domain}" | |
stt_page = 1 | |
retry = 0 | |
max_retry = 5 | |
proxy = "" | |
driver = None | |
brows = None | |
while True: | |
url = f"{url_main}/{cate}?tab=truyen-moi&page={stt_page}" | |
item_website = None | |
try: | |
retry = 0 | |
max_retry = 5 | |
content = None | |
while retry < max_retry: | |
try: | |
content = requests.get(url, proxies=proxy, headers=self.headers).content | |
break | |
except KeyboardInterrupt: | |
raise KeyboardInterrupt | |
except: | |
logger.error(f"{self.domain} - Error get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
if content is None: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - get_list_manga_newest - content none - page {stt_page}") | |
continue | |
main_div = html.fromstring(content) | |
list_item = main_div.xpath("//div[@id='tab-content']/div/div/div") | |
if len(list_item) == 0: | |
break | |
for item in list_item: | |
href = None | |
href_ele = item.xpath("div/a") | |
if len(href_ele) > 0: | |
href = href_ele[0].get("href") | |
if href is None: | |
continue | |
if "http" not in href: | |
href = f"{self.domain}{href}" | |
item_website = self.get_episodes(href) | |
untils.remove_folder_manga(item_website) | |
if system_status["error"]: | |
return | |
self.selenium_upload_custom_task() | |
except KeyboardInterrupt: | |
if item_website is not None: | |
untils.remove_folder_manga(item_website) | |
raise KeyboardInterrupt | |
except: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - Error selenium_get_list_manga_newest") | |
logger.error(traceback.format_exc()) | |
continue | |
stt_page = stt_page + 1 | |
time.sleep(5) | |
def selenium_get_list_manga_newest(self): | |
url_main = f"{self.domain}" | |
stt_page = 1 | |
retry = 0 | |
max_retry = 5 | |
proxy = "" | |
driver = None | |
brows = None | |
while retry < max_retry: | |
brows = ChromeUndetectedBrowser("", proxy) | |
try: | |
brows.init_driver() | |
driver = brows.get_driver() | |
break | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
untils.close_all_chrome_browsers() | |
raise KeyboardInterrupt | |
except: | |
logger.error(f"{self.domain} - Error selenium get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
continue | |
if driver is None: | |
if brows is not None: | |
brows.stop() | |
untils.close_all_chrome_browsers() | |
return | |
while True: | |
url = f"{url_main}/search.php?act=search&andor=and&sort=chap&view=detail&page={stt_page}&ajax=true" | |
item_website = None | |
try: | |
retry = 0 | |
max_retry = 5 | |
content = None | |
while retry < max_retry: | |
try: | |
driver.get(url) | |
time.sleep(2) | |
content = driver.page_source | |
break | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
raise KeyboardInterrupt | |
except: | |
logger.error(f"TruyenTranh8 - Error selenium get - Url: {url}") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
if content is None: | |
logger.error("=" * 20) | |
logger.error(f"TruyenTranh8 - selenium_get_list_manga_newest - content none - page {stt_page}") | |
continue | |
main_div = html.fromstring(content) | |
list_item = main_div.xpath("//div[@id='blockdetail']/div") | |
if len(list_item) == 0: | |
break | |
for item in list_item: | |
href = None | |
href_ele = item.xpath("div/div/a") | |
if len(href_ele) > 0: | |
href = href_ele[0].get("href") | |
if href is None: | |
continue | |
item_website = self.selenium_get_episodes(href) | |
untils.remove_folder_manga(item_website) | |
self.selenium_upload_custom_task() | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
if item_website is not None: | |
untils.remove_folder_manga(item_website) | |
raise KeyboardInterrupt | |
except: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - Error selenium_get_list_manga_newest") | |
logger.error(traceback.format_exc()) | |
continue | |
stt_page = stt_page + 1 | |
if brows is not None: | |
brows.stop() | |
untils.close_all_chrome_browsers() | |
def get_episodes(self, url=None): | |
if g_config.USE_SELENIUM: | |
return self.selenium_get_episodes(url) | |
item_manga = Manga() | |
item_website = None | |
brows = None | |
results = [] | |
cookies = [] | |
try: | |
retry = 0 | |
max_retry = 5 | |
content = None | |
proxy = "" | |
while retry < max_retry: | |
try: | |
content = requests.get(url, proxies=proxy, headers=self.headers).content | |
break | |
except KeyboardInterrupt: | |
raise KeyboardInterrupt | |
except: | |
logger.error(f"{self.domain} - Error get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
if content is None: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - get_episode_detail - content none") | |
return results | |
main_div = html.fromstring(content) | |
cookies = untils.format_to_sure_cookies(cookies) | |
list_item = main_div.xpath("//div[@id='tab-chapper']/div/ul/li") | |
item_manga = self.get_info_manga(item_manga, main_div) | |
item_website = untils.get_item_website(item_manga) | |
# Download thumbnail | |
item_manga.download_thumb(cookies, self.domain) | |
list_chapter_uploaded = item_website.get_list_chapter() | |
self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded) | |
logger.info(f"Manga {item_manga.name}") | |
logger.info(f"Total episodes - {len(item_manga.episodes)}") | |
if system_status["error"]: | |
return item_website | |
for episode in item_manga.episodes: | |
if self.chapter_type == "image": | |
logger.info(f"Total images - {len(episode['images'])}") | |
# item_manga.download_images(episode, cookies, self.domain) | |
if g_config.TYPE_LINK_IMAGE == 'local': | |
untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}")) | |
untils.update_data_to_website(item_website, episode, self.chapter_type) | |
except KeyboardInterrupt: | |
raise KeyboardInterrupt | |
except: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - Error selenium_get_episodes") | |
logger.error(traceback.format_exc()) | |
return item_website | |
def selenium_get_episodes(self, url=None): | |
item_manga = Manga() | |
item_website = None | |
brows = None | |
results = [] | |
cookies = "" | |
try: | |
retry = 0 | |
max_retry = 5 | |
content = None | |
proxy = "" | |
while retry < max_retry: | |
brows = ChromeUndetectedBrowser("", proxy) | |
try: | |
brows.init_driver() | |
driver = brows.get_driver() | |
driver.get(url) | |
content = driver.page_source | |
cookies = driver.get_cookies() | |
break | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
raise KeyboardInterrupt | |
except: | |
logger.error("TruyenTranh8 - Error selenium get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
if content is None: | |
if brows is not None: | |
brows.stop() | |
logger.error("=" * 20) | |
logger.error("TruyenTranh8 - selenium_get_episode_detail - content none") | |
return results | |
main_div = html.fromstring(content) | |
cookies = untils.format_to_sure_cookies(cookies) | |
list_item = main_div.xpath("//ul[@class='mangadetail-chaplist']/li") | |
# print(f"Total chapter: {len(list_item)}") | |
item_manga = self.get_info_manga(item_manga, main_div) | |
item_website = untils.get_item_website(item_manga) | |
# Download thumbnail | |
item_manga.download_thumb(cookies) | |
list_chapter_uploaded = item_website.get_list_chapter() | |
self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded) | |
logger.info(f"Manga {item_manga.name}") | |
logger.info(f"Total episodes - {len(item_manga.episodes)}") | |
if system_status["error"]: | |
return item_website | |
for episode in item_manga.episodes: | |
if self.chapter_type == "image": | |
logger.info(f"Total images - {len(episode['images'])}") | |
# item_manga.download_images(episode, cookies) | |
if g_config.TYPE_LINK_IMAGE == 'local': | |
untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}")) | |
untils.update_data_to_website(item_website, episode, self.chapter_type) | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
raise KeyboardInterrupt | |
except: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - Error selenium_get_episodes") | |
logger.error(traceback.format_exc()) | |
if brows is not None: | |
brows.stop() | |
return item_website | |
def get_info_manga(self, item_manga, main_div): | |
avatar = "" | |
name_episode = "" | |
name_alternative = "" | |
tags = [] | |
author_name = "Đang Cập Nhật" | |
status = "" | |
like_number = 0 | |
view_number = 0 | |
follow_number = 0 | |
description = "" | |
slug = "" | |
name_episode_ele = main_div.xpath("//h1[@class='story-intro-title']/a") | |
if len(name_episode_ele) > 0: | |
name_episode = name_episode_ele[0].text.strip() | |
author_ele = main_div.xpath("//p[@class='story-intro-author ']/a") | |
if len(author_ele) > 0: | |
author_name = author_ele[0].text.strip() | |
info_tag_ele = main_div.xpath("//div[@class='story-introduction-content pull-left']/div[1]/a") | |
if len(info_tag_ele) > 0: | |
for item in info_tag_ele: | |
tag = item.text.strip() | |
if tag not in tags: | |
tags.append(tag) | |
status_ele = main_div.xpath("//p[@class='story-intro-chapper']/span") | |
if len(status_ele) > 0: | |
status = status_ele[0].text.strip() | |
if status.lower() == "full": | |
status = "Hoàn Thành" | |
else: | |
status = "Đang Cập Nhật" | |
description_ele = main_div.xpath("//div[@class='tab-text text-justify']/p") | |
if len(description_ele) > 0: | |
description = untils.remove_a_tag(description_ele[0], ".//img") | |
if len(description) == 0: | |
description = f"Truyện tranh {name_episode} được cập nhật nhanh và đầy đủ nhất tại TruyenFull. " \ | |
f"Bạn đọc đừng quên để lại bình luận và chia sẻ, ủng hộ TruyenFull ra các chương mới " \ | |
f"nhất của truyện {name_episode}." | |
avatar_ele = main_div.xpath("//div[@class='story-intro-top']/div[1]/a/img") | |
if len(avatar_ele) > 0: | |
avatar = avatar_ele[0].get("src") | |
if "https" not in avatar and "http" not in avatar: | |
avatar = f"https:{avatar}" | |
slug_ele = main_div.xpath("//meta[@name='og:url']") | |
if len(slug_ele) > 0: | |
slug = slug_ele[0].get("content") | |
arr = slug.split("/") | |
slug = arr[-1] | |
if len(slug) == 0: | |
slug = arr[-2] | |
slug = slug.replace(".jpg", "") | |
item_manga.slug = slug | |
item_manga.avatar = avatar | |
item_manga.name = name_episode | |
item_manga.name_alternative = name_alternative | |
item_manga.author_name = author_name | |
item_manga.status = status | |
item_manga.tags = tags | |
item_manga.description = description | |
return item_manga | |
def get_all_chapters(self, brows, list_item, item_manga, cookies, list_chapter_uploaded): | |
limit_chapter = g_config.MAX_NUM_CHAPTER | |
stt = 0 | |
time_try = 10 | |
for item in list_item: | |
href = None | |
name = None | |
stt = stt + 1 | |
if stt > limit_chapter: | |
break | |
href_ele = item.xpath("a") | |
if len(href_ele) > 0: | |
href = href_ele[0].get("href") | |
name = href_ele[0].text | |
name = name.split("-")[-1].strip() | |
if name is None or href is None: | |
continue | |
if name in list_chapter_uploaded: | |
continue | |
href = f"{self.domain}{href}" | |
# print(f"Get details - {name} - href - {href}") | |
stt_try = 0 | |
while True: | |
check = self.get_episode_detail(brows, item_manga, href, name, cookies) | |
if check is False and stt_try > time_try: | |
system_status["error"] = True | |
system_status["message"] = "Không thể bật trình duyệt - đang tiến hành thử lại" | |
return | |
if check or stt_try > time_try: | |
time.sleep(2) | |
break | |
stt_try += 1 | |
time.sleep(2) | |
def get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None): | |
if g_config.USE_SELENIUM: | |
return self.selenium_get_episode_detail(brows, item_manga, url, name, cookies) | |
try: | |
proxy = "" | |
retry = 0 | |
max_retry = 5 | |
content = None | |
while retry < max_retry: | |
try: | |
content = requests.get(url, proxies=proxy, headers=self.headers).content | |
break | |
except KeyboardInterrupt: | |
raise KeyboardInterrupt | |
except: | |
logger.error(f"{self.domain} - Error get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
main_div = html.fromstring(content) | |
content = main_div.xpath("//div[@class='story-detail-content']") | |
if len(content) == 0: | |
return True | |
content = untils.remove_a_tag(content[0], ".//img") | |
content = untils.clear_text_chapter(content) | |
images = [] | |
# print(f"images: {len(images)}") | |
item_manga.episodes.append({"name": name, "images": images, "content": content}) | |
time.sleep(2) | |
except KeyboardInterrupt: | |
raise KeyboardInterrupt | |
except: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - Error get_episode_detail") | |
logger.error(traceback.format_exc()) | |
return True | |
def selenium_get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None): | |
try: | |
retry = 0 | |
max_retry = 5 | |
content = None | |
while retry < max_retry: | |
try: | |
driver = brows.get_driver() | |
driver.get(url) | |
content = driver.page_source | |
break | |
except KeyboardInterrupt: | |
raise KeyboardInterrupt | |
except: | |
logger.error(f"{self.domain} - Error selenium get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
main_div = html.fromstring(content) | |
list_item = main_div.xpath("//div[@id='reading-detail']/div") | |
images = [] | |
if len(list_item) == 0: | |
# print(content) | |
logger.error(f"List item empty") | |
logger.error(content) | |
return False | |
for item in list_item: | |
"""Skip 1 img first""" | |
img_ele = item.xpath("img") | |
if len(img_ele) == 0: | |
continue | |
src = img_ele[0].get("src", None) | |
if src is None: | |
continue | |
images.append(src) | |
"""Skip last img""" | |
images = images[:-1] | |
# print(f"images: {len(images)}") | |
item_manga.episodes.append({"name": name, "images": images}) | |
time.sleep(2) | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
raise KeyboardInterrupt | |
except: | |
logger.error("=" * 20) | |
logger.error(f"{self.domain} - Error get_episode_detail") | |
logger.error(traceback.format_exc()) | |
return True | |
def selenium_upload_custom_task(self): | |
truyenfull = TruyenFull(None, logger) | |
list_task_customs = truyenfull.get_list_task_customs(self.temp_domains) | |
if len(list_task_customs) == 0: | |
return | |
for item in list_task_customs: | |
logger.info(f"Start task upload - {item['link_manga']}") | |
item_website = self.get_episodes(item['link_manga']) | |
untils.remove_folder_manga(item_website) | |
logger.info(f"Done task upload - {item['link_manga']}") | |
truyenfull.update_task_custom({"id": item['id'], "status": 2}) |