tqhoa's picture
test
063b4bb
# -*- coding: utf-8 -*-
import os
import shutil
import re
import requests
import json
from lxml import html
import traceback
import datetime
import time
from urllib.parse import urlparse
import html as un_html
from selenium.common.exceptions import WebDriverException, NoSuchElementException, JavascriptException
from lxml.html.clean import Cleaner
from modules import g_config, logger
from modules import untils
from modules.browsers import FireFoxBrowser, ChromeUndetectedBrowser
from modules.websites import TruyenFull, system_status
from modules.manga_boto3 import MangaBoto3
cleaner = Cleaner()
cleaner.javascript = True
cleaner.style = False
cleaner.embedded = False
class Manga():
def __init__(self):
self.slug = None
self.avatar = None
self.episodes = []
self.name = ""
self.name_alternative = ""
self.author_name = None
self.status = None
self.tags = []
self.like_number = 0
self.view_number = 0
self.follow_number = 0
self.description = None
def download_images(self, episode, cookies, domain="https://truyenqqpro.com/"):
manga_boto3 = None
if g_config.TYPE_LINK_IMAGE != "local":
manga_boto3 = MangaBoto3()
slug_episode = self.slug
path_dir = os.path.join(os.getcwd(), f"resources/{slug_episode}/{episode['name']}")
os.makedirs(path_dir, exist_ok=True)
for image in episode['images']:
parse = urlparse(image)
path = f"{path_dir}/{parse.path.split('/')[-1]}"
untils.download_file_img(image, path, cookies, domain)
if g_config.TYPE_LINK_IMAGE != "local":
chapter_number = episode['name'].split(" ")[-1]
file_name = parse.path.split('/')[-1]
manga_boto3.put_resource(path, f"{slug_episode}/{chapter_number}/{file_name}")
def download_thumb(self, cookies, domain=""):
path_dir = os.path.join(os.getcwd(), f"resources/{self.slug}")
os.makedirs(path_dir, exist_ok=True)
path = f"{path_dir}/thumb.jpg"
untils.download_file_img(self.avatar, path, cookies, domain)
class Sources:
def __init__(self):
pass
class Truyenqqpro(Sources):
def __init__(self):
super().__init__()
self.temp_domains = ["truyenqqvip.com", "truyenqqhot.com"]
self.domain = "https://truyenqqmoi.com"
self.is_use_selenium = g_config.USE_SELENIUM
self.chapter_type = "image"
def selenium_get_list_manga_in_category(self, url=None):
url_main = url
stt_page = 1
retry = 0
max_retry = 5
proxy = ""
driver = None
brows = None
while retry < max_retry:
brows = ChromeUndetectedBrowser("", proxy)
try:
brows.init_driver()
driver = brows.get_driver()
break
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error("Truyenqqpro - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
continue
if driver is None:
return
while True:
url = f"{url_main}/trang-{stt_page}.html"
item_website = None
try:
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
driver.get(url)
time.sleep(2)
content = driver.page_source
break
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error(f"Truyenqqpro - Error selenium get - Url: {url}")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
logger.error("=" * 20)
logger.error(f"Truyenqqpro - selenium_get_list_manga_in_category - content none - page {stt_page}")
continue
main_div = html.fromstring(content)
list_item = main_div.xpath("//div[@class='list_grid_out']/ul/li")
if len(list_item) == 0:
break
# print(f"total manga: {len(list_item)}")
for item in list_item:
href = None
href_ele = item.xpath("div[1]/a")
if len(href_ele) > 0:
href = href_ele[0].get("href")
if href is None:
continue
item_website = self.selenium_get_episodes(href)
untils.remove_folder_manga(item_website)
except KeyboardInterrupt:
if brows is not None:
brows.stop()
if item_website is not None:
untils.remove_folder_manga(item_website)
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error("Truyenqqpro - Error selenium_get_list_manga_in_category")
logger.error(traceback.format_exc())
continue
stt_page = stt_page + 1
if brows is not None:
brows.stop()
def selenium_get_episodes(self, url=None, brows=None):
item_manga = Manga()
item_website = None
results = []
cookies = ""
try:
retry = 0
max_retry = 5
content = None
proxy = ""
while retry < max_retry:
try:
driver = brows.get_driver()
driver.get(url)
content = driver.page_source
cookies = driver.get_cookies()
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error("Truyenqqpro - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
logger.error("=" * 20)
logger.error("Truyenqqpro - selenium_get_episode_detail - content none")
return results
main_div = html.fromstring(content)
cookies = untils.format_to_sure_cookies(cookies)
list_item = main_div.xpath("//div[@class='list_chapter']/div/div")
# print(f"Total chapter: {len(list_item)}")
item_manga = self.get_info_manga(item_manga, main_div)
item_website = untils.get_item_website(item_manga)
# Download thumbnail
item_manga.download_thumb(cookies)
list_chapter_uploaded = item_website.get_list_chapter()
self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded)
logger.info(f"Manga {item_manga.name}")
logger.info(f"Total episodes - {len(item_manga.episodes)}")
if system_status["error"]:
return item_website
for episode in item_manga.episodes:
if self.chapter_type == "image":
logger.info(f"Total images - {len(episode['images'])}")
# item_manga.download_images(episode, cookies)
if g_config.TYPE_LINK_IMAGE == 'local':
untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}"))
untils.update_data_to_website(item_website, episode, self.chapter_type)
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error("Truyenqqpro - Error selenium_get_episodes")
logger.error(traceback.format_exc())
return item_website
def get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None):
try:
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
driver = brows.get_driver()
driver.get(url)
content = driver.page_source
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error("Truyenqqpro - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
main_div = html.fromstring(content)
list_item = main_div.xpath("//div[@class='chapter_content']/div[2]/div")
images = []
if len(list_item) == 0:
# print(content)
logger.error(f"List item empty")
logger.error(content)
return False
for item in list_item:
"""Skip 2 img first"""
img_ele = item.xpath("img")
if len(img_ele) == 0:
continue
src = img_ele[0].get("data-original", None)
if src is None:
continue
images.append(src)
"""Skip last img"""
images = images[:-1]
# print(f"images: {len(images)}")
item_manga.episodes.append({"name": name, "images": images})
time.sleep(2)
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error("Truyenqqpro - Error get_episode_detail")
logger.error(traceback.format_exc())
return True
def get_info_manga(self, item_manga, main_div):
avatar = ""
name_episode = ""
name_alternative = ""
tags = []
author_name = "Đang Cập Nhật"
status = ""
like_number = 0
view_number = 0
follow_number = 0
description = ""
slug = ""
name_episode_ele = main_div.xpath("//h1[@itemprop='name']")
if len(name_episode_ele) > 0:
name_episode = name_episode_ele[0].text.strip()
info_ele = main_div.xpath("//div[@class='book_info']/div[2]/div[1]/ul")
if len(info_ele) > 0:
name_alternative_ele = info_ele[0].xpath("li[@class='othername row']/h2")
if len(name_alternative_ele) > 0:
name_alternative = name_alternative_ele[0].text.strip()
author_ele = info_ele[0].xpath("li[@class='author row']/p[2]/a")
if len(author_ele) > 0:
author_name = author_ele[0].text.strip()
status_ele = info_ele[0].xpath("li[@class='status row']/p[2]")
if len(status_ele) > 0:
status = status_ele[0].text.strip()
like_ele = info_ele[0].xpath("li[4]/p[2]")
if len(like_ele) > 0:
like_number = like_ele[0].text.strip()
follow_ele = info_ele[0].xpath("li[5]/p[2]")
if len(follow_ele) > 0:
follow_number = follow_ele[0].text.strip()
view_ele = info_ele[0].xpath("li[6]/p[2]")
if len(view_ele) > 0:
view_number = view_ele[0].text.strip()
info_tag_ele = main_div.xpath("//div[@class='book_info']/div[2]/ul[1]/li")
if len(info_tag_ele) > 0:
for item in info_tag_ele:
tag = item.xpath("a")[0].text.strip()
tags.append(tag)
description_ele = main_div.xpath("//div[@class='book_detail']/div[2]/p")
if len(description_ele) > 0:
for des in description_ele:
description = f"{description}{des.text}\n"
description = description.strip()
if len(description) == 0:
description = f"Truyện tranh {name_episode} được cập nhật nhanh và đầy đủ nhất tại TruyenFull. " \
f"Bạn đọc đừng quên để lại bình luận và chia sẻ, ủng hộ TruyenFull ra các chương mới " \
f"nhất của truyện {name_episode}."
avatar_ele = main_div.xpath("//div[@class='book_avatar']/img")
if len(avatar_ele) > 0:
avatar = avatar_ele[0].get("src")
slug_ele = main_div.xpath("//input[@id='slug']")
if len(slug_ele) > 0:
slug = slug_ele[0].get("value")
item_manga.slug = slug
item_manga.avatar = avatar
item_manga.name = name_episode
item_manga.name_alternative = name_alternative
item_manga.author_name = author_name
item_manga.status = status
item_manga.like_number = int(str(like_number).replace(",", ""))
item_manga.follow_number = int(str(follow_number).replace(",", ""))
item_manga.view_number = int(str(view_number).replace(",", ""))
item_manga.tags = tags
item_manga.description = description
return item_manga
def selenium_get_list_manga_newest(self):
url_main = f"{self.domain}/truyen-moi-cap-nhat.html"
stt_page = 1
retry = 0
max_retry = 5
proxy = ""
driver = None
brows = None
while retry < max_retry:
brows = ChromeUndetectedBrowser("", proxy)
try:
brows.init_driver()
driver = brows.get_driver()
break
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error("Truyenqqpro - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
continue
if driver is None:
return
while True:
url = f"{url_main}/trang-{stt_page}.html"
item_website = None
try:
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
driver.get(url)
time.sleep(2)
content = driver.page_source
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error(f"Truyenqqpro - Error selenium get - Url: {url}")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
logger.error("=" * 20)
logger.error(f"Truyenqqpro - selenium_get_list_manga_newest - content none - page {stt_page}")
continue
main_div = html.fromstring(content)
list_item = main_div.xpath("//div[@class='list_grid_out']/ul/li")
if len(list_item) == 0:
break
# print(f"total manga: {len(list_item)}")
for item in list_item:
href = None
href_ele = item.xpath("div[1]/a")
if len(href_ele) > 0:
href = href_ele[0].get("href")
if href is None:
continue
item_website = self.selenium_get_episodes(href, brows)
untils.remove_folder_manga(item_website)
self.selenium_upload_custom_task()
except KeyboardInterrupt:
if brows is not None:
brows.stop()
if item_website is not None:
untils.remove_folder_manga(item_website)
untils.close_all_chrome_browsers()
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error("Truyenqqpro - Error selenium_get_list_manga_newest")
logger.error(traceback.format_exc())
continue
stt_page = stt_page + 1
if brows is not None:
brows.stop()
untils.close_all_chrome_browsers()
def get_all_chapters(self, brows, list_item, item_manga, cookies, list_chapter_uploaded):
limit_chapter = g_config.MAX_NUM_CHAPTER
stt = 0
time_try = 10
for item in list_item:
href = None
name = None
stt = stt + 1
if stt > limit_chapter:
break
href_ele = item.xpath("div[1]/a")
if len(href_ele) > 0:
name = href_ele[0].text.strip()
href = href_ele[0].get("href")
if name is None or href is None:
continue
if name in list_chapter_uploaded:
continue
# print(f"Get details - {name} - href - {href}")
stt_try = 0
while True:
check = self.get_episode_detail(brows, item_manga, href, name, cookies)
if check is False and stt_try > time_try:
system_status["error"] = True
system_status["message"] = "Không thể bật trình duyệt - đang tiến hành thử lại"
return
if check or stt_try > time_try:
time.sleep(2)
break
stt_try += 1
time.sleep(2)
def selenium_upload_custom_task(self):
truyenfull = TruyenFull(None, logger)
list_task_customs = truyenfull.get_list_task_customs(self.temp_domains)
if len(list_task_customs) == 0:
return
for item in list_task_customs:
logger.info(f"Start task upload - {item['link_manga']}")
item_website = self.selenium_get_episodes(item['link_manga'])
untils.remove_folder_manga(item_website)
logger.info(f"Done task upload - {item['link_manga']}")
truyenfull.update_task_custom({"id": item['id'], "status": 2})