# dsp/modules/hf_client.py
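"""Client wrappers for serving hosted language models to DSP.

Defines thin HTTP clients for Text Generation Inference (HFClientTGI), vLLM
(HFClientVLLM), SGLang (HFClientSGLang), the Together and Anyscale APIs, a local
MLC ChatModule client, and a helper (HFServerTGI) for launching TGI docker
containers. Each client implements ``_generate(prompt, **kwargs)`` and returns a
dict of the form ``{"prompt": ..., "choices": [{"text": ...}, ...]}``.
"""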
import functools
import os
import random
import requests
from dsp.modules.hf import HFModel, openai_to_hf
from dsp.modules.cache_utils import CacheMemory, NotebookCacheMemory, cache_turn_on
import subprocess
import re
import shutil
import time
# from dsp.modules.adapter import TurboAdapter, DavinciAdapter, LlamaAdapter
import backoff
# Exception types that trigger an exponential-backoff retry (see Together._generate).
ERRORS = (Exception,)
def backoff_hdlr(details):
"""Handler from https://pypi.org/project/backoff/"""
print(
"Backing off {wait:0.1f} seconds after {tries} tries "
"calling function {target} with kwargs "
"{kwargs}".format(**details)
)
class HFClientTGI(HFModel):
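    """Client for a HuggingFace Text Generation Inference (TGI) server.

    Requests go to ``{url}:{port}/generate``; when a list of ports is given,
    one is chosen at random for each request.
    """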
def __init__(self, model, port, url="http://future-hgx-1", http_request_kwargs=None, **kwargs):
super().__init__(model=model, is_client=True)
self.url = url
self.ports = port if isinstance(port, list) else [port]
self.http_request_kwargs = http_request_kwargs or {}
self.headers = {"Content-Type": "application/json"}
self.kwargs = {
"temperature": 0.01,
"max_tokens": 75,
"top_p": 0.97,
"n": 1,
"stop": ["\n", "\n\n"],
**kwargs,
}
# print(self.kwargs)
def _generate(self, prompt, **kwargs):
kwargs = {**self.kwargs, **kwargs}
payload = {
"inputs": prompt,
"parameters": {
"do_sample": kwargs["n"] > 1,
"best_of": kwargs["n"],
"details": kwargs["n"] > 1,
# "max_new_tokens": kwargs.get('max_tokens', kwargs.get('max_new_tokens', 75)),
# "stop": ["\n", "\n\n"],
**kwargs,
}
}
payload["parameters"] = openai_to_hf(**payload["parameters"])
payload["parameters"]["temperature"] = max(
0.1, payload["parameters"]["temperature"]
)
# print(payload['parameters'])
# response = requests.post(self.url + "/generate", json=payload, headers=self.headers)
response = send_hftgi_request_v01_wrapped(
f"{self.url}:{random.Random().choice(self.ports)}" + "/generate",
url=self.url,
ports=tuple(self.ports),
json=payload,
headers=self.headers,
**self.http_request_kwargs,
)
try:
json_response = response.json()
# completions = json_response["generated_text"]
completions = [json_response["generated_text"]]
if (
"details" in json_response
and "best_of_sequences" in json_response["details"]
):
completions += [
x["generated_text"]
for x in json_response["details"]["best_of_sequences"]
]
response = {"prompt": prompt, "choices": [{"text": c} for c in completions]}
return response
except Exception as e:
print("Failed to parse JSON response:", response.text)
raise Exception("Received invalid JSON response from server")
@CacheMemory.cache(ignore=['arg'])
def send_hftgi_request_v01(arg, url, ports, **kwargs):
return requests.post(arg, **kwargs)
# @functools.lru_cache(maxsize=None if cache_turn_on else 0)
@NotebookCacheMemory.cache(ignore=['arg'])
def send_hftgi_request_v01_wrapped(arg, url, ports, **kwargs):
return send_hftgi_request_v01(arg, url, ports, **kwargs)
@CacheMemory.cache
def send_hftgi_request_v00(arg, **kwargs):
return requests.post(arg, **kwargs)
class HFClientVLLM(HFModel):
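    """Client for a vLLM server exposing the OpenAI-compatible ``/v1/completions`` endpoint."""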
def __init__(self, model, port, url="http://localhost", **kwargs):
super().__init__(model=model, is_client=True)
self.url = f"{url}:{port}"
self.headers = {"Content-Type": "application/json"}
def _generate(self, prompt, **kwargs):
kwargs = {**self.kwargs, **kwargs}
payload = {
"model": kwargs["model"],
"prompt": prompt,
"max_tokens": kwargs["max_tokens"],
"temperature": kwargs["temperature"],
}
response = send_hfvllm_request_v00(
f"{self.url}/v1/completions",
json=payload,
headers=self.headers,
)
try:
json_response = response.json()
completions = json_response["choices"]
response = {
"prompt": prompt,
"choices": [{"text": c["text"]} for c in completions],
}
return response
except Exception as e:
print("Failed to parse JSON response:", response.text)
raise Exception("Received invalid JSON response from server")
@CacheMemory.cache
def send_hfvllm_request_v00(arg, **kwargs):
return requests.post(arg, **kwargs)
class HFServerTGI:
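    """Helper for launching and stopping text-generation-inference docker containers."""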
def __init__(self, user_dir):
self.model_weights_dir = os.path.abspath(os.path.join(os.getcwd(), "text-generation-inference", user_dir))
if not os.path.exists(self.model_weights_dir):
os.makedirs(self.model_weights_dir)
def close_server(self, port):
process = subprocess.Popen(['docker', 'ps'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, _ = process.communicate()
        print(stdout.decode())
if stdout:
container_ids = stdout.decode().strip().split('\n')
container_ids = container_ids[1:]
for container_id in container_ids:
match = re.search(r'^([a-zA-Z0-9]+)', container_id)
if match:
container_id = match.group(1)
port_mapping = subprocess.check_output(['docker', 'port', container_id]).decode().strip()
if f'0.0.0.0:{port}' in port_mapping:
subprocess.run(['docker', 'stop', container_id])
    def run_server(self, port, model_name=None, model_path=None, env_variable=None, gpus="all",
                   num_shard=1, max_input_length=4000, max_total_tokens=4096, max_best_of=100):
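        """Start a text-generation-inference container on ``port``.

        If ``model_path`` is given, the weights are first copied into the directory
        mounted into the container. Blocks until the server output contains
        'Connected' or the docker process exits.
        """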
self.close_server(port)
        if model_path:
            model_file_name = os.path.basename(model_path)
            link_path = os.path.join(self.model_weights_dir, model_file_name)
            # Copy the weights into the directory mounted into the container; allow
            # re-running with an already-copied model (dirs_exist_ok needs Python 3.8+).
            shutil.copytree(model_path, link_path, dirs_exist_ok=True)
            model_name = (
                os.path.sep + os.path.basename(self.model_weights_dir)
                + os.path.sep + os.path.basename(model_path)
            )
        docker_command = (
            f'docker run --gpus {gpus} --shm-size 1g -p {port}:80 '
            f'-v {self.model_weights_dir}:{os.path.sep + os.path.basename(self.model_weights_dir)} -e {env_variable} '
            f'ghcr.io/huggingface/text-generation-inference:1.1.0 --model-id {model_name} --num-shard {num_shard} '
            f'--max-input-length {max_input_length} --max-total-tokens {max_total_tokens} --max-best-of {max_best_of}'
        )
print(f"Connect Command: {docker_command}")
docker_process = subprocess.Popen(docker_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
connected = False
output = []
while True:
line = docker_process.stdout.readline()
if not line:
break
output.append(line.strip())
if 'Connected' in line:
connected = True
break
        if not connected:
            print("Could not connect to server. Error log:")
            for line in output:
                print(line)
            # The server never reported a successful start; shut the docker client process down.
            docker_process.terminate()
            docker_process.wait()
class Together(HFModel):
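    """Client for the Together AI inference API.

    Reads the endpoint and credentials from the ``TOGETHER_API_BASE`` and
    ``TOGETHER_API_KEY`` environment variables.
    """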
def __init__(self, model, **kwargs):
super().__init__(model=model, is_client=True)
self.session = requests.Session()
self.api_base = os.getenv("TOGETHER_API_BASE")
self.token = os.getenv("TOGETHER_API_KEY")
self.model = model
self.use_inst_template = False
if any(keyword in self.model.lower() for keyword in ["inst", "instruct"]):
self.use_inst_template = True
stop_default = "\n\n---"
self.kwargs = {
"temperature": 0.0,
"max_tokens": 512,
"top_p": 1,
"top_k": 20,
"repetition_penalty": 1,
"n": 1,
"stop": stop_default if "stop" not in kwargs else kwargs["stop"],
**kwargs
}
@backoff.on_exception(
backoff.expo,
ERRORS,
max_time=1000,
on_backoff=backoff_hdlr,
)
def _generate(self, prompt, use_chat_api=False, **kwargs):
url = f"{self.api_base}"
kwargs = {**self.kwargs, **kwargs}
stop = kwargs.get("stop")
temperature = kwargs.get("temperature")
max_tokens = kwargs.get("max_tokens", 150)
top_p = kwargs.get("top_p", 0.7)
top_k = kwargs.get("top_k", 50)
repetition_penalty = kwargs.get("repetition_penalty", 1)
prompt = f"[INST]{prompt}[/INST]" if self.use_inst_template else prompt
if use_chat_api:
url = f"{self.api_base}/chat/completions"
messages = [
{"role": "system", "content": "You are a helpful assistant. You must continue the user text directly without *any* additional interjections."},
{"role": "user", "content": prompt}
]
body = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
"top_k": top_k,
"repetition_penalty": repetition_penalty,
"stop": stop,
}
else:
body = {
"model": self.model,
"prompt": prompt,
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
"top_k": top_k,
"repetition_penalty": repetition_penalty,
"stop": stop,
}
headers = {"Authorization": f"Bearer {self.token}"}
        resp_json = None
        try:
with self.session.post(url, headers=headers, json=body) as resp:
resp_json = resp.json()
if use_chat_api:
completions = [resp_json['output'].get('choices', [])[0].get('message', {}).get('content', "")]
else:
completions = [resp_json['output'].get('choices', [])[0].get('text', "")]
response = {"prompt": prompt, "choices": [{"text": c} for c in completions]}
return response
except Exception as e:
if resp_json:
print(f"resp_json:{resp_json}")
print(f"Failed to parse JSON response: {e}")
raise Exception("Received invalid JSON response from server")
class Anyscale(HFModel):
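    """Client for the Anyscale Endpoints API (``ANYSCALE_API_BASE`` / ``ANYSCALE_API_KEY``)."""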
def __init__(self, model, **kwargs):
super().__init__(model=model, is_client=True)
self.session = requests.Session()
self.api_base = os.getenv("ANYSCALE_API_BASE")
self.token = os.getenv("ANYSCALE_API_KEY")
self.model = model
self.kwargs = {
"temperature": 0.0,
"n": 1,
**kwargs
}
def _generate(self, prompt, use_chat_api=False, **kwargs):
url = f"{self.api_base}/completions"
kwargs = {**self.kwargs, **kwargs}
temperature = kwargs.get("temperature")
max_tokens = kwargs.get("max_tokens", 150)
if use_chat_api:
url = f"{self.api_base}/chat/completions"
messages = [
{"role": "system", "content": "You are a helpful assistant. You must continue the user text directly without *any* additional interjections."},
{"role": "user", "content": prompt}
]
body = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
else:
body = {
"model": self.model,
"prompt": f"[INST]{prompt}[/INST]",
"temperature": temperature,
"max_tokens": max_tokens
}
headers = {"Authorization": f"Bearer {self.token}"}
try:
completions = []
for i in range(kwargs.get('n', 1)):
with self.session.post(url, headers=headers, json=body) as resp:
resp_json = resp.json()
if use_chat_api:
completions.extend([resp_json.get('choices', [])[0].get('message', {}).get('content', "")])
else:
completions.extend([resp_json.get('choices', [])[0].get('text', "")])
response = {"prompt": prompt, "choices": [{"text": c} for c in completions]}
return response
except Exception as e:
print(f"Failed to parse JSON response: {e}")
raise Exception("Received invalid JSON response from server")
class ChatModuleClient(HFModel):
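    """Local client that generates through an MLC ``ChatModule`` rather than an HTTP server."""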
def __init__(self, model, model_path):
super().__init__(model=model, is_client=True)
from mlc_chat import ChatModule
from mlc_chat import ChatConfig
self.cm = ChatModule(
model=model, lib_path=model_path, chat_config=ChatConfig(conv_template="LM")
)
def _generate(self, prompt, **kwargs):
output = self.cm.generate(
prompt=prompt,
)
try:
completions = [{"text": output}]
response = {"prompt": prompt, "choices": completions}
return response
except Exception as e:
print("Failed to parse output:", response.text)
raise Exception("Received invalid output")
class HFClientSGLang(HFModel):
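    """Client for an SGLang server exposing the OpenAI-compatible ``/v1/completions`` endpoint."""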
def __init__(self, model, port, url="http://localhost", **kwargs):
super().__init__(model=model, is_client=True)
self.url = f"{url}:{port}"
self.headers = {"Content-Type": "application/json"}
self.kwargs = {
"temperature": 0.01,
"max_tokens": 75,
"top_p": 0.97,
"n": 1,
"stop": ["\n", "\n\n"],
**kwargs,
}
def _generate(self, prompt, **kwargs):
kwargs = {**self.kwargs, **kwargs}
payload = {
"model": kwargs.get("model", "default"),
"prompt": prompt,
**kwargs,
}
response = send_hfsglang_request_v00(
f"{self.url}/v1/completions",
json=payload,
headers=self.headers,
)
try:
json_response = response.json()
completions = json_response["choices"]
response = {
"prompt": prompt,
"choices": [{"text": c["text"]} for c in completions],
}
return response
except Exception as e:
print("Failed to parse JSON response:", response.text)
raise Exception("Received invalid JSON response from server")
@CacheMemory.cache
def send_hfsglang_request_v00(arg, **kwargs):
return requests.post(arg, **kwargs)
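
# Usage sketch (illustrative only; the host, port, and model name below are
# placeholders -- adjust them to your deployment, and note that a server must
# already be running at that address):
#
#   lm = HFClientTGI(model="meta-llama/Llama-2-7b-hf", port=8080, url="http://localhost")
#   result = lm._generate("The capital of France is", max_tokens=16)
#   print(result["choices"][0]["text"])
#
# Every client here returns the same ``{"prompt": ..., "choices": [...]}`` shape
# from ``_generate``, so the access pattern above also applies to HFClientVLLM,
# HFClientSGLang, Together, and Anyscale.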