File size: 5,371 Bytes
05164e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import sys
from typing import List
import traceback
from transformers import AutoModelForCausalLM, AutoTokenizer
import json


# from flask import Flask, request, render_template
# from flask_cors import CORS
# app = Flask(__name__, static_folder='static')
# app.config['TEMPLATES_AUTO_RELOAD'] = True
# CORS(app, resources= {
#     r"/generate": {"origins": origins},
#     r"/infill": {"origins": origins},
# })
# origins=[f"http://localhost:{PORT}", "https://huggingface.co", "https://hf.space"]

# When True, the model is cast to half precision and moved to the GPU after
# loading, and input token ids are moved to the GPU before generation.
CUDA = True
# Port handed to app.run() in the __main__ guard at the bottom of the file.
PORT = 7860
# When True, prompts, retry counters and infill diagnostics are printed.
VERBOSE = False

from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
# ASGI application object; the route decorators further down register on it.
# docs_url/redoc_url are set to None (in FastAPI this turns off the
# auto-generated interactive documentation routes).
app = FastAPI(docs_url=None, redoc_url=None)
# Serve the files in the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")


# The model and tokenizer are loaded at import time, so importing this module
# blocks until both have been fetched/initialised. The prints mark progress.
print("loading model")
model = AutoModelForCausalLM.from_pretrained("facebook/incoder-6B")
print("loading tokenizer")
tokenizer = AutoTokenizer.from_pretrained("facebook/incoder-6B")
print("loading complete")

# Optional GPU placement: convert the weights to half precision, then move the
# model to the CUDA device. generate() moves its inputs the same way.
if CUDA:
    model = model.half().cuda()

# Marker that generate() strips from the start of the decoded output.
BOS = "<|endoftext|>"
# Marker that ends an infilled span; infill() truncates each completion at it.
EOM = "<|endofmask|>"

def make_sentinel(i):
    """Return the mask sentinel string for gap number ``i`` (``<|mask:i|>``)."""
    return "<|mask:{}|>".format(i)

# The 256 mask sentinels (ids 0..255) followed by the end-of-mask marker.
SPECIAL_TOKENS = list(map(make_sentinel, range(256)))
SPECIAL_TOKENS.append(EOM)

def generate(input, length_limit=None, temperature=None):
    """Sample a continuation of ``input`` from the module-level model.

    Args:
        input: Prompt text to tokenize and feed to the model.
        length_limit: Passed to ``model.generate`` as ``max_length``.
        temperature: Passed to ``model.generate`` as ``temperature``.

    Returns:
        The decoded output sequence as a string, with a leading ``BOS``
        marker removed when present.
    """
    encoded = tokenizer(input, return_tensors="pt")
    # Inputs must live on the same device the model was moved to.
    token_ids = encoded.input_ids.cuda() if CUDA else encoded.input_ids
    sampled = model.generate(
        input_ids=token_ids,
        do_sample=True,
        top_p=0.95,
        temperature=temperature,
        max_length=length_limit,
    )
    decoded = tokenizer.decode(sampled.flatten())
    # Callers slice the result by prompt length, so the BOS prefix has to go.
    return decoded[len(BOS):] if decoded.startswith(BOS) else decoded

def infill(parts: List[str], length_limit=None, temperature=None, extra_sentinel=False, max_retries=1):
    """Fill the gaps between consecutive ``parts`` with sampled text.

    A prompt is built from ``parts`` with ``make_sentinel(i)`` marking gap
    ``i``. Then, gap by gap, the sentinel is appended to the prompt again and
    ``generate`` is asked to continue it; the text preceding ``EOM`` in that
    continuation becomes the infill for the gap.

    Args:
        parts: Non-empty list of text segments. One gap is infilled between
            each adjacent pair; a single-element list has no gaps and comes
            back unchanged as ``text``.
        length_limit: Forwarded unchanged to ``generate``.
        temperature: Forwarded unchanged to ``generate``.
        extra_sentinel: When True, a sentinel is also placed after the last
            part while building the initial prompt.
        max_retries: Maximum number of complete attempts (at least 1). A new
            attempt is made while some gap's continuation lacked ``EOM``.

    Returns:
        A dict with keys ``'text'`` (parts and infills stitched together),
        ``'parts'`` (the input list), ``'infills'`` (one string per gap) and
        ``'retries_attempted'`` (number of attempts performed). The values
        come from the last attempt.

    Raises:
        TypeError: If ``parts`` is not a list.
        ValueError: If ``parts`` is empty or ``max_retries`` is below 1.
    """
    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    if not isinstance(parts, list):
        raise TypeError(f"parts must be a list, got {type(parts).__name__}")
    if not parts:
        raise ValueError("parts must contain at least one segment")
    # Without at least one attempt there would be no result to return.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    last_ix = len(parts) - 1
    retries_attempted = 0
    done = False

    while (not done) and (retries_attempted < max_retries):
        retries_attempted += 1
        if VERBOSE:
            print(f"retry {retries_attempted}")

        if len(parts) == 1:
            prompt = parts[0]
        else:
            # Interleave the parts with one sentinel per gap.
            pieces = []
            for sentinel_ix, part in enumerate(parts):
                pieces.append(part)
                if extra_sentinel or sentinel_ix < last_ix:
                    pieces.append(make_sentinel(sentinel_ix))
            prompt = "".join(pieces)

        infills = []
        complete = []

        # Assume success; cleared below if any gap is left unterminated.
        done = True

        for sentinel_ix, part in enumerate(parts[:-1]):
            complete.append(part)
            prompt += make_sentinel(sentinel_ix)
            completion = generate(prompt, length_limit, temperature)
            # generate() returns prompt + continuation; keep the continuation.
            completion = completion[len(prompt):]
            infilled, found_eom, _ = completion.partition(EOM)
            if not found_eom:
                if VERBOSE:
                    print(f"warning: {EOM} not found")
                # The whole continuation is kept as the infill for this
                # attempt, but the attempt is marked as needing a retry.
                done = False
            infills.append(infilled)
            complete.append(infilled)
            # Later gaps are conditioned on this infill, terminated by EOM.
            prompt += infilled + EOM
        complete.append(parts[-1])
        text = "".join(complete)

    if VERBOSE:
        print("generated text:")
        print(prompt)
        print()
        print("parts:")
        print(parts)
        print()
        print("infills:")
        print(infills)
        print()
        print("restitched text:")
        print(text)
        print()

    return {
        'text': text,
        'parts': parts,
        'infills': infills,
        'retries_attempted': retries_attempted,
    }


@app.head("/")
@app.get("/")
def index() -> FileResponse:
    """Answer GET and HEAD on ``/`` with ``static/index.html`` as HTML."""
    page = FileResponse(path="static/index.html", media_type="text/html")
    return page

@app.get('/generate')
async def generate_maybe(info: str):
    """Handle ``GET /generate``: sample a continuation for a prompt.

    Args:
        info: JSON-encoded object with the keys ``'prompt'`` (text),
            ``'length'`` (converted with ``int``) and ``'temperature'``
            (converted with ``float``).

    Returns:
        On success, a dict with ``'result': 'success'``, ``'type'``,
        ``'prompt'`` and the generated ``'text'``. On any failure, including
        a malformed ``info`` payload, a dict with ``'result': 'error'`` whose
        ``'text'`` carries the error message; ``'prompt'`` is ``None`` when
        the failure happened before the prompt could be read.
    """
    # Defined up front so the error payload can be built even when parsing
    # the request fails.
    prompt = None
    try:
        # Parsing lives inside the try so bad requests produce the same
        # structured error response as generation failures.
        form = json.loads(info)
        prompt = form['prompt']
        length_limit = int(form['length'])
        temperature = float(form['temperature'])
        if VERBOSE:
            print(prompt)
        generation = generate(prompt, length_limit, temperature)
        return {'result': 'success', 'type': 'generate', 'prompt': prompt, 'text': generation}
    except Exception as e:
        # Top-level request boundary: log the traceback and report the error
        # to the client instead of propagating it.
        traceback.print_exception(*sys.exc_info())
        return {'result': 'error', 'type': 'generate', 'prompt': prompt, 'text': f'There was an error: {e}. Tell Daniel.'}

@app.get('/infill')
async def infill_maybe(info: str):
    """Handle ``GET /infill``: fill the gaps between the supplied parts.

    Args:
        info: JSON-encoded object with the keys ``'parts'`` (passed to
            ``infill``), ``'length'`` (converted with ``int``) and
            ``'temperature'`` (converted with ``float``).

    Returns:
        On success, the dict returned by ``infill`` extended with
        ``'result': 'success'`` and ``'type': 'infill'``. On any failure,
        including a malformed ``info`` payload, a dict with
        ``'result': 'error'`` whose ``'text'`` carries the error message.
    """
    max_retries = 1
    extra_sentinel = True
    try:
        # Parsing lives inside the try so bad requests produce the same
        # structured error response as infilling failures.
        form = json.loads(info)
        length_limit = int(form['length'])
        temperature = float(form['temperature'])
        generation = infill(form['parts'], length_limit, temperature, extra_sentinel=extra_sentinel, max_retries=max_retries)
        generation['result'] = 'success'
        generation['type'] = 'infill'
        return generation
    except Exception as e:
        # Top-level request boundary: log the failure and report it to the
        # client instead of propagating it.
        traceback.print_exception(*sys.exc_info())
        print(e)
        return {'result': 'error', 'type': 'infill', 'text': f'There was an error: {e}.'}


# Script entry point: start the server on all interfaces at PORT.
# NOTE(review): `app` is a FastAPI instance here, while the
# `run(host=..., port=..., threaded=...)` call matches the Flask API from the
# commented-out setup near the top of the file. Confirm this call works;
# FastAPI apps are normally served by an ASGI server such as uvicorn
# (`uvicorn.run(app, host=..., port=...)`), which this file does not import.
if __name__ == "__main__":
    app.run(host='0.0.0.0', port=PORT, threaded=False)