Spaces:
Running
on
Zero
Running
on
Zero
File size: 2,687 Bytes
08e1ed6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
from func_timeout import FunctionTimedOut, func_timeout
import os
# Prelude that run_code() prepends to every generated LOGO program before it
# is exec'd.  It creates one module-level ``turtle`` and exposes thin
# free-function wrappers around it so generated programs can call
# ``forward(10)`` instead of ``turtle.forward(10)``.
# The text is executed as Python source, so the indentation inside the string
# is significant.
LOGO_HEADER = """from myturtle_cv import Turtle
from myturtle import HALF_INF, INF, EPS_DIST, EPS_ANGLE
turtle = Turtle()
def forward(dist):
    turtle.forward(dist)
def left(angle):
    turtle.left(angle)
def right(angle):
    turtle.right(angle)
def teleport(x, y, theta):
    turtle.teleport(x, y, theta)
def penup():
    turtle.penup()
def pendown():
    turtle.pendown()
def position():
    return turtle.x, turtle.y
def heading():
    return turtle.heading
def isdown():
    return turtle.is_down
def fork_state():
    \"\"\"
    Fork the current state of the turtle.
    Usage:
    with fork_state():
        forward(100)
        left(90)
        forward(100)
    \"\"\"
    return turtle._TurtleState(turtle)"""
def run_code(new_folder, counter, code):
    """Execute one generated LOGO program and return what it rendered.

    The program is wrapped with ``LOGO_HEADER`` (turtle setup and helper
    functions) and a trailing ``gif = turtle.save_gif('')`` statement, then
    executed with a 20 second time limit.

    Args:
        new_folder: Folder name used to build the image path for this run.
        counter: Index of this program; becomes part of the image file name.
        code: Python source of the LOGO program to execute.

    Returns:
        The value the executed program bound to ``gif`` (the result of
        ``turtle.save_gif('')``), or ``None`` if execution timed out, raised,
        or never assigned ``gif``.
    """
    # NOTE(review): matplotlib is not referenced in this function; presumably
    # it is imported so it is loaded before the turtle code runs -- confirm
    # before removing.
    import matplotlib

    # The path only appears inside a commented-out ``turtle.save`` line of the
    # generated program, so nothing is written to it at the moment.
    fname = f"{new_folder}/logo_{counter}_.jpg"
    code_with_header_and_save = f"""
{LOGO_HEADER}
{code}
# turtle.save('{fname}')
gif = turtle.save_gif('')
"""
    # SECURITY: ``code`` is executed verbatim with exec(); only pass programs
    # from a source you trust or run this inside a sandbox.
    # ``results`` is the globals dict of the executed program, so every
    # top-level name it defines (including ``gif``) ends up in it.
    results = {}
    try:
        func_timeout(20, exec, args=(code_with_header_and_save, results))
    except FunctionTimedOut:
        print("Timeout")
        return None
    except Exception as e:
        # Best effort: a broken program must not take the whole batch down.
        print(e)
        return None
    return results.get('gif')
# Canned stand-in for a batch of generated programs: 16 copies of one LOGO
# snippet that draws seven squares of growing side length, each inside
# fork_state().  The snippet is executed as Python source, so the indentation
# inside the string is significant.
MOCK_RESPONSE = [
    """for i in range(7):
    with fork_state():
        for j in range(4):
            forward(2*i)
            left(90.0)
"""
] * 16
if __name__ == "__main__":
    # Manual smoke test: run every mock program in its own worker process and
    # collect the rendered results for inspection in the debugger.
    import hashlib
    import random
    from concurrent.futures import ProcessPoolExecutor

    codes = MOCK_RESPONSE

    # Short random tag so that separate runs use separate folder names.
    random_id = hashlib.md5(str(random.random()).encode()).hexdigest()[0:4]
    gradio_test_images_folder = "gradio_test_images"
    new_folder = os.path.join(gradio_test_images_folder, random_id)

    gif_results = []
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(run_code, new_folder, i, code) for i, code in enumerate(codes)]
        # Collect in submission order (not completion order) so that
        # gif_results[i] always belongs to codes[i].
        for future in futures:
            try:
                gif_results.append(future.result())
            except Exception as exc:
                print(f'Generated an exception: {exc}')
                # Keep the slot so later results stay aligned with codes.
                gif_results.append(None)

    print(random_id)
    folder_path = new_folder
    # Intentional stop: inspect gif_results, codes and folder_path here.
    breakpoint()