# -*- coding: utf-8 -*-
import shutil
from zipfile import ZipFile
import os
from os.path import basename
import subprocess
import time
from urllib.parse import urlparse
from urllib.parse import parse_qs
import re
import lxml
from lxml import html
from lxml.html.clean import Cleaner
import requests
from datetime import datetime
import logging
import traceback
import json
from modules.websites import TruyenFull
from modules import g_config, logger
# Replace lxml's private unsafe-URI scheme regex so the cleaner also strips
# img:/about:/mocha: style schemes in addition to the default script ones.
# NOTE(review): this patches a private attribute (_javascript_scheme_re) and
# may break on lxml upgrades — confirm against the installed lxml version.
new_pattern = r'\s*(?:javascript:|jscript:|livescript:|vbscript:|data:|img:|about:|mocha:)'
lxml.html.clean._javascript_scheme_re = re.compile(new_pattern, re.I)

# Shared HTML cleaner used by remove_a_tag(): drops scripts, styles and
# embedded objects from scraped chapter markup.
cleaner = Cleaner()
cleaner.javascript = True
cleaner.style = True
cleaner.embedded = True
def get_item_website(item_manga):
    """Build and return the TruyenFull site adapter for *item_manga*."""
    return TruyenFull(item_manga, logger)
def update_data_to_website(truyenfull, episode, chapter_type="image"):
    """Create the manga on the target site and upload its newest chapter.

    Parameters:
        truyenfull: site adapter (see get_item_website) exposing
            create_manga / init_post_id / upload_chapters_newest.
        episode: dict-like with at least a 'name' key.
        chapter_type: "image" for image chapters; anything else is treated
            as a text chapter by the adapter.
    """
    is_created = truyenfull.create_manga(chapter_type)
    if is_created:
        truyenfull.init_post_id()
        is_uploaded = truyenfull.upload_chapters_newest(episode, chapter_type)
        if is_uploaded:
            logger.info(f"Created success - {truyenfull.manga.name} - {episode['name']}")
    else:
        logger.info(f"Not create - {truyenfull.manga.name} - {episode['name']}")
    try:
        if chapter_type == "image":
            path_episode = os.path.join(os.getcwd(), f"resources/{truyenfull.manga.slug}/{episode['name']}")
            # Cleanup of the downloaded episode folder is currently disabled.
            # remove_a_dir(path_episode)
    # Narrowed from a bare `except:` — KeyboardInterrupt (a BaseException)
    # now propagates naturally instead of needing an explicit re-raise.
    except Exception:
        logger.error(f"Remove episode error - {truyenfull.manga.slug}")
        logger.error(traceback.format_exc())
    # Throttle between uploads so the target site is not hammered.
    time.sleep(3)
def remove_folder_manga(truyenfull):
    """Delete the local resources/<slug> download folder for this manga."""
    manga_dir = os.path.join(os.getcwd(), f"resources/{truyenfull.manga.slug}")
    remove_a_dir(manga_dir)
# Zip the files from given directory that matches the filter | |
def zipFilesInDir(dirName, zipFileName, filter):
    """Archive every file under *dirName* whose bare filename passes *filter*.

    Entries are stored flat (basename only), so files with the same name in
    different subfolders overwrite each other inside the archive.
    """
    with ZipFile(zipFileName, 'w') as archive:
        for root, _subdirs, names in os.walk(dirName):
            for name in names:
                if not filter(name):
                    continue
                full_path = os.path.join(root, name)
                archive.write(full_path, basename(full_path))
def compress_a_dir(path):
    """Zip every file containing 'jpg' under *path* into <path>/archive.zip."""
    archive_path = f"{path}/archive.zip"
    zipFilesInDir(path, archive_path, lambda filename: 'jpg' in filename)
    return True
def remove_a_dir(path):
    """Recursively delete *path*; raises if it does not exist."""
    shutil.rmtree(path)
def close_all_chrome_browsers(enabled=False):
    """Force-kill all Chrome / chromedriver processes (Windows TASKKILL).

    The kill logic was previously dead code behind an unconditional
    `return`; it is now gated behind the backward-compatible *enabled*
    flag, so calling with no arguments is still a no-op.
    """
    if not enabled:
        return
    try:
        subprocess.call("TASKKILL /f /IM CHROME.EXE")
        subprocess.call("TASKKILL /f /IM CHROMEDRIVER.EXE")
    except Exception:
        # Best-effort cleanup: missing processes / non-Windows are ignored.
        pass
def tear_down_python_scripts(enabled=False):
    """Kill every other python process on the host (POSIX pgrep/kill).

    The kill pipeline was previously dead code behind an unconditional
    `return`; it is now gated behind the backward-compatible *enabled*
    flag, so calling with no arguments is still a no-op. The current
    process is excluded by filtering out its own pid in awk.
    """
    if not enabled:
        return
    pid = os.getpid()
    command = f"pgrep -fl python | awk '!/{pid}/{{print $1}}' | xargs kill"
    # Fire-and-forget: we deliberately do not wait on the shell pipeline.
    subprocess.Popen(command, shell=True)
def convert_link_images_to_do(manga_slug, name_chapter, images):
    """Rewrite source image URLs to the image-gateway host.

    Each URL's path (leading slash stripped, remaining slashes replaced
    with dashes) becomes the file name under the gateway's /image/ route.
    manga_slug and name_chapter are currently unused but kept for callers.
    """
    return [
        f"{g_config.HOST_GATEWAY_IMAGE}/image/"
        + urlparse(url).path[1:].replace("/", "-")
        for url in images
    ]
def format_to_sure_cookies(cookies):
    """Flatten browser-style cookie dicts into a {name: value} mapping."""
    return {cookie['name']: cookie['value'] for cookie in cookies}
def download_file_img(url_img, path, cookies, domain="https://truyenqqpro.com/"):
    """Stream one image to *path*, sending *domain* as the Referer header.

    Failures are logged and swallowed (best-effort download); the caller
    decides what to do about missing files. Fixes over the previous
    version: a request timeout (the call could previously hang forever),
    the response is now closed via a context manager, and the bare
    `except:` was narrowed so KeyboardInterrupt propagates naturally.
    """
    headers = {
        'Referer': domain
    }
    try:
        with requests.get(url_img, stream=True, cookies=cookies,
                          headers=headers, timeout=30) as response:
            with open(path, 'wb') as out_file:
                shutil.copyfileobj(response.raw, out_file)
    except Exception:
        logger.error(f"Download image error - {url_img}")
def remove_a_tag(ele, tag):
    """Clean *ele* with the module cleaner and drop the first <tag> element.

    Returns the serialized (bytes) HTML of the cleaned tree. The removed
    element's tail text is kept by assigning it as the parent's text
    (note: this overwrites any text the parent already had).
    """
    doc = cleaner.clean_html(ele)
    target = doc.find(f'{tag}')
    if target is None:
        return html.tostring(doc)
    parent = target.getparent()
    parent.text = target.tail
    # Bug fix: remove from the element's actual parent. The previous
    # `doc.remove(target)` raised ValueError whenever the match was not a
    # direct child of the root element.
    parent.remove(target)
    return html.tostring(doc)
def clear_text_chapter(content):
    """Post-process chapter text before publishing.

    Currently a passthrough; an earlier site-name substitution was
    disabled and is kept below for reference.
    """
    # content = str(content).replace("ThichTruyen.VN", "truyentranhfull.info")
    return content