Spaces:
Sleeping
Sleeping
| # encoding:utf-8 | |
| import os | |
| from config import conf, load_config | |
| from channel import channel_factory | |
| from common.log import logger | |
| from plugins import * | |
| import signal | |
| import sys | |
| import config | |
| import gradio as gr | |
| from io import BytesIO | |
| from PIL import Image | |
| from concurrent.futures import ThreadPoolExecutor | |
# Shared worker pool (at most 8 threads). getLoginUrl submits the channel's
# startup call to it so startup runs in the background while the QR code is
# being polled.
thread_pool = ThreadPoolExecutor(max_workers=8)
def getImage(bytes):
    """Open encoded image data as a PIL image.

    Args:
        bytes: Raw encoded image data. The name shadows the builtin but is
            kept because it is part of the function's signature.

    Returns:
        The image object produced by ``Image.open`` over an in-memory
        stream wrapping the given data.
    """
    return Image.open(BytesIO(bytes))
def getLoginUrl():
    """Start a "wx" channel in the background and return its login QR code.

    Loads the configuration, creates the "wx" channel, submits its
    ``startup`` to the shared thread pool, and then polls the channel until
    ``getQrCode()`` yields data.

    Returns:
        The QR code decoded by ``getImage``.

    Raises:
        Exception: Whatever ``bot.startup`` raised, if it fails before a QR
            code becomes available (previously this case spun forever).
    """
    import time  # local import: only this polling loop needs it

    poll_interval_seconds = 0.2

    # load config
    config.load_config()
    # create channel
    bot = channel_factory.create_channel("wx")
    startup = thread_pool.submit(bot.startup)

    while True:
        # Sample completion BEFORE reading the QR code, so a code published
        # just before startup ended is still picked up below.
        startup_finished = startup.done()

        # Read once: the value that is checked is the value that is returned.
        qr_bytes = bot.getQrCode()
        if qr_bytes:
            return getImage(qr_bytes)

        if startup_finished:
            startup_error = startup.exception()
            if startup_error is not None:
                # Surface the failure instead of polling forever.
                raise startup_error

        # Yield the CPU between polls instead of busy-waiting.
        time.sleep(poll_interval_seconds)
def sigterm_handler_wrap(_signo):
    """Install a handler for ``_signo`` that saves user data first.

    The new handler logs the signal, calls ``conf().save_user_datas()`` and
    then defers to whatever disposition was in place before:

    * a callable previous handler is invoked with the same arguments;
    * ``signal.SIG_IGN`` keeps the signal ignored (after saving);
    * anything else (``signal.SIG_DFL`` or ``None``) exits the process.

    Args:
        _signo: Signal number to wrap, e.g. ``signal.SIGINT``.
    """
    old_handler = signal.getsignal(_signo)

    def func(_signo, _stack_frame):
        logger.info("signal {} received, exiting...".format(_signo))
        conf().save_user_datas()
        # getsignal() may return SIG_DFL / SIG_IGN / None, none of which is
        # callable; calling them would raise TypeError instead of exiting.
        if callable(old_handler):
            return old_handler(_signo, _stack_frame)
        if old_handler == signal.SIG_IGN:
            return None
        sys.exit(0)

    signal.signal(_signo, func)
def run():
    """Start the configured channel, logging any startup failure.

    Loads the configuration, wraps the SIGINT and SIGTERM handlers so user
    data is saved on exit, creates the channel named by the ``channel_type``
    setting (default ``'wx'``), loads plugins for the channels that use
    them, and finally starts the channel. Any exception is logged rather
    than propagated.
    """
    try:
        load_config()

        # Ctrl+C first, then the kill signal.
        for signo in (signal.SIGINT, signal.SIGTERM):
            sigterm_handler_wrap(signo)

        selected = conf().get('channel_type', 'wx')
        if selected == 'wxy':
            os.environ['WECHATY_LOG'] = "warn"
            # os.environ['WECHATY_PUPPET_SERVICE_ENDPOINT'] = '127.0.0.1:9001'

        bot = channel_factory.create_channel(selected)
        if selected in ('wx', 'wxy', 'wechatmp'):
            PluginManager().load_plugins()

        bot.startup()
    except Exception as err:
        logger.error("App startup failed!")
        logger.exception(err)
if __name__ == '__main__':
    # run() is not invoked here; the Gradio page below is the entry point.
    # run()
    try:
        # Two-column page: a button on the left, the QR code image on the
        # right. Clicking the button calls getLoginUrl and shows its result.
        with gr.Blocks() as demo:
            with gr.Row():
                with gr.Column():
                    qr_button = gr.Button(value="生成二维码")
                with gr.Column():
                    qr_image = gr.Pil()
            qr_button.click(getLoginUrl, outputs=[qr_image])
        demo.launch()
    except Exception as err:
        logger.error("App startup failed!")
        logger.exception(err)