taskManager / utils.py
rogerxavier's picture
Update utils.py
3953873 verified
raw
history blame contribute delete
No virus
15.6 kB
import json
import os
import random
from zipfile import ZipFile
import zipfile
import gradio as gr
from config import *
import shutil
from PIL import Image#上传保存遮罩和比较遮罩width height能否和章节图片匹配的时候用
from taskMap import *
from collections import Counter#计数删除少部分size不同图片
def load_config():
return get_variables()
class configData():
def __init__(self):
self.data = load_config()
def update(self):
self.data = load_config()
#print(self.data)
configData = configData()#复用实例
#通过图片路径判断是否是图片
def is_image_file(file_path):
try:
img = Image.open(file_path)
img.close()
return True
except IOError:
return False
# 解压缩函数
def unzip_file(file_path, extract_path):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# 上传并解压缩函数
#upload_and_unzip(file:"文件",extract_path:"解压路径")->"void 成功提示":
def upload_and_unzip(file_obj, extract_path):
print("执行上传解压函数")
##这样写如果新的多,那么替换为多的一方,如果新的少,也只会替换已经存在的部分--->符合预期想法
with ZipFile(file_obj.name) as zfile:
zfile.extractall(extract_path)
return "File uploaded and extracted successfully to "+extract_path
#构造函数获取当前目录file list信息-不放config进行返回是为了避免引入旧值
def update_file_info(root_dir):
info = {}
for root, dirs, files in os.walk(root_dir):
info[root] = {
"directories": dirs,
"files": files
}
for dir_name in dirs:
update_file_info(os.path.join(root, dir_name))
return info
#删除章节或者mask
def delete_file(delete_dir:str)->str:
for root, dirs, files in os.walk(delete_dir, topdown=False):
for name in files:
file_path = os.path.join(root, name)
print("删除指定目录函数检查到文件,删除文件:", file_path)
os.remove(file_path)
for name in dirs:
dir_path = os.path.join(root, name)
print("删除指定目录函数检查到目录,删除目录:", dir_path)
os.rmdir(dir_path)
return "message:"+ "成功删除{delete_dir}目录下的所有文件".format(delete_dir=delete_dir)
#随机返回一个章节,而不是第一个
def random_chapter():
# for root, info in configData.data['file_info'].items():
# 手动更新避免引入旧值-成功
file_info = update_file_info(configData.data['current_dir'])
configData.data['file_info'] = file_info
chapters = []
for root, info in configData.data['file_info'].items():
if root.startswith(configData.data['manga_abs_dir']):
for directory in info["directories"]:
if any(file.endswith('.jpg') or file.endswith('.png') for file in
os.listdir(os.path.join(root, directory))):
chapter_path = os.path.join(root, directory)
chapters.append(chapter_path)
if chapters:
# 随机选择一个章节
random_chapter_path = random.choice(chapters)
print("章节目录是:",random_chapter_path)
# 将路径转换为标题
folders = random_chapter_path.split(os.sep)
if len(folders) >= 2:
chapter_title = " ".join([folders[-2], folders[-1]]) # 获取倒数第二个和倒数第一个目录名作为标题
else:
chapter_title = random_chapter_path # 如果目录层级不足2层,直接使用路径作为标题
return chapter_title, random_chapter_path
return None, None
def list_files():
#查看files信息的时候需要更新configData
configData.update()
return configData.data['file_info']
def save_mask(mask_file, output_dir):
#上传遮罩保存到mask下面,名称随意
# 检查遮罩文件是否存在
# 构造保存文件的路径-保证linux和windows都能转义
save_path = os.path.join(output_dir, '0.jpg')
# 创建目录(如果目录不存在)
os.makedirs(output_dir, exist_ok=True)
# 使用with open方式保存文件
mask_file_img = mask_file #这个是Image.open后的对象
mask_file_img.save(save_path)
return "文件保存成功到:" + str(save_path)
def update_config_json(newConfig:str):
newConfig = json.loads(newConfig) #转换str为dict
with open(configData.data['current_config_json'], "w") as file:
file.write(json.dumps(newConfig, indent=4, ensure_ascii=False)) # 指定indent参数来保持JSON格式
#返回新的json文件内容
with open(configData.data['current_config_json'], "r") as f:
content =f.read() # 读取文件内容
return content
def zero_pad(s, length):
return s.zfill(length)
# 按照排序删除广告文件和片头
def delete_first_and_last_image_in_subdirectories(root_dir:str,deleted_list:list):
for root, dirs, files in os.walk(root_dir):
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
images = [f for f in os.listdir(dir_path) if is_image_file(os.path.join(dir_path, f))]#只要图片不要info.json等
images.sort(
key=lambda x: zero_pad(''.join(filter(str.isdigit, os.path.splitext(os.path.basename(x))[0])), 3))
for index in deleted_list:
if index < len(images) and index >= -len(images):
image_path = os.path.join(dir_path, images[index])
print("依据默认config,删除章节片头和结尾广告,可修改默认配置[0,1,-1]",image_path)
os.remove(image_path)
#void 删除当前chapter中部分size不同的少类图片,还有就是删除片头和广告,默认配置[0,1,-1],这个只对cur_chapter执行就不会多删了
def remove_different_size_images(chapter_path:str):
image_dir = chapter_path
# 获取目录下所有图片文件名
image_files = [f for f in os.listdir(image_dir) if is_image_file(os.path.join(image_dir, f))]
# 获取图片尺寸
image_sizes = []
for image_file in image_files:
with Image.open(os.path.join(image_dir, image_file)) as img:
image_sizes.append(img.size)
# 统计图片尺寸出现次数
size_counter = Counter(image_sizes)
# 找到占多数的章节图片尺寸
most_common_size = size_counter.most_common(1)[0][0]
# 删除其他尺寸的图片
for image_file, image_size in zip(image_files, image_sizes):
if image_size != most_common_size:
print("发现图片:",image_file,"大小不同于章节其他图片,认为是杂类图片,进行删除")
os.remove(os.path.join(image_dir, image_file))
# 按照默认配置删除片头和广告
delete_first_and_last_image_in_subdirectories(root_dir=chapter_path,deleted_list=configData.data['default_chapter_delete'])
def is_asp_task_valid()->bool:
# 启动状态检测 ->
# 1config允许定时任务 2 mask目录下齐备 3 manga_all目录下有可用素材 4 mask的width height和素材匹配
# 5应当将config写入文件而不是py随时可以修改,以防ck账号被封等情况下不发送->同时删除不可用账号 -全部space中账号反馈不可用的时候停止定时任务
# 6应该避免任务扎堆进行,对ocr space造成负担
print("判断任务条件时对config进行更新")
configData.update()
if configData.data['allow_scheduler'] == False:
print("当前配置不允许定时任务,停止定时任务")
return False
chapter_title ,cur_chapter= random_chapter()
if cur_chapter == None:
print("当前没有可用章节了,停止定时任务")
return False
print("开始删除当前chapter中部分size不同的少类图片")
remove_different_size_images(chapter_path=cur_chapter)
mask_file_path = os.path.join(configData.data['mask_dir'], '0.jpg')
if not os.path.exists(mask_file_path):
print("遮罩文件不存在,停止定时任务")
return False
# 读取遮罩图片长宽比较当前章节下面图片的长宽
mask_image = Image.open(mask_file_path)
# 获取指定目录下所有文件
all_files = os.listdir(cur_chapter)
# 过滤出图片文件
image_files = [file for file in all_files if file.endswith(('.jpg', '.jpeg', '.png', '.gif'))]
if image_files:
# 随机选择一个图片文件
random_chapter_image_name = random.choice(image_files)
random_chapter_image_path = os.path.join(cur_chapter, random_chapter_image_name)
# 打开选定的图片文件
random_chapter_image = Image.open(random_chapter_image_path)
# 现在random_chapter_image中包含了选中的随机图片文件
if mask_image.size != random_chapter_image.size:
print("遮罩大小和章节随机图片的大小不匹配,继续定时任务,此处只做记录,因为前面已经删除了少类图片,且space也可以微调mask")
print("遮罩大小是:",mask_image.size,"随机章节图片大小是:",random_chapter_image.size)
return True
else:
print("No image files found in the directory.")
return False
return True
#定时任务流程函数集合 ->返回None说明非正常执行情况
def run_asp_task():
if not is_asp_task_valid():
return None
#遍历bili_spaces中的space url并执行日常任务
for space_url in configData.data["bili_spaces"]:
process_task_list("0 1 2",space_url)
#实现gradio接受上传压缩文件,解压到manga_all目录下面
with gr.Blocks() as mangaManager:
#上传文件选择列表
file_selected = gr.File(label="待上传压缩章节",file_types=['.zip', '.tar', '.gz'])
#上传zip的解压保存路径或mask的地址(必选) 根据需要选择
output_dir_gr = gr.Dropdown(label="zip的解压保存路径或mask的地址/删除地址(必选)", choices=configData.data['default_values'])
#压缩包上传按钮
file_upload_btn = gr.Button("开始上传")
#获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
someResult = gr.Textbox(label="获取按钮返回信息", type="text")
# 设置按钮点击事件(调用上传解压函数,将压缩包内容解压到指定目录 默认是manga_all下面)
file_upload_btn.click(fn=upload_and_unzip, inputs=[file_selected, output_dir_gr],outputs=someResult)
#设置按钮查看json类型的listFiles信息
filesInfoBtn = gr.Button("查看files信息")
filesInfoBtn.click(fn=list_files,outputs=someResult)
#设置mask上传遮罩按钮保存遮罩到mask目录-因为直接重启space利用docker上传会触发定时任务,也许造成不好结果
# mask_selected = gr.inputs.Image(label="待上传遮罩", type='pil')
mask_selected = gr.components.Image(label="待上传遮罩", type='pil')#代替inputs的新写法
markUploadBtn = gr.Button("上传遮罩mask")
markUploadBtn.click(fn=save_mask,inputs=[mask_selected, output_dir_gr],outputs=someResult)
#设置按钮关联新config输入进行更新操作
config_update_text = gr.Textbox(placeholder="输入新的JSON数据")#str类型
config_update_btn = gr.Button("更新config.json数据")
config_update_btn.click(fn=update_config_json, inputs=[config_update_text], outputs=someResult)
#删除指定目录文件 -对上传保存的文件进行删除 ->比如章节和mask不匹配,但是一个个删太麻烦,干脆一起
file_delete_btn = gr.Button("开始删除")
file_delete_btn.click(fn=delete_file, inputs=[output_dir_gr], outputs=someResult)
# 定义一个函数来处理输入的步骤转list然后执行,返回执行结果->taskResult/None
# gradio输入0 1 2 3就行,会自动转"0 1 2 3"然后给process_task_list处理
def process_task_list(input_task_str:str = None,baseUrl:str =None):
if not is_asp_task_valid():
print("任务基本条件不符合")
return None
if baseUrl is None:
print("baseUrl不存在")
return None
if input_task_str is None:
print("input_task_str不存在")
return None
chapter_title,cur_chapter_path = random_chapter()
task_results = {}#保存任务执行结果
#临时变量记录是否上传了章节-上传了需要删除,而且只能在for循环任务结束后删除
manga_has_uploaded = False
print(input_task_str)# "a b c" - >['a', 'b', 'c']
task_list = input_task_str.split(' ')
for task in task_list:
if task in task_functions:
task_func = task_functions[task]#func
task_param = tasks_params[task]#dict
if "baseUrl" in task_param:
task_param["baseUrl"] = baseUrl
if "mask_path" in task_param:
#使用后遮罩不删,否则维护成本太高
task_param["mask_path"] = configData.data['mask_abs_dir']
if "manga_path" in task_param:
task_param["manga_path"] = cur_chapter_path#上传一个章节作为本次素材,保存到space的manga下面
print("上传了章节:",cur_chapter_path,"下面提交后删除")
manga_has_uploaded = True
#执行了这个任务就要删除该章节
if "allow_submit" in task_param:
#采用config.json中的是否允许上传
task_param["allow_submit"] = configData.data['allow_submit']
print("当前是否允许上传bili:",configData.data['allow_submit'])
if "bili_meta" in task_param:
bili_meta_data["title"] = chapter_title
task_param["bili_meta"] = bili_meta_data
result = task_func(**task_param)#执行对应函数获取结果
task_results[task] = "success" if result is None else result#保存对应函数执行结果
else:
print("该执行流程函数不存在")
return None#立即推出循环结束函数
if manga_has_uploaded is True:
#如果上传章节任务得到了执行,那么删除
shutil.rmtree(cur_chapter_path) # 递归地删除指定路径的目录及其所有内容,包括文件和子目录 #删除应该在返回的时候删
return task_results
# 创建一个输入块,接受str+空格类型的输入,比如设置默认值3只查看结果output.mp4
# 将按钮添加到任务管理器中
with gr.Blocks() as taskManager:
# 任务需要 1:查看指定space的output video状态 2: 手动修改执行流程->比如上传后是发送还是查看output效果
# 3:可以添加按钮跳转路由
task_list_text = gr.components.Textbox(type="text", label="输入任务列表元素,用空格分隔(一般用来执首尾比如查看output状态)", lines=5)
baseSpaceUrl = gr.components.Textbox(type="text", label="指定手动任务的执行容器地址", lines=3)
# 获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
someResult = gr.components.Textbox(label="获取按钮返回信息", type="text")
# 创建一个按钮块来触发处理函数
taskBtn = gr.Button("处理列表")
taskBtn.click(fn=process_task_list,inputs=[task_list_text, baseSpaceUrl],outputs=someResult)
new_text = gr.components.Textbox(placeholder="敬请期待") # str类型