| | import re
|
| | from rich.traceback import install
|
| | install(show_locals=False)
|
| |
|
| | import gradio as gr
|
| | import yaml
|
| | import functools
|
| | import time
|
| | import sys
|
| | import os
|
| | import copy
|
| |
|
| | sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
| | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
| |
|
| | from config import RENDER_SAVE_LOAD_BTN, RENDER_STOP_BTN
|
| | from core.backend import call_write, call_accept, match_quote_text
|
| | from core.frontend_copy import enable_copy_js, on_copy
|
| | from core.frontend_setting import new_setting, render_setting
|
| | from core.frontend_utils import (
|
| | title, info,
|
| | create_progress_md, create_text_md, messages2chatbot,
|
| | init_writer, has_accept, is_running, try_cancel, writer_y_is_empty, writer_x_is_empty,
|
| | cancellable, process_writer_to_backend, process_writer_from_backend,
|
| | init_chapters_w, init_draft_w
|
| | )
|
| | from core.writer_utils import KeyPointMsg
|
| |
|
| | from prompts.baseprompt import clean_txt_content, load_prompt
|
| |
|
| |
|
| |
|
| | with open('prompts/idea-examples.yaml', 'r', encoding='utf-8') as file:
|
| | examples_data = yaml.safe_load(file)
|
| |
|
| |
|
| | examples = [[example['idea']] for example in examples_data['examples']]
|
| |
|
| | with gr.Blocks(head=enable_copy_js) as demo:
|
| | gr.HTML(title)
|
| | with gr.Accordion("使用指南"):
|
| | gr.Markdown(info)
|
| |
|
| | writer_state = gr.State(init_writer('', check_empty=False))
|
| | setting_state = gr.State(new_setting())
|
| |
|
| | if RENDER_SAVE_LOAD_BTN:
|
| | with gr.Row():
|
| | save_button = gr.Button("保存状态")
|
| | load_button = gr.Button("加载状态")
|
| | save_file_name = gr.Textbox(value='states', placeholder='输入文件名', lines=1, label=None, show_label=False, container=False)
|
| |
|
| | def save_states(save_file_name, writer, setting):
|
| | import json
|
| | json_file_name = save_file_name + '.json'
|
| | with open(json_file_name, 'w', encoding='utf-8') as f:
|
| | json.dump({
|
| | 'writer': writer,
|
| | 'setting': setting
|
| | }, f, ensure_ascii=False, indent=2)
|
| | gr.Info(f"状态已保存到文件:{json_file_name}")
|
| |
|
| | def load_states(save_file_name):
|
| | import json
|
| | json_file_name = save_file_name + '.json'
|
| | try:
|
| | with open(json_file_name, 'r', encoding='utf-8') as f:
|
| | states = json.load(f)
|
| | gr.Info(f"状态文件已加载:{json_file_name}")
|
| | states['setting']['render_time'] = time.time()
|
| |
|
| |
|
| | return states['writer'], states['setting']
|
| | except FileNotFoundError:
|
| | raise gr.Error(f"未找到保存的状态文件:{json_file_name}")
|
| |
|
| | idea_textbox = gr.Textbox(placeholder='用一段话描述你要写的小说,或者从下方示例中选择一个创意...', lines=2, scale=1, label=None, show_label=False, container=False, max_length=1000)
|
| |
|
| | gr.Examples(
|
| | label='示例',
|
| | examples=examples,
|
| | inputs=[idea_textbox],
|
| | )
|
| |
|
| | with gr.Row():
|
| | outline_btn = gr.Button("创作大纲", scale=1, min_width=1, interactive = True, variant='primary')
|
| | chapters_btn = gr.Button("创作剧情", scale=1, min_width=1, interactive = False, variant='secondary')
|
| | draft_btn = gr.Button("创作正文", scale=1, min_width=1, interactive = False, variant='secondary')
|
| | auto_checkbox = gr.Checkbox(label='一键生成', scale=1, value=False, visible=False)
|
| |
|
| | progress_md = create_progress_md(writer_state.value)
|
| | text_md = create_text_md(writer_state.value)
|
| |
|
| | @gr.render(inputs=writer_state)
|
| | def create_prompt_preview(writer):
|
| | prompt_outputs = writer['prompt_outputs'] if 'prompt_outputs' in writer else []
|
| | with gr.Accordion("Prompt预览", open=bool(prompt_outputs)):
|
| | pause_on_prompt_finished_checkbox = gr.Checkbox(label='允许在LLM响应完成后,预览Prompt', scale=1, value=writer['pause_on_prompt_finished_flag'])
|
| |
|
| | for i, prompt_output in enumerate(prompt_outputs, 1):
|
| | with gr.Tab(f"Prompt {i}"):
|
| | gr.Chatbot(messages2chatbot(prompt_output['response_msgs']), type='messages')
|
| | if not prompt_outputs:
|
| | gr.Markdown('当前没有可预览的Prompt。')
|
| |
|
| | continue_btn = gr.Button('继续', visible=bool(prompt_outputs), variant='primary')
|
| |
|
| | def on_pause_on_prompt_finished(value):
|
| | if value:
|
| | gr.Info("在LLM响应完成后,将可以预览Prompt")
|
| | writer['pause_on_prompt_finished_flag'] = value
|
| |
|
| | pause_on_prompt_finished_checkbox.change(on_pause_on_prompt_finished, [pause_on_prompt_finished_checkbox])
|
| |
|
| | def on_continue(writer):
|
| | writer['pause_flag'] = False
|
| | writer['prompt_outputs'] = []
|
| | return writer
|
| |
|
| | continue_btn.click(on_continue, writer_state, writer_state)
|
| |
|
| |
|
| | with gr.Row():
|
| | rewrite_all_button = gr.Button("开始创作", min_width=100, scale=2, variant='secondary', interactive=False)
|
| | suggestion_dropdown = gr.Dropdown(choices=[], min_width=100, scale=2, label=None, show_label=False, container=False, allow_custom_value=False)
|
| | quote_checkbox = gr.Checkbox(label='允许引用', min_width=100, scale=2, value=False)
|
| | gr.Textbox('窗口大小:', container=False, text_align='right', scale=1, min_width=100)
|
| | chunk_length_dropdown = gr.Dropdown(choices=[], min_width=80, scale=1, label=None, show_label=False, container=False, allow_custom_value=False)
|
| |
|
| | quote_md = gr.Markdown(visible=False)
|
| |
|
| | def on_quote_checkbox_change(writer, value):
|
| | if writer['current_w'] == 'outline_w':
|
| | gr.Info("大纲创作不支持引用\n考虑在剧情和正文创作中使用吧~")
|
| | return gr.update(value=False, visible=False)
|
| |
|
| | if value:
|
| | gr.Info("允许引用(右键或Ctrl+C复制你想引用的文本)")
|
| | writer['quote_span'] = None
|
| | writer['quoted_text'] = ''
|
| | return gr.update(value=None, visible=False)
|
| |
|
| | quote_checkbox.change(on_quote_checkbox_change, [writer_state, quote_checkbox], [quote_md])
|
| |
|
| | def on_chunk_length_change(writer, value):
|
| | current_w_name = writer['current_w']
|
| | writer[current_w_name]['y_chunk_length'] = value
|
| | return gr.update(value=value)
|
| |
|
| | chunk_length_dropdown.change(on_chunk_length_change, [writer_state, chunk_length_dropdown], [chunk_length_dropdown])
|
| |
|
| | def on_copy_handle(text, writer, setting, quote_checkbox):
|
| |
|
| | text = text.strip()
|
| |
|
| | if has_accept(writer):
|
| | gr.Info('考虑先接受或拒绝修改哦~')
|
| | return gr.update(visible=False)
|
| |
|
| | if len(text) < 10:
|
| | gr.Info('选中的文本太短,无法引用')
|
| | return gr.update(visible=False)
|
| |
|
| | if quote_checkbox:
|
| | quote_span, quoted_text = match_quote_text(writer, setting, text)
|
| | if quote_span:
|
| | writer['quote_span'] = quote_span
|
| | writer['quoted_text'] = quoted_text
|
| | lines = quoted_text.split('\n')
|
| | if len(lines) > 10:
|
| | lines[5:-5] = ['......']
|
| | lines = ['```', ] + lines + ['```', ]
|
| | quoted_text = '\n'.join(["> " + e for e in lines])
|
| | return gr.update(value=quoted_text, visible=True)
|
| | else:
|
| | gr.Info('未找到匹配的引用文本')
|
| |
|
| | writer['quote_span'] = None
|
| | writer['quoted_text'] = ''
|
| | return gr.update(visible=False)
|
| |
|
| | on_copy(on_copy_handle, [writer_state, setting_state, quote_checkbox], [quote_md])
|
| |
|
| |
|
| | suggestion_textbox = gr.Textbox(max_length=1000, placeholder='在这里输入你的意见,或者从右上单选框选择', lines=2, scale=1, label=None, show_label=False, container=False)
|
| |
|
| | with gr.Row():
|
| | accept_button = gr.Button("接受", scale=1, min_width=1, variant='secondary', interactive=False)
|
| | pause_button = gr.Button("暂停", scale=1, min_width=1, variant='secondary', visible=RENDER_STOP_BTN)
|
| | stop_button = gr.Button("取消", scale=1, min_width=1, variant='secondary')
|
| | flash_button = gr.Button("刷新", scale=1, min_width=1, variant='secondary')
|
| |
|
| | def flash_interface(writer):
|
| | current_w_name = writer['current_w']
|
| |
|
| | can_accept_flag = has_accept(writer) and not is_running(writer)
|
| | can_write_flag = not writer_x_is_empty(writer, current_w_name) and not can_accept_flag
|
| |
|
| | match current_w_name:
|
| | case 'outline_w':
|
| | rewrite_all_button = gr.update(value='开始创作', variant='primary' if can_write_flag else 'secondary', interactive=can_write_flag)
|
| | case 'chapters_w':
|
| | rewrite_all_button = gr.update(value='开始创作', variant='primary' if can_write_flag else 'secondary', interactive=can_write_flag)
|
| | case 'draft_w':
|
| | rewrite_all_button = gr.update(value='开始创作', variant='primary' if can_write_flag else 'secondary', interactive=can_write_flag)
|
| |
|
| | accept_button = gr.update(variant='primary' if can_accept_flag else 'secondary', interactive=can_accept_flag)
|
| |
|
| |
|
| | outline_btn = gr.update(
|
| | variant='primary' if current_w_name == 'outline_w' else 'secondary'
|
| | )
|
| | chapters_btn = gr.update(
|
| | interactive=not writer_y_is_empty(writer, 'outline_w'),
|
| | variant='primary' if current_w_name == 'chapters_w' else 'secondary'
|
| | )
|
| | draft_btn = gr.update(
|
| | interactive=not writer_y_is_empty(writer, 'chapters_w'),
|
| | variant='primary' if current_w_name == 'draft_w' else 'secondary'
|
| | )
|
| |
|
| | pause_button = gr.update(
|
| | value="继续" if writer['pause_flag'] else "暂停",
|
| | variant='secondary',
|
| | )
|
| |
|
| | suggestion_choices = writer['suggestions'][current_w_name]
|
| |
|
| | if writer_y_is_empty(writer, current_w_name):
|
| | suggestion_dropdown = gr.update(choices=suggestion_choices, value=suggestion_choices[0])
|
| | else:
|
| | suggestion_dropdown = gr.update(choices=suggestion_choices,)
|
| |
|
| | chunk_length_choices = writer['chunk_length'][current_w_name]
|
| | if cur_chunk_length := writer[current_w_name].get('y_chunk_length', None):
|
| | chunk_length_dropdown = gr.update(choices=chunk_length_choices, value=cur_chunk_length)
|
| | else:
|
| | chunk_length_dropdown = gr.update(choices=chunk_length_choices, value=chunk_length_choices[0])
|
| |
|
| | return (
|
| | create_text_md(writer),
|
| | create_progress_md(writer),
|
| | rewrite_all_button,
|
| | accept_button,
|
| | outline_btn,
|
| | chapters_btn,
|
| | draft_btn,
|
| | pause_button,
|
| | suggestion_dropdown,
|
| | chunk_length_dropdown
|
| | )
|
| |
|
| |
|
| | flash_event = dict(
|
| | fn=flash_interface,
|
| | inputs=[writer_state],
|
| | outputs=[
|
| | text_md,
|
| | progress_md,
|
| | rewrite_all_button,
|
| | accept_button,
|
| | outline_btn,
|
| | chapters_btn,
|
| | draft_btn,
|
| | pause_button,
|
| | suggestion_dropdown,
|
| | chunk_length_dropdown
|
| | ]
|
| | )
|
| |
|
| | flash_button.click(**flash_event)
|
| | if RENDER_SAVE_LOAD_BTN:
|
| | save_button.click(save_states, inputs=[save_file_name, writer_state, setting_state], outputs=[])
|
| | load_button.click(load_states, inputs=[save_file_name], outputs=[writer_state, setting_state]).success(**flash_event)
|
| |
|
| | stop_button.click(try_cancel, inputs=[writer_state]).success(**flash_event).success(lambda :gr.update(), None, writer_state)
|
| |
|
| |
|
| | @cancellable
|
| | def _on_write_all(writer, setting, auto_write=False, suggestion=None):
|
| | current_w_name = writer['current_w']
|
| |
|
| | if writer_x_is_empty(writer, current_w_name):
|
| | gr.Info('请先输入需要创作的内容!')
|
| | return
|
| |
|
| | writer['prompt_outputs'].clear()
|
| |
|
| | if writer['quote_span']:
|
| | quote_span, quoted_text = match_quote_text(writer, setting, writer['quoted_text'])
|
| | if quote_span != writer['quote_span'] or quoted_text != writer['quoted_text']:
|
| | raise gr.Error('引用文本不存在!')
|
| |
|
| | generator = call_write(process_writer_to_backend(writer), setting, auto_write, suggestion)
|
| |
|
| | new_writer = None
|
| | while True:
|
| | try:
|
| | kp_msg = next(generator)
|
| | if isinstance(kp_msg, KeyPointMsg):
|
| |
|
| | if kp_msg.is_prompt() and kp_msg.is_finished() and writer['pause_on_prompt_finished_flag']:
|
| | gr.Info('LLM响应完成,可以预览Prompt')
|
| | writer['pause_flag'] = True
|
| | if new_writer is None: continue
|
| | elif kp_msg.is_title():
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | continue
|
| |
|
| | else:
|
| | continue
|
| | else:
|
| | new_writer = kp_msg
|
| |
|
| | if writer['pause_flag']:
|
| | writer['prompt_outputs'] = copy.deepcopy(new_writer['prompt_outputs'])
|
| |
|
| | yield create_text_md(new_writer), writer
|
| |
|
| | while writer['pause_flag'] and not writer['cancel_flag']:
|
| | time.sleep(0.1)
|
| | else:
|
| | yield create_text_md(new_writer), gr.update()
|
| | except StopIteration as e:
|
| |
|
| | process_writer_from_backend(writer, e.value)
|
| | yield create_text_md(writer), writer
|
| | if has_accept(writer):
|
| | gr.Info('创作完成!点击接受按钮接受修改。')
|
| | else:
|
| | gr.Info('本次创作没有任何更改。')
|
| | return
|
| |
|
| | def on_auto_write_all(writer, setting, auto_write):
|
| | if auto_write:
|
| | yield from _on_write_all(writer, setting, True)
|
| | else:
|
| | pass
|
| |
|
| |
|
| |
|
| | writer_all_events = dict(
|
| | fn=on_auto_write_all,
|
| | queue=True,
|
| | inputs=[writer_state, setting_state, auto_checkbox],
|
| | outputs=[text_md, writer_state],
|
| | concurrency_limit=10
|
| | )
|
| |
|
| | def on_init_outline(idea, writer):
|
| | if not idea.strip():
|
| | gr.Info("先输入小说简介或从示例中选择一个")
|
| | return gr.update()
|
| | new_writer = init_writer(idea)
|
| | writer.update({
|
| | k:v for k, v in new_writer.items() if k in ['current_w', 'outline_w', 'prompt_outputs']
|
| | })
|
| | return writer
|
| |
|
| | outline_btn.click(on_init_outline, inputs=[idea_textbox, writer_state], outputs=[writer_state]).success(**writer_all_events).then(**flash_event)
|
| | chapters_btn.click(lambda writer: init_chapters_w(writer), inputs=[writer_state], outputs=[writer_state]).success(**writer_all_events).then(**flash_event)
|
| | draft_btn.click(lambda writer: init_draft_w(writer), inputs=[writer_state], outputs=[writer_state]).success(**writer_all_events).then(**flash_event)
|
| |
|
| | def on_select_suggestion(writer, setting, choice):
|
| | if choice == '自动':
|
| | return gr.update(value=choice, visible=False)
|
| |
|
| | current_w_name = writer['current_w']
|
| | dirname = writer['suggestions_dirname'][current_w_name]
|
| | suggestion = clean_txt_content(load_prompt(dirname, choice))
|
| | if suggestion.startswith("user:\n"):
|
| | suggestion = suggestion[len("user:\n"):]
|
| |
|
| | return gr.update(value=suggestion, visible=True)
|
| |
|
| | suggestion_dropdown.change(on_select_suggestion, inputs=[writer_state, setting_state, suggestion_dropdown], outputs=[suggestion_textbox])
|
| |
|
| | def on_write_all(writer, setting, suggestion):
|
| | if not suggestion.strip():
|
| | gr.Info('需要输入创作意见!')
|
| | return
|
| | yield from _on_write_all(writer, setting, False, suggestion)
|
| |
|
| | rewrite_all_button.click(
|
| | on_write_all,
|
| | queue=True,
|
| | inputs=[writer_state, setting_state, suggestion_textbox],
|
| | outputs=[text_md, writer_state],
|
| | concurrency_limit=10
|
| | ).then(**flash_event)
|
| |
|
| |
|
| | @cancellable
|
| | def on_accept_write(writer, setting):
|
| | current_w_name = writer['current_w']
|
| | current_w = writer[current_w_name]
|
| |
|
| | if not current_w['apply_chunks']:
|
| | raise gr.Error('请先进行创作!')
|
| |
|
| | new_writer = call_accept(process_writer_to_backend(writer), setting)
|
| | process_writer_from_backend(writer, new_writer)
|
| | yield create_text_md(writer), writer
|
| |
|
| | accept_button.click(fn=on_accept_write, inputs=[writer_state, setting_state], outputs=[text_md, writer_state]).then(**flash_event)
|
| |
|
| | def toggle_pause(writer):
|
| | if not is_running(writer):
|
| | gr.Info('当前没有正在进行的操作')
|
| | return gr.update()
|
| |
|
| | writer['pause_flag'] = not writer['pause_flag']
|
| |
|
| | return gr.update(value="暂停" if not writer['pause_flag'] else "继续")
|
| |
|
| | pause_button.click(
|
| | toggle_pause,
|
| | inputs=[writer_state],
|
| | outputs=[pause_button]
|
| | )
|
| |
|
| | @gr.render(inputs=setting_state)
|
| | def _render_setting(setting):
|
| | return render_setting(setting, setting_state)
|
| |
|
| |
|
| | demo.queue()
|
| | demo.launch(server_name="0.0.0.0", server_port=7860)
|
| |
|
| |
|
| |
|
| |
|