# -*- coding: utf-8 -*-
import os
import shutil
import re
import requests
import json
from lxml import html
import traceback
import datetime
import time
from urllib.parse import urlparse
import html as un_html
from selenium.common.exceptions import WebDriverException, NoSuchElementException, JavascriptException
from lxml.html.clean import Cleaner
from modules import g_config, logger
from modules import untils
from modules.browsers import FireFoxBrowser, ChromeUndetectedBrowser
from modules.websites import TruyenFull, system_status
from modules.manga_boto3 import MangaBoto3
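# Shared HTML cleaner: strips JavaScript from scraped markup while keeping
# style and embedded elements (lxml.html.clean.Cleaner flags).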
cleaner = Cleaner()
cleaner.javascript = True
cleaner.style = False
cleaner.embedded = False


class Manga:
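    """Holds one manga's metadata plus the episode/image lists collected during a scrape."""
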
    def __init__(self):
        self.slug = None
        self.avatar = None
        self.episodes = []
        self.name = ""
        self.name_alternative = ""
        self.author_name = None
        self.status = None
        self.tags = []
        self.like_number = 0
        self.view_number = 0
        self.follow_number = 0
        self.description = None

    def download_images(self, episode, cookies, domain="https://truyenqqpro.com/"):
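        """Download every image of one episode into resources/<slug>/<episode name>/.

        When g_config.TYPE_LINK_IMAGE is not "local", each file is also pushed
        to remote storage via MangaBoto3 (presumably an S3-style client).
        """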
        manga_boto3 = None
        if g_config.TYPE_LINK_IMAGE != "local":
            manga_boto3 = MangaBoto3()
        slug_episode = self.slug
        path_dir = os.path.join(os.getcwd(), f"resources/{slug_episode}/{episode['name']}")
        os.makedirs(path_dir, exist_ok=True)
        for image in episode['images']:
            parse = urlparse(image)
            # Keep the original file name from the image URL.
            path = f"{path_dir}/{parse.path.split('/')[-1]}"
            untils.download_file_img(image, path, cookies, domain)
            if g_config.TYPE_LINK_IMAGE != "local":
                chapter_number = episode['name'].split(" ")[-1]
                file_name = parse.path.split('/')[-1]
                manga_boto3.put_resource(path, f"{slug_episode}/{chapter_number}/{file_name}")

    def download_thumb(self, cookies, domain=""):
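        """Save the manga's cover image to resources/<slug>/thumb.jpg."""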
        path_dir = os.path.join(os.getcwd(), f"resources/{self.slug}")
        os.makedirs(path_dir, exist_ok=True)
        path = f"{path_dir}/thumb.jpg"
        untils.download_file_img(self.avatar, path, cookies, domain)


class Sources:
    """Base class for scraper sources."""

    def __init__(self):
        pass


class Truyenqqpro(Sources):
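    """Selenium-driven scraper for the truyenqq* family of sites; chapters are image lists."""
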
    def __init__(self):
        super().__init__()
        # Alternate domains the source has used; custom upload tasks are looked up under these.
        self.temp_domains = ["truyenqqvip.com", "truyenqqhot.com"]
        self.domain = "https://truyenqqmoi.com"
        self.is_use_selenium = g_config.USE_SELENIUM
        self.chapter_type = "image"

    def selenium_get_list_manga_in_category(self, url=None):
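        """Walk a category's paginated listing (<url>/trang-<n>.html) and scrape every manga found."""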
        url_main = url
        stt_page = 1
        retry = 0
        max_retry = 5
        proxy = ""
        driver = None
        brows = None
        # Start an undetected-Chrome session, retrying a few times if startup fails.
        while retry < max_retry:
            brows = ChromeUndetectedBrowser("", proxy)
            try:
                brows.init_driver()
                driver = brows.get_driver()
                break
            except KeyboardInterrupt:
                if brows is not None:
                    brows.stop()
                raise
            except Exception:
                logger.error("Truyenqqpro - Error selenium get")
                logger.error(traceback.format_exc())
                retry = retry + 1
                time.sleep(3)
                continue
        if driver is None:
            return
        while True:
            url = f"{url_main}/trang-{stt_page}.html"
            item_website = None
            try:
                retry = 0
                max_retry = 5
                content = None
                while retry < max_retry:
                    try:
                        driver.get(url)
                        time.sleep(2)
                        content = driver.page_source
                        break
                    except KeyboardInterrupt:
                        if brows is not None:
                            brows.stop()
                        raise
                    except Exception:
                        logger.error(f"Truyenqqpro - Error selenium get - Url: {url}")
                        logger.error(traceback.format_exc())
                        retry = retry + 1
                        time.sleep(3)
                if content is None:
                    logger.error("=" * 20)
                    logger.error(f"Truyenqqpro - selenium_get_list_manga_in_category - content none - page {stt_page}")
                    # Advance to the next page rather than retrying this one forever.
                    stt_page = stt_page + 1
                    continue
                main_div = html.fromstring(content)
                list_item = main_div.xpath("//div[@class='list_grid_out']/ul/li")
                if len(list_item) == 0:
                    # An empty listing page means the last page has been passed.
                    break
                for item in list_item:
                    href = None
                    href_ele = item.xpath("div[1]/a")
                    if len(href_ele) > 0:
                        href = href_ele[0].get("href")
                    if href is None:
                        continue
                    # Reuse the shared browser session for the detail scrape.
                    item_website = self.selenium_get_episodes(href, brows)
                    untils.remove_folder_manga(item_website)
            except KeyboardInterrupt:
                if brows is not None:
                    brows.stop()
                if item_website is not None:
                    untils.remove_folder_manga(item_website)
                raise
            except Exception:
                logger.error("=" * 20)
                logger.error("Truyenqqpro - Error selenium_get_list_manga_in_category")
                logger.error(traceback.format_exc())
                stt_page = stt_page + 1
                continue
            stt_page = stt_page + 1
        if brows is not None:
            brows.stop()

    def selenium_get_episodes(self, url=None, brows=None):
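        """Scrape one manga page: metadata, thumbnail, and all chapters; push results to the website.

        Returns the item_website handle, or None if the page could not be loaded.
        """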
        item_manga = Manga()
        item_website = None
        cookies = ""
        try:
            retry = 0
            max_retry = 5
            content = None
            while retry < max_retry:
                try:
                    driver = brows.get_driver()
                    driver.get(url)
                    content = driver.page_source
                    cookies = driver.get_cookies()
                    break
                except KeyboardInterrupt:
                    raise
                except Exception:
                    logger.error("Truyenqqpro - Error selenium get")
                    logger.error(traceback.format_exc())
                    retry = retry + 1
                    time.sleep(3)
            if content is None:
                logger.error("=" * 20)
                logger.error("Truyenqqpro - selenium_get_episode_detail - content none")
                return item_website
            main_div = html.fromstring(content)
            cookies = untils.format_to_sure_cookies(cookies)
            list_item = main_div.xpath("//div[@class='list_chapter']/div/div")
            item_manga = self.get_info_manga(item_manga, main_div)
            item_website = untils.get_item_website(item_manga)
            # Download thumbnail
            item_manga.download_thumb(cookies)
            # Only fetch chapters that are not already on the website.
            list_chapter_uploaded = item_website.get_list_chapter()
            self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded)
            logger.info(f"Manga {item_manga.name}")
            logger.info(f"Total episodes - {len(item_manga.episodes)}")
            if system_status["error"]:
                return item_website
            for episode in item_manga.episodes:
                if self.chapter_type == "image":
                    logger.info(f"Total images - {len(episode['images'])}")
                    # item_manga.download_images(episode, cookies)
                    if g_config.TYPE_LINK_IMAGE == 'local':
                        untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}"))
                untils.update_data_to_website(item_website, episode, self.chapter_type)
        except KeyboardInterrupt:
            raise
        except Exception:
            logger.error("=" * 20)
            logger.error("Truyenqqpro - Error selenium_get_episodes")
            logger.error(traceback.format_exc())
        return item_website

    def get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None):
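        """Load one chapter page and append {"name", "images"} to item_manga.episodes.

        Returns False when the chapter content could not be loaded or parsed
        (so the caller can retry), True otherwise.
        """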
        try:
            retry = 0
            max_retry = 5
            content = None
            while retry < max_retry:
                try:
                    driver = brows.get_driver()
                    driver.get(url)
                    content = driver.page_source
                    break
                except KeyboardInterrupt:
                    raise
                except Exception:
                    logger.error("Truyenqqpro - Error selenium get")
                    logger.error(traceback.format_exc())
                    retry = retry + 1
                    time.sleep(3)
            if content is None:
                # Page never loaded; report failure so the caller can retry.
                return False
            main_div = html.fromstring(content)
            list_item = main_div.xpath("//div[@class='chapter_content']/div[2]/div")
            images = []
            if len(list_item) == 0:
                logger.error("List item empty")
                logger.error(content)
                return False
            for item in list_item:
                img_ele = item.xpath("img")
                if len(img_ele) == 0:
                    continue
                # Images are lazy-loaded; the real URL sits in data-original.
                src = img_ele[0].get("data-original", None)
                if src is None:
                    continue
                images.append(src)
            # Skip the last image.
            images = images[:-1]
            item_manga.episodes.append({"name": name, "images": images})
            time.sleep(2)
        except KeyboardInterrupt:
            if brows is not None:
                brows.stop()
            raise
        except Exception:
            logger.error("=" * 20)
            logger.error("Truyenqqpro - Error get_episode_detail")
            logger.error(traceback.format_exc())
        return True

    def get_info_manga(self, item_manga, main_div):
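        """Parse the manga detail page into item_manga: name, author, status, counters, tags, description."""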
avatar = "" | |
name_episode = "" | |
name_alternative = "" | |
tags = [] | |
author_name = "Đang Cập Nhật" | |
status = "" | |
like_number = 0 | |
view_number = 0 | |
follow_number = 0 | |
description = "" | |
slug = "" | |
name_episode_ele = main_div.xpath("//h1[@itemprop='name']") | |
if len(name_episode_ele) > 0: | |
name_episode = name_episode_ele[0].text.strip() | |
info_ele = main_div.xpath("//div[@class='book_info']/div[2]/div[1]/ul") | |
if len(info_ele) > 0: | |
name_alternative_ele = info_ele[0].xpath("li[@class='othername row']/h2") | |
if len(name_alternative_ele) > 0: | |
name_alternative = name_alternative_ele[0].text.strip() | |
author_ele = info_ele[0].xpath("li[@class='author row']/p[2]/a") | |
if len(author_ele) > 0: | |
author_name = author_ele[0].text.strip() | |
status_ele = info_ele[0].xpath("li[@class='status row']/p[2]") | |
if len(status_ele) > 0: | |
status = status_ele[0].text.strip() | |
like_ele = info_ele[0].xpath("li[4]/p[2]") | |
if len(like_ele) > 0: | |
like_number = like_ele[0].text.strip() | |
follow_ele = info_ele[0].xpath("li[5]/p[2]") | |
if len(follow_ele) > 0: | |
follow_number = follow_ele[0].text.strip() | |
view_ele = info_ele[0].xpath("li[6]/p[2]") | |
if len(view_ele) > 0: | |
view_number = view_ele[0].text.strip() | |
info_tag_ele = main_div.xpath("//div[@class='book_info']/div[2]/ul[1]/li") | |
if len(info_tag_ele) > 0: | |
for item in info_tag_ele: | |
tag = item.xpath("a")[0].text.strip() | |
tags.append(tag) | |
description_ele = main_div.xpath("//div[@class='book_detail']/div[2]/p") | |
if len(description_ele) > 0: | |
for des in description_ele: | |
description = f"{description}{des.text}\n" | |
description = description.strip() | |
if len(description) == 0: | |
description = f"Truyện tranh {name_episode} được cập nhật nhanh và đầy đủ nhất tại TruyenFull. " \ | |
f"Bạn đọc đừng quên để lại bình luận và chia sẻ, ủng hộ TruyenFull ra các chương mới " \ | |
f"nhất của truyện {name_episode}." | |
avatar_ele = main_div.xpath("//div[@class='book_avatar']/img") | |
if len(avatar_ele) > 0: | |
avatar = avatar_ele[0].get("src") | |
slug_ele = main_div.xpath("//input[@id='slug']") | |
if len(slug_ele) > 0: | |
slug = slug_ele[0].get("value") | |
item_manga.slug = slug | |
item_manga.avatar = avatar | |
item_manga.name = name_episode | |
item_manga.name_alternative = name_alternative | |
item_manga.author_name = author_name | |
item_manga.status = status | |
item_manga.like_number = int(str(like_number).replace(",", "")) | |
item_manga.follow_number = int(str(follow_number).replace(",", "")) | |
item_manga.view_number = int(str(view_number).replace(",", "")) | |
item_manga.tags = tags | |
item_manga.description = description | |
return item_manga | |
def selenium_get_list_manga_newest(self): | |
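        """Walk the 'newest updates' listing page by page and scrape every manga found."""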
url_main = f"{self.domain}/truyen-moi-cap-nhat.html" | |
stt_page = 1 | |
retry = 0 | |
max_retry = 5 | |
proxy = "" | |
driver = None | |
brows = None | |
while retry < max_retry: | |
brows = ChromeUndetectedBrowser("", proxy) | |
try: | |
brows.init_driver() | |
driver = brows.get_driver() | |
break | |
except KeyboardInterrupt: | |
if brows is not None: | |
brows.stop() | |
raise KeyboardInterrupt | |
except: | |
logger.error("Truyenqqpro - Error selenium get") | |
logger.error(traceback.format_exc()) | |
retry = retry + 1 | |
time.sleep(3) | |
continue | |
if driver is None: | |
return | |
        while True:
            url = f"{url_main}/trang-{stt_page}.html"
            item_website = None
            try:
                retry = 0
                max_retry = 5
                content = None
                while retry < max_retry:
                    try:
                        driver.get(url)
                        time.sleep(2)
                        content = driver.page_source
                        break
                    except KeyboardInterrupt:
                        raise
                    except Exception:
                        logger.error(f"Truyenqqpro - Error selenium get - Url: {url}")
                        logger.error(traceback.format_exc())
                        retry = retry + 1
                        time.sleep(3)
                if content is None:
                    logger.error("=" * 20)
                    logger.error(f"Truyenqqpro - selenium_get_list_manga_newest - content none - page {stt_page}")
                    # Advance to the next page rather than retrying this one forever.
                    stt_page = stt_page + 1
                    continue
                main_div = html.fromstring(content)
                list_item = main_div.xpath("//div[@class='list_grid_out']/ul/li")
                if len(list_item) == 0:
                    # An empty listing page means the last page has been passed.
                    break
                for item in list_item:
                    href = None
                    href_ele = item.xpath("div[1]/a")
                    if len(href_ele) > 0:
                        href = href_ele[0].get("href")
                    if href is None:
                        continue
                    item_website = self.selenium_get_episodes(href, brows)
                    untils.remove_folder_manga(item_website)
                    # Interleave any queued custom upload tasks with the crawl.
                    self.selenium_upload_custom_task(brows)
            except KeyboardInterrupt:
                if brows is not None:
                    brows.stop()
                if item_website is not None:
                    untils.remove_folder_manga(item_website)
                untils.close_all_chrome_browsers()
                raise
            except Exception:
                logger.error("=" * 20)
                logger.error("Truyenqqpro - Error selenium_get_list_manga_newest")
                logger.error(traceback.format_exc())
                stt_page = stt_page + 1
                continue
            stt_page = stt_page + 1
        if brows is not None:
            brows.stop()
        untils.close_all_chrome_browsers()

    def get_all_chapters(self, brows, list_item, item_manga, cookies, list_chapter_uploaded):
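        """Fetch chapter details for every listed chapter, skipping ones already uploaded.

        Stops after g_config.MAX_NUM_CHAPTER chapters; each chapter gets up to
        time_try retries before the whole run is flagged as failed in system_status.
        """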
        limit_chapter = g_config.MAX_NUM_CHAPTER
        stt = 0
        time_try = 10
        for item in list_item:
            href = None
            name = None
            stt = stt + 1
            if stt > limit_chapter:
                break
            href_ele = item.xpath("div[1]/a")
            if len(href_ele) > 0:
                name = href_ele[0].text.strip()
                href = href_ele[0].get("href")
            if name is None or href is None:
                continue
            if name in list_chapter_uploaded:
                continue
            stt_try = 0
            while True:
                check = self.get_episode_detail(brows, item_manga, href, name, cookies)
                if check is False and stt_try > time_try:
                    # Give up on the whole run and flag the error for the supervisor.
                    system_status["error"] = True
                    # Vietnamese: "Cannot start the browser - retrying"
                    system_status["message"] = "Không thể bật trình duyệt - đang tiến hành thử lại"
                    return
                if check or stt_try > time_try:
                    time.sleep(2)
                    break
                stt_try += 1
                time.sleep(2)

    def selenium_upload_custom_task(self, brows=None):
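        """Process queued custom upload tasks from TruyenFull, scraping each requested manga link."""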
        truyenfull = TruyenFull(None, logger)
        list_task_customs = truyenfull.get_list_task_customs(self.temp_domains)
        if len(list_task_customs) == 0:
            return
        for item in list_task_customs:
            logger.info(f"Start task upload - {item['link_manga']}")
            item_website = self.selenium_get_episodes(item['link_manga'], brows)
            untils.remove_folder_manga(item_website)
            logger.info(f"Done task upload - {item['link_manga']}")
            # status 2 appears to mark the task as completed.
            truyenfull.update_task_custom({"id": item['id'], "status": 2})