Spaces:
Sleeping
Sleeping
File size: 11,541 Bytes
b0825a9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
import json
import os
import random
from zipfile import ZipFile
import zipfile
import gradio as gr
from config import *
import shutil
from PIL import Image#上传保存遮罩和比较遮罩width height能否和章节图片匹配的时候用
from taskMap import *
def load_config():
return get_variables()
class configData():
def __init__(self):
self.data = load_config()
def update(self):
self.data = load_config()
print(self.data)
configData = configData()#复用实例
# 解压缩函数
def unzip_file(file_path, extract_path):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# 上传并解压缩函数
#upload_and_unzip(file:"文件",extract_path:"解压路径")->"void 成功提示":
def upload_and_unzip(file_obj, extract_path):
print("执行上传解压函数")
##这样写如果新的多,那么替换为多的一方,如果新的少,也只会替换已经存在的部分--->符合预期想法
with ZipFile(file_obj.name) as zfile:
zfile.extractall(extract_path)
return "File uploaded and extracted successfully to "+extract_path
#构造函数获取当前目录file list信息-不放config进行返回是为了避免引入旧值
def update_file_info(root_dir):
info = {}
for root, dirs, files in os.walk(root_dir):
info[root] = {
"directories": dirs,
"files": files
}
for dir_name in dirs:
update_file_info(os.path.join(root, dir_name))
return info
def random_chapter():
# for root, info in configData.data['file_info'].items():
#手动更新避免引入旧值-成功
file_info = update_file_info(configData.data['current_dir'])
configData.data['file_info'] = file_info
for root, info in configData.data['file_info'].items():
if root.startswith(configData.data['manga_abs_dir']):
for directory in info["directories"]:
if any(file.endswith('.jpg') or file.endswith('.png') for file in os.listdir(os.path.join(root, directory))):
chapter_path = os.path.join(root, directory)
# 删除章节目录
print("章节目录是:",chapter_path)
#获取标题
# 将路径转换为标题
folders = chapter_path.split(os.sep)
if len(folders) >= 2:
chapter_title = " ".join([folders[-2], folders[-1]]) # 获取倒数第二个和倒数第一个目录名作为标题
else:
chapter_title = chapter_path # 如果目录层级不足2层,直接使用路径作为标题
# 获取标题
return chapter_title,chapter_path#返回章节标题和章节目录给任务-1批量顺序上传原始图片到服务器manga.
return None,None
def list_files():
return configData.data['file_info']
def save_mask(mask_file, output_dir):
#上传遮罩保存到mask下面,名称随意
# 检查遮罩文件是否存在
# 构造保存文件的路径-保证linux和windows都能转义
save_path = os.path.join(output_dir, '0.jpg')
# 创建目录(如果目录不存在)
os.makedirs(output_dir, exist_ok=True)
# 使用with open方式保存文件
mask_file_img = mask_file #这个是Image.open后的对象
mask_file_img.save(save_path)
return "文件保存成功到:" + str(save_path)
def update_config_json(newConfig:str):
newConfig = json.loads(newConfig) #转换str为dict
with open(configData.data['current_config_json'], "w") as file:
file.write(json.dumps(newConfig, indent=4, ensure_ascii=False)) # 指定indent参数来保持JSON格式
#返回新的json文件内容
with open(configData.data['current_config_json'], "r") as f:
content =f.read() # 读取文件内容
return content
def is_asp_task_valid()->bool:
# 启动状态检测 ->
# 1config允许定时任务 2 mask目录下齐备 3 manga_all目录下有可用素材 4 mask的width height和素材匹配
# 5应当将config写入文件而不是py随时可以修改,以防ck账号被封等情况下不发送->同时删除不可用账号 -全部space中账号反馈不可用的时候停止定时任务
# 6应该避免任务扎堆进行,对ocr space造成负担
if configData.data['allow_scheduler'] == False:
print("当前配置不允许定时任务,停止定时任务")
return False
chapter_title ,cur_chapter= random_chapter()
if cur_chapter == None:
print("当前没有可用章节了,停止定时任务")
return False
mask_file_path = os.path.join(configData.data['mask_dir'], '0.jpg')
if not os.path.exists(mask_file_path):
print("遮罩文件不存在,停止定时任务")
return False
# 读取遮罩图片长宽比较当前章节下面图片的长宽
mask_image = Image.open(mask_file_path)
# 获取指定目录下所有文件
all_files = os.listdir(cur_chapter)
# 过滤出图片文件
image_files = [file for file in all_files if file.endswith(('.jpg', '.jpeg', '.png', '.gif'))]
if image_files:
# 随机选择一个图片文件
random_chapter_image_name = random.choice(image_files)
random_chapter_image_path = os.path.join(cur_chapter, random_chapter_image_name)
# 打开选定的图片文件
random_chapter_image = Image.open(random_chapter_image_path)
# 现在random_chapter_image中包含了选中的随机图片文件
if mask_image.size != random_chapter_image.size:
print("遮罩大小和章节随机一个图片的大小不匹配,停止定时任务")
return False
else:
print("No image files found in the directory.")
return False
return True
#定时任务流程函数集合 ->返回None说明非正常执行情况
def run_asp_task():
if not is_asp_task_valid():
return None
#遍历bili_spaces中的space url并执行日常任务
for space_url in configData.data["bili_spaces"]:
process_task_list("0 1 2",space_url)
#实现gradio接受上传压缩文件,解压到manga_all目录下面
with gr.Blocks() as mangaManager:
#上传文件选择列表
file_selected = gr.File(label="待上传压缩章节",file_types=['.zip', '.tar', '.gz'])
#上传zip的解压保存路径或mask的地址(必选) 根据需要选择
output_dir_gr = gr.Dropdown(label="上传zip的解压保存路径或mask的地址(必选)", choices=configData.data['default_values'])
#压缩包上传按钮
file_upload_btn = gr.Button("开始上传")
#获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
someResult = gr.Textbox(label="获取按钮返回信息", type="text")
# 设置按钮点击事件(调用上传解压函数,将压缩包内容解压到指定目录 默认是manga_all下面)
file_upload_btn.click(fn=upload_and_unzip, inputs=[file_selected, output_dir_gr],outputs=someResult)
#设置按钮查看json类型的listFiles信息
filesInfoBtn = gr.Button("查看files信息")
filesInfoBtn.click(fn=list_files,outputs=someResult)
#设置mask上传遮罩按钮保存遮罩到mask目录-因为直接重启space利用docker上传会触发定时任务,也许造成不好结果
# mask_selected = gr.inputs.Image(label="待上传遮罩", type='pil')
mask_selected = gr.components.Image(label="待上传遮罩", type='pil')#代替inputs的新写法
markUploadBtn = gr.Button("上传遮罩mask")
markUploadBtn.click(fn=save_mask,inputs=[mask_selected, output_dir_gr],outputs=someResult)
#设置按钮关联新config输入进行更新操作
config_update_text = gr.Textbox(placeholder="输入新的JSON数据")#str类型
config_update_btn = gr.Button("更新config.json数据")
config_update_btn.click(fn=update_config_json, inputs=[config_update_text], outputs=someResult)
# 定义一个函数来处理输入的步骤转list然后执行,返回执行结果->taskResult/None
# gradio输入0 1 2 3就行,会自动转"0 1 2 3"然后给process_task_list处理
def process_task_list(input_task_str:str = None,baseUrl:str =None):
if not is_asp_task_valid():
print("任务基本条件不符合")
return None
if baseUrl is None:
print("baseUrl不存在")
return None
if input_task_str is None:
print("input_task_str不存在")
return None
chapter_title,cur_chapter_path = random_chapter()
task_results = {}#保存任务执行结果
#临时变量记录是否上传了章节-上传了需要删除,而且只能在for循环任务结束后删除
manga_has_uploaded = False
print(input_task_str)# "a b c" - >['a', 'b', 'c']
task_list = input_task_str.split(' ')
for task in task_list:
if task in task_functions:
task_func = task_functions[task]#func
task_param = tasks_params[task]#dict
if "baseUrl" in task_param:
task_param["baseUrl"] = baseUrl
if "mask_path" in task_param:
#使用后遮罩不删,否则维护成本太高
task_param["mask_path"] = configData.data['mask_abs_dir']
if "manga_path" in task_param:
task_param["manga_path"] = cur_chapter_path#上传一个章节作为本次素材,保存到space的manga下面
print("上传了章节:",cur_chapter_path,"下面提交后删除")
manga_has_uploaded = True
#执行了这个任务就要删除该章节
if "bili_meta" in task_param:
bili_meta_data["title"] = chapter_title
task_param["bili_meta"] = bili_meta_data
result = task_func(**task_param)#执行对应函数获取结果
task_results[task] = "success" if result is None else result#保存对应函数执行结果
else:
print("该执行流程函数不存在")
return None#立即推出循环结束函数
if manga_has_uploaded is True:
#如果上传章节任务得到了执行,那么删除
shutil.rmtree(cur_chapter_path) # 递归地删除指定路径的目录及其所有内容,包括文件和子目录 #删除应该在返回的时候删
return task_results
# 创建一个输入块,接受str+空格类型的输入,比如设置默认值3只查看结果output.mp4
# 将按钮添加到任务管理器中
with gr.Blocks() as taskManager:
# 任务需要 1:查看指定space的output video状态 2: 手动修改执行流程->比如上传后是发送还是查看output效果
# 3:可以添加按钮跳转路由
task_list_text = gr.components.Textbox(type="text", label="输入任务列表元素,用空格分隔(一般用来执首尾比如查看output状态)", lines=5)
baseSpaceUrl = gr.components.Textbox(type="text", label="指定手动任务的执行容器地址", lines=3)
# 获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的
someResult = gr.components.Textbox(label="获取按钮返回信息", type="text")
# 创建一个按钮块来触发处理函数
taskBtn = gr.Button("处理列表")
taskBtn.click(fn=process_task_list,inputs=[task_list_text, baseSpaceUrl],outputs=someResult)
new_text = gr.components.Textbox(placeholder="敬请期待") # str类型
|