test-selenium / modules /sources /thichtruyen.py
tqhoa's picture
test
063b4bb
import traceback
import time
from lxml import html
import os
import re
import requests
import json
from modules import untils
from modules.sources import Sources
from modules.browsers import FireFoxBrowser, ChromeUndetectedBrowser
from modules.sources import Manga
from modules.websites import TruyenFull, system_status
from modules import g_config, logger
class ThichTruyen(Sources):
def __init__(self):
super().__init__()
self.temp_domains = ["thichtruyen.vn"]
self.domain = "https://thichtruyen.vn"
self.is_use_selenium = g_config.USE_SELENIUM
self.chapter_type = "text"
self.headers = {
}
def get_list_manga_newest(self):
arr_category = [
'danh-muc/truyen-ngon-tinh', 'danh-muc/ngon-tinh-viet-nam', 'danh-muc/xuyen-khong', 'danh-muc/dam-my',
'danh-muc/truyen-voz', 'danh-muc/tien-hiep', 'danh-muc/truyen-kiem-hiep', 'danh-muc/ky-nang-song',
'danh-muc/tieu-thuyet-phuong-tay', 'danh-muc/truyen-sac-hiep', 'danh-muc/truyen-ngan', 'danh-muc/do-thi',
'danh-muc/trinh-tham', 'danh-muc/chuyen-tham-kin', 'danh-muc/truyen-kinh-di', 'danh-muc/truyen-teen',
'danh-muc/don-doc']
if g_config.USE_SELENIUM:
self.selenium_get_list_manga_newest()
return
for cate in arr_category:
url_main = f"{self.domain}"
stt_page = 1
retry = 0
max_retry = 5
proxy = ""
driver = None
brows = None
while True:
url = f"{url_main}/{cate}?tab=truyen-moi&page={stt_page}"
item_website = None
try:
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
content = requests.get(url, proxies=proxy, headers=self.headers).content
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error(f"{self.domain} - Error get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
logger.error("=" * 20)
logger.error(f"{self.domain} - get_list_manga_newest - content none - page {stt_page}")
continue
main_div = html.fromstring(content)
list_item = main_div.xpath("//div[@id='tab-content']/div/div/div")
if len(list_item) == 0:
break
for item in list_item:
href = None
href_ele = item.xpath("div/a")
if len(href_ele) > 0:
href = href_ele[0].get("href")
if href is None:
continue
if "http" not in href:
href = f"{self.domain}{href}"
item_website = self.get_episodes(href)
untils.remove_folder_manga(item_website)
if system_status["error"]:
return
self.selenium_upload_custom_task()
except KeyboardInterrupt:
if item_website is not None:
untils.remove_folder_manga(item_website)
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error(f"{self.domain} - Error selenium_get_list_manga_newest")
logger.error(traceback.format_exc())
continue
stt_page = stt_page + 1
time.sleep(5)
def selenium_get_list_manga_newest(self):
url_main = f"{self.domain}"
stt_page = 1
retry = 0
max_retry = 5
proxy = ""
driver = None
brows = None
while retry < max_retry:
brows = ChromeUndetectedBrowser("", proxy)
try:
brows.init_driver()
driver = brows.get_driver()
break
except KeyboardInterrupt:
if brows is not None:
brows.stop()
untils.close_all_chrome_browsers()
raise KeyboardInterrupt
except:
logger.error(f"{self.domain} - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
continue
if driver is None:
if brows is not None:
brows.stop()
untils.close_all_chrome_browsers()
return
while True:
url = f"{url_main}/search.php?act=search&andor=and&sort=chap&view=detail&page={stt_page}&ajax=true"
item_website = None
try:
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
driver.get(url)
time.sleep(2)
content = driver.page_source
break
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error(f"TruyenTranh8 - Error selenium get - Url: {url}")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
logger.error("=" * 20)
logger.error(f"TruyenTranh8 - selenium_get_list_manga_newest - content none - page {stt_page}")
continue
main_div = html.fromstring(content)
list_item = main_div.xpath("//div[@id='blockdetail']/div")
if len(list_item) == 0:
break
for item in list_item:
href = None
href_ele = item.xpath("div/div/a")
if len(href_ele) > 0:
href = href_ele[0].get("href")
if href is None:
continue
item_website = self.selenium_get_episodes(href)
untils.remove_folder_manga(item_website)
self.selenium_upload_custom_task()
except KeyboardInterrupt:
if brows is not None:
brows.stop()
if item_website is not None:
untils.remove_folder_manga(item_website)
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error(f"{self.domain} - Error selenium_get_list_manga_newest")
logger.error(traceback.format_exc())
continue
stt_page = stt_page + 1
if brows is not None:
brows.stop()
untils.close_all_chrome_browsers()
def get_episodes(self, url=None):
if g_config.USE_SELENIUM:
return self.selenium_get_episodes(url)
item_manga = Manga()
item_website = None
brows = None
results = []
cookies = []
try:
retry = 0
max_retry = 5
content = None
proxy = ""
while retry < max_retry:
try:
content = requests.get(url, proxies=proxy, headers=self.headers).content
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error(f"{self.domain} - Error get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
logger.error("=" * 20)
logger.error(f"{self.domain} - get_episode_detail - content none")
return results
main_div = html.fromstring(content)
cookies = untils.format_to_sure_cookies(cookies)
list_item = main_div.xpath("//div[@id='tab-chapper']/div/ul/li")
item_manga = self.get_info_manga(item_manga, main_div)
item_website = untils.get_item_website(item_manga)
# Download thumbnail
item_manga.download_thumb(cookies, self.domain)
list_chapter_uploaded = item_website.get_list_chapter()
self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded)
logger.info(f"Manga {item_manga.name}")
logger.info(f"Total episodes - {len(item_manga.episodes)}")
if system_status["error"]:
return item_website
for episode in item_manga.episodes:
if self.chapter_type == "image":
logger.info(f"Total images - {len(episode['images'])}")
# item_manga.download_images(episode, cookies, self.domain)
if g_config.TYPE_LINK_IMAGE == 'local':
untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}"))
untils.update_data_to_website(item_website, episode, self.chapter_type)
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error(f"{self.domain} - Error selenium_get_episodes")
logger.error(traceback.format_exc())
return item_website
def selenium_get_episodes(self, url=None):
item_manga = Manga()
item_website = None
brows = None
results = []
cookies = ""
try:
retry = 0
max_retry = 5
content = None
proxy = ""
while retry < max_retry:
brows = ChromeUndetectedBrowser("", proxy)
try:
brows.init_driver()
driver = brows.get_driver()
driver.get(url)
content = driver.page_source
cookies = driver.get_cookies()
break
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error("TruyenTranh8 - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
if content is None:
if brows is not None:
brows.stop()
logger.error("=" * 20)
logger.error("TruyenTranh8 - selenium_get_episode_detail - content none")
return results
main_div = html.fromstring(content)
cookies = untils.format_to_sure_cookies(cookies)
list_item = main_div.xpath("//ul[@class='mangadetail-chaplist']/li")
# print(f"Total chapter: {len(list_item)}")
item_manga = self.get_info_manga(item_manga, main_div)
item_website = untils.get_item_website(item_manga)
# Download thumbnail
item_manga.download_thumb(cookies)
list_chapter_uploaded = item_website.get_list_chapter()
self.get_all_chapters(brows, list_item, item_manga, cookies, list_chapter_uploaded)
logger.info(f"Manga {item_manga.name}")
logger.info(f"Total episodes - {len(item_manga.episodes)}")
if system_status["error"]:
return item_website
for episode in item_manga.episodes:
if self.chapter_type == "image":
logger.info(f"Total images - {len(episode['images'])}")
# item_manga.download_images(episode, cookies)
if g_config.TYPE_LINK_IMAGE == 'local':
untils.compress_a_dir(os.path.join(os.getcwd(), f"resources/{item_manga.slug}/{episode['name']}"))
untils.update_data_to_website(item_website, episode, self.chapter_type)
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error(f"{self.domain} - Error selenium_get_episodes")
logger.error(traceback.format_exc())
if brows is not None:
brows.stop()
return item_website
def get_info_manga(self, item_manga, main_div):
avatar = ""
name_episode = ""
name_alternative = ""
tags = []
author_name = "Đang Cập Nhật"
status = ""
like_number = 0
view_number = 0
follow_number = 0
description = ""
slug = ""
name_episode_ele = main_div.xpath("//h1[@class='story-intro-title']/a")
if len(name_episode_ele) > 0:
name_episode = name_episode_ele[0].text.strip()
author_ele = main_div.xpath("//p[@class='story-intro-author ']/a")
if len(author_ele) > 0:
author_name = author_ele[0].text.strip()
info_tag_ele = main_div.xpath("//div[@class='story-introduction-content pull-left']/div[1]/a")
if len(info_tag_ele) > 0:
for item in info_tag_ele:
tag = item.text.strip()
if tag not in tags:
tags.append(tag)
status_ele = main_div.xpath("//p[@class='story-intro-chapper']/span")
if len(status_ele) > 0:
status = status_ele[0].text.strip()
if status.lower() == "full":
status = "Hoàn Thành"
else:
status = "Đang Cập Nhật"
description_ele = main_div.xpath("//div[@class='tab-text text-justify']/p")
if len(description_ele) > 0:
description = untils.remove_a_tag(description_ele[0], ".//img")
if len(description) == 0:
description = f"Truyện tranh {name_episode} được cập nhật nhanh và đầy đủ nhất tại TruyenFull. " \
f"Bạn đọc đừng quên để lại bình luận và chia sẻ, ủng hộ TruyenFull ra các chương mới " \
f"nhất của truyện {name_episode}."
avatar_ele = main_div.xpath("//div[@class='story-intro-top']/div[1]/a/img")
if len(avatar_ele) > 0:
avatar = avatar_ele[0].get("src")
if "https" not in avatar and "http" not in avatar:
avatar = f"https:{avatar}"
slug_ele = main_div.xpath("//meta[@name='og:url']")
if len(slug_ele) > 0:
slug = slug_ele[0].get("content")
arr = slug.split("/")
slug = arr[-1]
if len(slug) == 0:
slug = arr[-2]
slug = slug.replace(".jpg", "")
item_manga.slug = slug
item_manga.avatar = avatar
item_manga.name = name_episode
item_manga.name_alternative = name_alternative
item_manga.author_name = author_name
item_manga.status = status
item_manga.tags = tags
item_manga.description = description
return item_manga
def get_all_chapters(self, brows, list_item, item_manga, cookies, list_chapter_uploaded):
limit_chapter = g_config.MAX_NUM_CHAPTER
stt = 0
time_try = 10
for item in list_item:
href = None
name = None
stt = stt + 1
if stt > limit_chapter:
break
href_ele = item.xpath("a")
if len(href_ele) > 0:
href = href_ele[0].get("href")
name = href_ele[0].text
name = name.split("-")[-1].strip()
if name is None or href is None:
continue
if name in list_chapter_uploaded:
continue
href = f"{self.domain}{href}"
# print(f"Get details - {name} - href - {href}")
stt_try = 0
while True:
check = self.get_episode_detail(brows, item_manga, href, name, cookies)
if check is False and stt_try > time_try:
system_status["error"] = True
system_status["message"] = "Không thể bật trình duyệt - đang tiến hành thử lại"
return
if check or stt_try > time_try:
time.sleep(2)
break
stt_try += 1
time.sleep(2)
def get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None):
if g_config.USE_SELENIUM:
return self.selenium_get_episode_detail(brows, item_manga, url, name, cookies)
try:
proxy = ""
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
content = requests.get(url, proxies=proxy, headers=self.headers).content
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error(f"{self.domain} - Error get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
main_div = html.fromstring(content)
content = main_div.xpath("//div[@class='story-detail-content']")
if len(content) == 0:
return True
content = untils.remove_a_tag(content[0], ".//img")
content = untils.clear_text_chapter(content)
images = []
# print(f"images: {len(images)}")
item_manga.episodes.append({"name": name, "images": images, "content": content})
time.sleep(2)
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error(f"{self.domain} - Error get_episode_detail")
logger.error(traceback.format_exc())
return True
def selenium_get_episode_detail(self, brows, item_manga, url=None, name=None, cookies=None):
try:
retry = 0
max_retry = 5
content = None
while retry < max_retry:
try:
driver = brows.get_driver()
driver.get(url)
content = driver.page_source
break
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
logger.error(f"{self.domain} - Error selenium get")
logger.error(traceback.format_exc())
retry = retry + 1
time.sleep(3)
main_div = html.fromstring(content)
list_item = main_div.xpath("//div[@id='reading-detail']/div")
images = []
if len(list_item) == 0:
# print(content)
logger.error(f"List item empty")
logger.error(content)
return False
for item in list_item:
"""Skip 1 img first"""
img_ele = item.xpath("img")
if len(img_ele) == 0:
continue
src = img_ele[0].get("src", None)
if src is None:
continue
images.append(src)
"""Skip last img"""
images = images[:-1]
# print(f"images: {len(images)}")
item_manga.episodes.append({"name": name, "images": images})
time.sleep(2)
except KeyboardInterrupt:
if brows is not None:
brows.stop()
raise KeyboardInterrupt
except:
logger.error("=" * 20)
logger.error(f"{self.domain} - Error get_episode_detail")
logger.error(traceback.format_exc())
return True
def selenium_upload_custom_task(self):
truyenfull = TruyenFull(None, logger)
list_task_customs = truyenfull.get_list_task_customs(self.temp_domains)
if len(list_task_customs) == 0:
return
for item in list_task_customs:
logger.info(f"Start task upload - {item['link_manga']}")
item_website = self.get_episodes(item['link_manga'])
untils.remove_folder_manga(item_website)
logger.info(f"Done task upload - {item['link_manga']}")
truyenfull.update_task_custom({"id": item['id'], "status": 2})