Spaces:
Sleeping
Sleeping
File size: 5,371 Bytes
05164e0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import sys
from typing import List
import traceback
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
# from flask import Flask, request, render_template
# from flask_cors import CORS
# app = Flask(__name__, static_folder='static')
# app.config['TEMPLATES_AUTO_RELOAD'] = True
# CORS(app, resources= {
# r"/generate": {"origins": origins},
# r"/infill": {"origins": origins},
# })
# origins=[f"http://localhost:{PORT}", "https://huggingface.co", "https://hf.space"]
# Module-wide switches.
# CUDA: when true, the model is converted to half precision and moved to the GPU
# after loading, and generate() moves the input ids to the GPU as well.
CUDA = True
# PORT: port number handed to app.run() in the __main__ guard at the bottom of the file.
PORT = 7860
# VERBOSE: when true, infill() and the /generate handler print debugging output.
VERBOSE = False
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
# The web application object; both documentation URLs are set to None.
app = FastAPI(docs_url=None, redoc_url=None)
# Expose the local "static" directory under the "/static" URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# Load the "facebook/incoder-6B" model and its tokenizer once, at import time,
# printing a progress line before and after each step.
print("loading model")
model = AutoModelForCausalLM.from_pretrained("facebook/incoder-6B")
print("loading tokenizer")
tokenizer = AutoTokenizer.from_pretrained("facebook/incoder-6B")
print("loading complete")
if CUDA:
    # Convert the weights to half precision and move the model to the GPU.
    model = model.half().cuda()
# Marker that generate() strips from the start of the decoded model output.
BOS = "<|endoftext|>"
# Marker that ends one infilled span; infill() cuts each completion at its first occurrence.
EOM = "<|endofmask|>"
def make_sentinel(i):
    """Return the mask sentinel string for index ``i``, e.g. ``<|mask:0|>``."""
    return "<|mask:{}|>".format(i)
# The mask sentinels for indices 0..255 followed by the end-of-mask marker.
# NOTE(review): not referenced anywhere else in the visible code.
SPECIAL_TOKENS = [make_sentinel(i) for i in range(256)] + [EOM]
def generate(input, length_limit=None, temperature=None):
    """Sample text from the model for the prompt ``input`` and return it decoded.

    Args:
        input: Prompt text handed to the tokenizer.
        length_limit: Passed to ``model.generate`` as ``max_length``.
        temperature: Passed to ``model.generate`` as ``temperature``.

    Returns:
        The decoded output sequence with a leading ``BOS`` marker removed when
        present. Presumably this contains the prompt followed by the sampled
        continuation (``infill`` slices the prompt off the front) -- confirm.
    """
    token_ids = tokenizer(input, return_tensors="pt").input_ids
    token_ids = token_ids.cuda() if CUDA else token_ids
    sampled = model.generate(
        input_ids=token_ids,
        do_sample=True,
        top_p=0.95,
        temperature=temperature,
        max_length=length_limit,
    )
    decoded = tokenizer.decode(sampled.flatten())
    # Only a BOS marker at the very start of the text is dropped.
    return decoded[len(BOS):] if decoded.startswith(BOS) else decoded
def infill(parts: List[str], length_limit=None, temperature=None, extra_sentinel=False, max_retries=1):
    """Generate text for the gaps between consecutive entries of ``parts``.

    A prompt is assembled from ``parts`` with a numbered sentinel
    (``make_sentinel(i)``) after each part. Then, for every gap, the gap's
    sentinel is appended to the prompt and ``generate`` is asked to continue
    it; the continuation is cut at the first ``EOM`` marker and recorded as
    that gap's infill. The whole procedure is repeated, up to ``max_retries``
    attempts in total, for as long as some continuation lacked an ``EOM``
    marker.

    Args:
        parts: Non-empty list of strings; gap ``i`` lies between ``parts[i]``
            and ``parts[i + 1]``.
        length_limit: Forwarded unchanged to ``generate``.
        temperature: Forwarded unchanged to ``generate``.
        extra_sentinel: When true (and there is more than one part), a
            sentinel is also appended after the last part of the initial
            prompt.
        max_retries: Maximum number of attempts; must be at least 1.

    Returns:
        A dict with keys ``'text'`` (the parts interleaved with the infills of
        the last attempt), ``'parts'``, ``'infills'`` and
        ``'retries_attempted'`` (number of attempts made, including the first).

    Raises:
        TypeError: If ``parts`` is not a list.
        ValueError: If ``parts`` is empty or ``max_retries`` is below 1.
    """
    # Explicit checks instead of `assert`: asserts vanish under `python -O`,
    # and without an attempt the result variables below would be unbound.
    if not isinstance(parts, list):
        raise TypeError(f"parts must be a list, got {type(parts).__name__}")
    if not parts:
        raise ValueError("parts must contain at least one string")
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    retries_attempted = 0
    done = False

    while (not done) and (retries_attempted < max_retries):
        retries_attempted += 1
        if VERBOSE:
            print(f"retry {retries_attempted}")

        if len(parts) == 1:
            prompt = parts[0]
        else:
            # Encode the parts separated by sentinels; the last part only gets
            # a trailing sentinel when extra_sentinel is set.
            last_ix = len(parts) - 1
            prompt = "".join(
                (part + make_sentinel(ix)) if (extra_sentinel or ix < last_ix) else part
                for ix, part in enumerate(parts)
            )

        infills = []
        complete = []
        done = True

        for sentinel_ix, part in enumerate(parts[:-1]):
            complete.append(part)
            prompt += make_sentinel(sentinel_ix)
            completion = generate(prompt, length_limit, temperature)
            # Assumes generate() returns the prompt verbatim followed by the
            # continuation -- TODO confirm; only the continuation is kept.
            completion = completion[len(prompt):]
            if EOM not in completion:
                if VERBOSE:
                    print(f"warning: {EOM} not found")
                # TODO: break inner loop here
                done = False
            # Everything before the first EOM is the infill; when EOM is
            # missing the whole continuation is used.
            infilled = completion.partition(EOM)[0]
            infills.append(infilled)
            complete.append(infilled)
            # The running prompt always carries the infill terminated by EOM.
            prompt += infilled + EOM

        complete.append(parts[-1])
        text = ''.join(complete)

    if VERBOSE:
        print("generated text:")
        print(prompt)
        print()
        print("parts:")
        print(parts)
        print()
        print("infills:")
        print(infills)
        print()
        print("restitched text:")
        print(text)
        print()

    return {
        'text': text,
        'parts': parts,
        'infills': infills,
        'retries_attempted': retries_attempted,
    }
@app.head("/")
@app.get("/")
def index() -> FileResponse:
    """Answer GET and HEAD requests for "/" with static/index.html served as HTML."""
    landing_page = "static/index.html"
    return FileResponse(path=landing_page, media_type="text/html")
@app.get('/generate')
async def generate_maybe(info: str):
    """Handle GET /generate: run plain left-to-right generation for a prompt.

    Args:
        info: JSON-encoded object with the keys ``prompt``, ``length`` and
            ``temperature``.

    Returns:
        A dict with ``result`` (``'success'`` or ``'error'``), ``type``
        (``'generate'``), the ``prompt`` and ``text`` (the generation, or an
        error message when ``generate`` raised).
    """
    # Parsing happens outside the try block, so malformed input is not turned
    # into an error payload here.
    payload = json.loads(info)
    prompt = payload['prompt']
    length_limit = int(payload['length'])
    temperature = float(payload['temperature'])
    if VERBOSE:
        print(prompt)
    try:
        generated = generate(prompt, length_limit, temperature)
    except Exception as err:
        traceback.print_exc()
        return {'result': 'error', 'type': 'generate', 'prompt': prompt, 'text': f'There was an error: {err}. Tell Daniel.'}
    return {'result': 'success', 'type': 'generate', 'prompt': prompt, 'text': generated}
@app.get('/infill')
async def infill_maybe(info: str):
    """Handle GET /infill: fill the gaps between the submitted text parts.

    Args:
        info: JSON-encoded object with the keys ``parts``, ``length`` and
            ``temperature``.

    Returns:
        On success, the dict produced by ``infill`` extended with
        ``result='success'`` and ``type='infill'``. If anything inside the
        guarded section raises, a dict with ``result='error'``,
        ``type='infill'`` and an error message under ``text``.
    """
    payload = json.loads(info)
    length_limit = int(payload['length'])
    temperature = float(payload['temperature'])
    try:
        # A single attempt, with a sentinel after the final part as well.
        outcome = infill(
            payload['parts'],
            length_limit,
            temperature,
            extra_sentinel=True,
            max_retries=1,
        )
        outcome.update(result='success', type='infill')
        return outcome
    except Exception as err:
        traceback.print_exc()
        print(err)
        return {'result': 'error', 'type': 'infill', 'text': f'There was an error: {err}.'}
if __name__ == "__main__":
    # NOTE(review): `app` is a FastAPI instance, while `run(host, port, threaded=...)`
    # is the Flask-style launcher that matches the commented-out Flask setup near the
    # top of the file. FastAPI does not document a `run` method, so this call looks
    # like a leftover -- confirm how the module is actually served (normally an ASGI
    # server imports `app`) and replace it accordingly.
    app.run(host='0.0.0.0', port=PORT, threaded=False)
|