Spaces:
Running
Running
l='REQUEST_METHOD' | |
k='Content-Type' | |
j='wsgi.input' | |
i='CONTENT_LENGTH' | |
h='proxy-connection' | |
g='upgrade' | |
f='connection' | |
e='HTTP_' | |
d='PATH_INFO' | |
c='embykeeperweb' | |
b='embykeeper' | |
V='/ek' | |
U=None | |
T='install' | |
S='pip' | |
R='-m' | |
Q=set | |
P=Exception | |
M='CONTENT_TYPE' | |
G=False | |
D=int | |
B=print | |
A=True | |
import eventlet as H | |
H.monkey_patch() | |
import os as C,sys as K,tempfile as m,subprocess as E,shutil as F | |
from pathlib import Path | |
import threading as W | |
from appdirs import user_data_dir as n | |
import socketio as N,eventlet as H,requests as X | |
L='7.1.26' | |
o=Path(n(b)) | |
O=o/'hf'/'version' | |
def p(): | |
try: | |
O.mkdir(parents=A,exist_ok=A);M=O/f"emby-keeper-{L}";H=O/f"emby-keeper-{L}.tar.gz" | |
if M.exists():B(f"Using cached version from {M}",flush=A);return A | |
I=m.mkdtemp();B('Downloading EK...',flush=A);D=C.path.join(I,'embykeeper.tar.gz') | |
if H.exists():B(f"Using cached tarball from {H}",flush=A);F.copy2(H,D) | |
else:N=f"https://github.com/emby-keeper/emby-keeper/archive/refs/tags/v{L}.tar.gz";E.run(['wget','-q',N,'-O',D],check=A);F.copy2(D,H) | |
E.run(['tar','xf',D,'-C',I],check=A);C.remove(D);J=C.path.join(I,f"emby-keeper-{L}");B('Obfuscating code...',flush=A) | |
if not q(J):raise P('Obfuscation failed') | |
B('Installing dependencies...',flush=A);E.run([K.executable,R,S,T,'-r',C.path.join(J,'requirements.txt')],check=A);E.run([K.executable,R,S,T,J],check=A);F.copytree(J,M,dirs_exist_ok=A);F.rmtree(I);return A | |
except P as Q:B(f"Error setting up EK: {Q}",flush=A);return G | |
def q(package_path): | |
Q='dist';O='pyarmor';I=package_path | |
try: | |
E.run([K.executable,R,S,T,O],check=A) | |
for J in[b,c]: | |
D=C.path.join(I,J) | |
if not C.path.exists(D):B(f"Package directory not found: {D}",flush=A);continue | |
B(f"Obfuscating {J}...",flush=A);E.run([O,'gen','--recursive','--output',C.path.join(I,Q),D],check=A);H=C.path.join(I,Q) | |
if C.path.exists(H): | |
for M in C.listdir(H): | |
L=C.path.join(H,M);N=C.path.join(D,M) | |
if C.path.isdir(L):F.copytree(L,N,dirs_exist_ok=A) | |
else:F.copy2(L,N) | |
F.rmtree(H) | |
else:B(f"Dist directory not found after obfuscation for {J}",flush=A);return G | |
return A | |
except P as U:B(f"Error during obfuscation: {U}",flush=A);return G | |
def Y(): | |
with H.listen(('',0))as A:return A.getsockname()[1] | |
def r(): | |
J='Prompt Generator v0.2(More tags)';H='Prompt Generator v0.1(Better quality)';import gradio as C,random as M;from time import time,ctime | |
def F(choice,num,artist): | |
L='art by';I=choice;E=artist;C=num;N=time();B(ctime(N)) | |
if I==H:G=open('pr1.txt').read().splitlines() | |
elif I==J:G=open('pr2.txt').read().splitlines() | |
if D(C)<1 or D(C)>20:C=10 | |
if D(E)<0 or D(E)>40:E=2 | |
O=len(G);A=[];K=0 | |
while len(sorted(Q(A),key=lambda d:A.index(d)))<D(C): | |
F=M.choice(G) | |
if F.startswith(L)and K<D(E):K+=1;A.append(F) | |
elif not F.startswith(L):A.append(F) | |
B(', '.join(Q(A))+'\n\n');return', '.join(Q(A)) | |
E=C.Blocks() | |
with E: | |
C.HTML('\n <div style="text-align: center; margin: 0 auto;">\n <div style="display: inline-flex;align-items: center;gap: 0.8rem;font-size: 1.75rem;">\n <h1 style="font-weight: 900; margin-bottom: 7px;margin-top:5px">\n Simple Prompt Generator v0.6 (Gradio Demo)\n </h1>\n </div>\n <p style="margin-bottom: 10px; font-size: 94%; line-height: 23px;">\n Simple prompt generation script for Midjourney. EmbyKeeper is in <a href="#" onclick="const url = window.location.href; const isHfSpaces = url.includes(\'spaces/\'); const newUrl = isHfSpaces ? url.replace(\'huggingface.co/spaces/\', \'\').replace(\'prompt-generator\', \'prompt-generator.hf.space/ek\') : \'/ek\'; window.open(newUrl, \'_blank\'); return false;">/ek</a> path. <br> <p>More examples in <a class=\'link-info\' href="https://github.com/emby-keeper/emby-keeper" target="_blank">Github</a> and <a class=\'link-info\' href="https://emby-keeper.github.io/" target="_blank">Project site</a></p>\n </p>\n <center>\n <img style="display: inline-block, margin-right: 1%;" src=\'https://visitor-badge.laobi.icu/badge?page_id=WiNE-iNEFF.Simple_Prompt_Generator&left_color=red&right_color=green&left_text=Visitors\' alt=\'visitor badge\'>\n </center>\n </div>\n ') | |
with C.Column():K=C.Radio([H,J],label='Model Variant',value=H);L=C.Number(value='10',label='Num of tag (MAX 20)',show_label=A);N=C.Number(value='2',label='Num of artist (Standart 2)',show_label=A);O=C.Textbox(lines=4,label='Generated Prompts') | |
P=C.Button('Generate');P.click(fn=F,inputs=[K,L,N],outputs=O,concurrency_limit=4);C.HTML('\n <div class="footer">\n <div style=\'text-align: center;\'>Simple Prompt Generator by <a href=\'https://twitter.com/wine_ineff\' target=\'_blank\'>Artsem Holub (WiNE-iNEFF)</a><br>More information about this demo and script your can find in <a class=\'link-info\' href="https://github.com/emby-keeper/emby-keeper" target="_blank">Github</a> and <a class=\'link-info\' href="https://emby-keeper.github.io/" target="_blank">Project site</a></div>\n </div>\n ') | |
E.queue();B(f"Starting Gradio on port {I}",flush=A);E.launch(server_name='0.0.0.0',server_port=I,share=G,debug=A,show_error=A,prevent_thread_lock=A) | |
def s(): | |
O='disconnect';L='connect';C='/pty';E=N.Server(async_mode='eventlet');P=N.WSGIApp(E);F=N.Client();K={} | |
def R(sid,environ): | |
B(f"Client connected: {sid}") | |
if not F.connected:F.connect(f"http://127.0.0.1:{J}",namespaces=[C]) | |
K[sid]=A | |
def S(sid): | |
B(f"Client disconnected: {sid}");K.pop(sid,U) | |
if not K:F.disconnect() | |
def T(event,sid,*D): | |
A=event | |
if A not in[L,O]:B(f"Forward to ek: {A}");F.emit(A,*D,namespace=C) | |
def W(event,*D): | |
A=event | |
if A not in[L,O]:B(f"Forward from ek: {A}");E.emit(A,*D,namespace=C) | |
def Q(environ,start_response): | |
B=environ;H=B[d] | |
if H.startswith(V):K=f"http://127.0.0.1:{J}" | |
else:K=f"http://127.0.0.1:{I}" | |
P=f"{K}{H}";F={} | |
for(L,Q)in B.items(): | |
if L.startswith(e): | |
N=L[5:].replace('_','-').title() | |
if N.lower()not in[f,g,h]:F[N]=Q | |
C=B.get(i);O=U | |
if C: | |
C=D(C);O=B[j].read(C) | |
if B.get(M):F[k]=B[M] | |
E=X.request(method=B[l],url=P,headers=F,data=O,stream=A,allow_redirects=G);start_response(f"{E.status_code} {E.reason}",list(E.headers.items()));return E.iter_content(chunk_size=4096) | |
P.wsgi_app=Q;H.wsgi.server(H.listen(('',7860)),P) | |
if __name__=='__main__': | |
B('Setting up EK...',flush=A) | |
if not p():B('Failed to setup EK!',flush=A);K.exit(1) | |
I=Y();J=Y();B(f"Using ports - Gradio: {I}, EK: {J}",flush=A);Z=W.Thread(target=r);Z.daemon=A;Z.start();a=W.Thread(target=lambda:E.run([c,'--port',str(J),'--prefix',V,'--public']));a.daemon=A;a.start() | |
def t(environ,start_response): | |
B=environ;H=B[d] | |
if H.startswith(V):K=f"http://127.0.0.1:{J}" | |
else:K=f"http://127.0.0.1:{I}" | |
P=f"{K}{H}";F={} | |
for(L,Q)in B.items(): | |
if L.startswith(e): | |
N=L[5:].replace('_','-').title() | |
if N.lower()not in[f,g,h]:F[N]=Q | |
C=B.get(i);O=U | |
if C: | |
C=D(C);O=B[j].read(C) | |
if B.get(M):F[k]=B[M] | |
E=X.request(method=B[l],url=P,headers=F,data=O,stream=A,allow_redirects=G);start_response(f"{E.status_code} {E.reason}",list(E.headers.items()));return E.iter_content(chunk_size=4096) | |
B('Starting proxy server on port 7860...',flush=A);s() |