"""
This file contains all the code which defines architectures and
architecture components.
"""
import json
import os
import shutil
import traceback
from abc import ABC, abstractmethod
from enum import Enum
from queue import Queue
from threading import Thread, Timer
from time import time
from typing import List, Optional, Dict, Callable

import chromadb
import regex as re
import requests
from better_profanity import profanity
from huggingface_hub import Repository

from src.common import config_dir, data_dir, hf_api_token, escape_dollars
class ArchitectureRequest:
    """
    This class represents a request (chat query) from a user which can then be built up or
    modified through the pipeline process. It also holds the response to the request, which is
    likewise a stack that can be modified as it moves down the pipeline.
    """

    def __init__(self, query: str):
        self._request: List[str] = [query]  # Stack for the request text as it evolves down the pipeline
        self._response: List[str] = []  # Stack for the response text as it evolves down the pipeline
        self.early_exit: bool = False
        self.early_exit_message: Optional[str] = None

    @property
    def request(self) -> str:
        """
        The latest version of the request text (top of the request stack)
        """
        return self._request[-1]

    @request.setter
    def request(self, value: str) -> None:
        # Assigning pushes a new version rather than overwriting, so the history is retained
        self._request.append(value)

    @property
    def response(self) -> Optional[str]:
        """
        The latest version of the response text, or None if no response has been set yet
        """
        return self._response[-1] if self._response else None

    @response.setter
    def response(self, value: str) -> None:
        # Assigning pushes a new version rather than overwriting, so the history is retained
        self._response.append(value)

    def as_markdown(self) -> str:
        """
        Returns a markdown representation for display / testing
        :return: str - the markdown
        """
        lines = ["- **Request evolution**"]
        lines.extend(f" - {r}" for r in self._request)
        lines.append("- **Response evolution**")
        lines.extend(f" - {r}" for r in self._response)
        return escape_dollars("\n".join(lines))

    def as_dict(self) -> Dict:
        """
        Returns the full request and response history as a dictionary
        :return: dict with 'request_evolution' and 'response_evolution' lists
        """
        return {'request_evolution': self._request, 'response_evolution': self._response}
class ArchitectureTraceOutcome(Enum):
    """
    Class representing the outcome of a component step in an architecture
    """
    NONE = 0  # Initial value of a trace step before it has been ended
    SUCCESS = 1
    EARLY_EXIT = 2
    EXCEPTION = 3
class ArchitectureTraceStep:
    """
    Class to hold the details of a single trace step
    """

    def __init__(self, name: str):
        self.name = name
        self.start_ms = int(time() * 1000)
        self.end_ms: Optional[int] = None  # Stays None until end() is called
        self.outcome = ArchitectureTraceOutcome.NONE
        self._exception: Optional[str] = None
        self.early_exit_message: Optional[str] = None

    def end(self, outcome: ArchitectureTraceOutcome) -> None:
        """
        Marks the step as finished, stamping the end time and recording the outcome
        :param outcome: the outcome of the step
        """
        self.end_ms = int(time() * 1000)
        self.outcome = outcome

    @property
    def exception(self) -> Optional[str]:
        """
        The string form of any exception recorded for this step, or None
        """
        return self._exception

    @exception.setter
    def exception(self, value: Exception) -> None:
        self._exception = f'{value}'  # Hold any exception as a string in the trace

    def as_markdown(self) -> str:
        """
        Converts the trace to markdown for simple display purposes
        :return: a string of markdown
        """
        # A step which has not been ended has no end time, so avoid subtracting from None
        if self.end_ms is None:
            elapsed = "n/a"
        else:
            elapsed = f"{self.end_ms - self.start_ms}ms"

        outcome_text = {
            ArchitectureTraceOutcome.SUCCESS: "Success",
            ArchitectureTraceOutcome.EARLY_EXIT: f"Early Exit ({self.early_exit_message})",
            ArchitectureTraceOutcome.EXCEPTION: f"Exception ({self._exception})",
        }.get(self.outcome, "None")

        md = f"- **Step**: {self.name} \n"
        md += f" - **Start**: {self.start_ms}; **End**: {self.end_ms} \n"
        md += f" - **Elapsed time**: {elapsed} \n"
        md += f" - **Outcome**: {outcome_text}"
        return escape_dollars(md)

    def as_dict(self) -> Dict:
        """
        Returns the step as a dictionary, with None messages rendered as empty strings
        :return: dict representation of the step
        """
        return {
            'name': self.name,
            'start_ms': self.start_ms,
            'end_ms': self.end_ms,
            'outcome': str(self.outcome),
            'exception': "" if self._exception is None else f"{self._exception}",
            'early_exit_message': "" if self.early_exit_message is None else self.early_exit_message
        }
class ArchitectureTrace:
    """
    This class represents the system instrumentation / trace for a request. It holds the name
    for each component called, the start and end time of the component processing and the outcome
    of the step.
    """

    def __init__(self):
        self.steps: List[ArchitectureTraceStep] = []

    def start_trace(self, name: str) -> None:
        """
        Opens a new trace step, which becomes the current (last) step
        :param name: the name to record for the step
        """
        self.steps.append(ArchitectureTraceStep(name=name))

    def end_trace(self, outcome: ArchitectureTraceOutcome, early_exit_message: Optional[str] = None) -> None:
        """
        Closes the current (last) trace step with the given outcome
        :param outcome: the outcome to record on the step
        :param early_exit_message: optional message to record if the step exited early
        """
        assert len(self.steps) > 0, "end_trace called with no open trace step"
        current_step = self.steps[-1]
        assert current_step.outcome == ArchitectureTraceOutcome.NONE, "trace step has already been ended"
        current_step.end(outcome=outcome)
        if early_exit_message is not None:
            current_step.early_exit_message = early_exit_message

    def as_markdown(self) -> str:
        """
        Converts the trace to markdown for simple display purposes
        :return: a string of markdown
        """
        return ' \n'.join(s.as_markdown() for s in self.steps)

    def as_dict(self) -> Dict:
        """
        Returns the trace as a dictionary holding the list of step dictionaries
        :return: dict with a single 'steps' key
        """
        return {'steps': [s.as_dict() for s in self.steps]}
class ArchitectureComponent(ABC):
    """
    Abstract base class for a single step in an architecture pipeline.
    """
    description = "Components should override a description"

    @abstractmethod
    def process_request(self, request: ArchitectureRequest) -> None:
        """
        The principal method that concrete implementations of a component must implement.
        They should signal anything to the pipeline through direct modification of the provided
        request (i.e. amending the request text or response text, or setting the early_exit flag).
        :param request: The request which is flowing down the pipeline
        :return: None
        """
        pass

    def config_description(self) -> str:
        """
        Optional method to override for providing a string of description in markdown format for
        display purposes for the component
        :return: a markdown string (defaulting to empty in the base class)
        """
        return ""
class LogWorker(Thread):
    """
    Background worker thread which persists request / trace records. Records are handed over
    through a class level queue, appended to a JSON trace file inside a local clone of the
    save repo, and pushed to the hub either after a number of records or after a period of
    inactivity. All state is held at class level.
    """
    instance = None
    architectures = None
    save_repo = None
    save_repo_load_error = False
    save_repo_url = "https://huggingface.co/datasets/alfraser/llm-arch-trace"
    trace_dir = "trace"
    trace_file_name = "trace.json"
    trace_file = os.path.join(trace_dir, trace_file_name)
    queue = Queue()
    commit_time = 5  # Number of seconds after which to commit with no activity
    commit_after = 20  # Number of records after which to commit irrespective of time
    commit_count = 0  # Current uncommitted records
    commit_timer = None  # The actual commit timer - we will schedule the commit on this
    timeout_functions: List[Callable[[], None]] = []  # Callbacks which will be fired on timeout

    def run(self):
        """
        Thread main loop. Blocks on the queue forever; a tuple with a None request is the
        timeout signal (see signal_commit), anything else is a record to save.
        """
        while True:
            arch_name, request, trace, trace_tags, trace_comment = LogWorker.queue.get()
            if request is None:
                # Timeout signal - fire each registered callback, isolating failures
                for func in LogWorker.timeout_functions:
                    print(f"LogWorker commit running {func.__name__}")
                    try:
                        func()
                    except Exception as e:
                        print(f"Timeout func {func.__name__} had error {e}")
            else:
                # New activity - cancel any pending inactivity commit, it is rescheduled below
                if LogWorker.commit_timer is not None and LogWorker.commit_timer.is_alive():
                    LogWorker.commit_timer.cancel()
                    LogWorker.commit_timer = None
                try:
                    save_dict = {
                        'architecture': arch_name,
                        'request': request.as_dict(),
                        'trace': trace.as_dict(),
                        'test_tags': trace_tags,
                        'test_comment': trace_comment
                    }
                    # Only count records which were really written, so that a disabled
                    # save repo never leaves phantom uncommitted records behind
                    if LogWorker.append_and_save_data_as_json(save_dict):
                        LogWorker.commit_count += 1
                        if LogWorker.commit_count >= LogWorker.commit_after:
                            LogWorker.commit_repo()
                except Exception as err:
                    print(f"Request / trace save failed {err}")
                LogWorker.commit_timer = Timer(LogWorker.commit_time, LogWorker.signal_commit)
                LogWorker.commit_timer.start()

    @classmethod
    def append_and_save_data_as_json(cls, data: Dict) -> bool:
        """
        Appends a record to the trace file, cloning the save repo first if not yet done.
        A failed clone disables persistence for the rest of the process.
        :param data: the record to append to the 'tests' list of the trace file
        :return: True if the record was written to the trace file, False if persistence is disabled
        """
        print(f"LogWorker logging open record {LogWorker.commit_count + 1}")
        if cls.save_repo is None and not cls.save_repo_load_error:
            try:
                hf_write_token = hf_api_token(write=True)
                cls.save_repo = Repository(local_dir=cls.trace_dir, clone_from=cls.save_repo_url, token=hf_write_token)
            except Exception as err:
                cls.save_repo_load_error = True
                print(f"Error connecting to the save repo {err} - persistence now disabled")
        if cls.save_repo is None:
            return False
        with open(cls.trace_file, 'r', encoding='utf-8') as f:
            test_json = json.load(f)
        test_json['tests'].append(data)
        with open(cls.trace_file, 'w', encoding='utf-8') as f:
            json.dump(test_json, f, indent=2)
        return True

    @classmethod
    def commit_repo(cls):
        """
        Pushes the local trace repo to the hub if there are uncommitted records, then
        resets the uncommitted record count.
        """
        if cls.commit_count <= 0:
            return
        if cls.save_repo is None:
            # Nothing can be pushed without a repo; reset so we do not retry on every record
            print("LogWorker has no save repo - nothing to commit")
            LogWorker.commit_count = 0
            return
        print(f"LogWorker committing {LogWorker.commit_count} open records")
        cls.save_repo.push_to_hub()
        LogWorker.commit_count = 0

    @classmethod
    def signal_commit(cls):
        """
        Timer callback which asks the worker thread to run the timeout functions.
        """
        # Signalling this back via the queue and not doing the work here as it would
        # be executed on the Timer thread and may conflict with resources if the main
        # LogWorker starts doing work concurrently.
        print("LogWorker signalling commit based on time elapsed")
        cls.queue.put((None, None, None, None, None))

    @classmethod
    def write(cls, arch_name: str, request: ArchitectureRequest, trace: ArchitectureTrace,
              trace_tags: List[str] = None, trace_comment: str = None):
        """
        Queues a request / trace record for saving by the worker thread.
        :param arch_name: name of the architecture which processed the request
        :param request: the processed request
        :param trace: the trace generated while processing the request
        :param trace_tags: optional list of tags, defaults to an empty list
        :param trace_comment: optional comment, defaults to an empty string
        """
        trace_tags = [] if trace_tags is None else trace_tags
        trace_comment = "" if trace_comment is None else trace_comment
        cls.queue.put((arch_name, request, trace, trace_tags, trace_comment))
# Instantiate and run worker on import
if LogWorker.instance is None:
    LogWorker.instance = LogWorker()
    # The daemon flag must be set on the thread instance; assigning it on the class would
    # replace the Thread.daemon property rather than configure this thread
    LogWorker.instance.daemon = True
    LogWorker.instance.start()
    LogWorker.timeout_functions.append(LogWorker.commit_repo)
class Architecture:
    """
    An architecture is built as a callable pipeline of steps. An
    ArchitectureRequest object is passed down the pipeline sequentially
    to each component. A component can modify the request if needed, update the response
    or signal an early exit. The Architecture framework also provides trace timing
    and logging, plus exception handling so an individual request cannot
    crash the system.
    """
    architectures = None
    save_repo = None
    save_repo_load_error = False
    save_repo_url = "https://huggingface.co/datasets/alfraser/llm-arch-trace"
    trace_dir = "trace"
    trace_file_name = "trace.json"
    trace_file = os.path.join(trace_dir, trace_file_name)

    @classmethod
    def wipe_trace(cls, hf_write_token: str = None):
        """
        Class method to remove the local trace directory, re-clone the save repo, overwrite the
        trace file with an empty set of tests and push that to the hub.
        :param hf_write_token: optional write token, looked up via hf_api_token if not given
        """
        if os.path.exists(cls.trace_dir):
            shutil.rmtree(cls.trace_dir)
        try:
            if hf_write_token is None:
                hf_write_token = hf_api_token(write=True)
            cls.save_repo = Repository(local_dir=cls.trace_dir, clone_from=cls.save_repo_url, token=hf_write_token)
            test_json = {'tests': []}
            with open(cls.trace_file, 'w', encoding='utf-8') as f:
                json.dump(test_json, f, indent=2)
            cls.save_repo.push_to_hub()
        except Exception as err:
            cls.save_repo_load_error = True
            print(f"Error connecting to the save repo {err} - persistence now disabled")

    @classmethod
    def get_trace_records(cls) -> List[Dict]:
        """
        Class method to load the trace records, cloning the save repo first if the trace
        file is not present locally.
        :return: the list of records under the 'tests' key, or an empty list if the clone fails
        """
        if not os.path.isfile(cls.trace_file):
            hf_write_token = hf_api_token(write=True)
            try:
                cls.save_repo = Repository(local_dir=cls.trace_dir, clone_from=cls.save_repo_url, token=hf_write_token)
            except Exception as err:
                cls.save_repo_load_error = True
                print(f"Error connecting to the save repo {err} - persistence now disabled")
                return []
        with open(cls.trace_file, 'r', encoding='utf-8') as f:
            test_json = json.load(f)
        return test_json['tests']

    @classmethod
    def load_architectures(cls, force_reload: bool = False) -> None:
        """
        Class method to load the configuration file and try and set up architectures for each
        config entry (a named sequence of components with optional setup params).
        :param force_reload: A bool of whether to force a reload, defaults to False.
        """
        if cls.architectures is not None and not force_reload:
            return

        config_file = os.path.join(config_dir, "architectures.json")
        with open(config_file, "r", encoding='utf-8') as f:
            configs = json.load(f)['architectures']

        archs = []
        for c in configs:
            arch_name = c['name']
            arch_comps = []
            for s in c['steps']:
                component_class_name = s['class']
                # Component classes are resolved by name from this module's globals
                try:
                    component_class = globals()[component_class_name]
                except KeyError:
                    raise KeyError(
                        f"Unknown component class {component_class_name} in architecture {arch_name}"
                    ) from None
                arch_comps.append(component_class(**s.get('params', {})))
            archs.append(Architecture(name=arch_name,
                                      description=c['description'],
                                      steps=arch_comps,
                                      img=c.get('img')))
        cls.architectures = archs

    @classmethod
    def get_architecture(cls, name: str):
        """
        Lookup an architecture by name
        :param name: The name of the architecture to look up
        :return: The architecture object
        :raises ValueError: if no architecture with that name is loaded
        """
        if cls.architectures is None:
            cls.load_architectures()
        for a in cls.architectures:
            if a.name == name:
                return a
        raise ValueError(f"Could not find an architecture named {name}")

    def __init__(self,
                 name: str,
                 description: str,
                 steps: List[ArchitectureComponent],
                 img: Optional[str] = None,
                 exception_text: str = "Sorry an internal technical error occurred.",
                 no_response_text: str = "Sorry I can't answer that."):
        self.name = name
        self.description = description
        self.steps = steps
        self.img = img
        # NOTE(review): exception_text and no_response_text are stored but not read anywhere
        # in this class - presumably consumed by callers; confirm
        self.exception_text = exception_text
        self.no_response_text = no_response_text

    def __call__(self, request: ArchitectureRequest, trace_tags: List[str] = None, trace_comment: str = None) -> ArchitectureTrace:
        """
        The main entry point to call the pipeline. Passes the request through each pipeline step
        in sequence, allowing them to amend the request or early exit the processing. Also captures
        exceptions and generates the trace, plus saves the request/response and the trace to a store
        for analysis.
        :param request: the request to process; modified in place by the components
        :param trace_tags: optional tags to save alongside the trace
        :param trace_comment: optional comment to save alongside the trace
        :return: the trace generated while processing the request
        """
        print(f'{self.name} processing query "{request.request}"')
        trace = ArchitectureTrace()
        for component in self.steps:
            trace.start_trace(name=component.__class__.__name__)
            try:
                component.process_request(request)
                if request.early_exit:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.EARLY_EXIT,
                                    early_exit_message=request.early_exit_message)
                    break
                else:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.SUCCESS)
            except Exception as err:
                # Record the failure on the step and stop the pipeline rather than propagating
                trace.end_trace(outcome=ArchitectureTraceOutcome.EXCEPTION)
                trace.steps[-1].exception = err
                traceback.print_exc()
                break
        LogWorker.write(self.name, request, trace, trace_tags, trace_comment)
        return trace
class InputRequestScreener(ArchitectureComponent):
    """
    A concrete pipeline component which checks the request text for profanity and, if any
    is found, sets a refusal response and signals an early exit.
    """
    description = "Simplistic input screener for demonstration. Screens inputs for profanity."

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Screens the current request text, leaving the request untouched if it is clean.
        :param request: The request which is flowing down the pipeline
        """
        if not profanity.contains_profanity(request.request):
            return
        request.response = "Sorry - I cannot answer this question. Please try and rephrase it."
        request.early_exit = True
        request.early_exit_message = "Profanity detected in request"
class OutputResponseScreener(ArchitectureComponent):
    """
    A concrete pipeline component which asks an LLM inference endpoint whether the current
    response is offensive, and replaces it with a refusal unless the answer starts with 'no'.
    """
    description = "Screens outputs for offensive responses."

    def __init__(self):
        self.api_token = hf_api_token()
        self.endpoint_url = "https://yl89ru8gdr1wkbej.eu-west-1.aws.endpoints.huggingface.cloud"

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Sends the current response text to the screening endpoint and, unless the reply
        starts with 'no', swaps in a refusal response and signals an early exit.
        :param request: The request which is flowing down the pipeline
        :raises requests.HTTPError: if the endpoint returns an error status
        """
        system_prompt = "You are screening for offensive content. In a single word (yes or no), is the response offensive?"
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        # The system block is closed with <</SYS>>, matching the prompts built in HFInferenceEndpoint
        query_input = f"[INST] <<SYS>> {system_prompt} <</SYS>> {request.response} [/INST] "
        payload = {
            "inputs": query_input,
            "parameters": {
                "temperature": 0.1,
                "max_new_tokens": 10
            }
        }
        llm_response = requests.post(self.endpoint_url, headers=headers, json=payload)
        # Surface a failed call as a clear HTTP error rather than a confusing parse failure
        llm_response.raise_for_status()
        generated_text = llm_response.json()[0]['generated_text'].strip()
        if generated_text[0:2].lower() != 'no':  # Lean cautious even if the model fails to return yes/no
            request.response = "Sorry - I cannot answer this question. Please try and rephrase it."
            request.early_exit = True
            request.early_exit_message = "Offensive content detected in response"
class RetrievalAugmentor(ArchitectureComponent):
    """
    A concrete pipeline component which queries a Chroma collection with the request text
    and prepends the retrieved documents to the request as background.
    """
    description = "Retrieves appropriate documents from the store and then augments the request."

    def __init__(self, vector_store: str, doc_count: int = 5):
        """
        :param vector_store: name of the store, resolved to '<name>_chroma' under the
                             'vector_stores' folder of the data directory
        :param doc_count: number of documents to retrieve per request, defaults to 5
        """
        chroma_db = os.path.join(data_dir, 'vector_stores', f'{vector_store}_chroma')
        self.vector_store = chroma_db
        client = chromadb.PersistentClient(path=chroma_db)
        self.collection = client.get_collection(name='products')
        self.doc_count = doc_count

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Retrieves the nearest documents for the request text and replaces the request with a
        version carrying those documents as background ahead of the question.
        :param request: The request which is flowing down the pipeline
        """
        # Get the count nearest documents from the doc store
        input_query = request.request
        results = self.collection.query(query_texts=[input_query], n_results=self.doc_count)
        documents = results['documents'][0]  # Index 0 as we are always asking one question

        # Update the request to include the retrieved documents
        # NOTE(review): documents are quoted but not escaped, so the background block is only
        # JSON-like; presumably acceptable as it is prompt text - confirm
        quoted_documents = ', '.join(f'"{d}"' for d in documents)
        new_query = f'{{"background": [{quoted_documents}]}}\n\nQUESTION: {input_query}'

        # Put the request back into the architecture request
        request.request = new_query

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        return f"Vector Store: {self.vector_store}; Max docs: {self.doc_count}"
class HFInferenceEndpoint(ArchitectureComponent):
    """
    A concrete pipeline component which sends the user text to a given llama chat based
    inference endpoint on HuggingFace
    """
    # Prompt layout for each supported prompt_style. "raw" applies no formatting and is
    # used to just send things straight through from the front end.
    _PROMPT_TEMPLATES = {
        "multi_line": "<s>[INST] <<SYS>>\n{system_prompt}\n<</SYS>>\n\n{request} [/INST] ",
        "multi_line_no_sys": "<s>[INST]\n{request} [/INST] ",
        "single_line_no_sys": "<s>[INST] {request} [/INST] ",
        "single_line": "<s>[INST] <<SYS>>\n{system_prompt}\n<</SYS>> {request} [/INST] ",
        "multi_line_with_roles": "<<SYS>>\n{system_prompt}\n<</SYS>>\n[INST]\nUser:{request}\n[/INST]\n\nAssistant:",
        "raw": "{request}",
    }

    def __init__(self, endpoint_url: str, model_name: str, system_prompt: str, max_new_tokens: int,
                 temperature: float = 1.0, prompt_style: str = "multi_line"):
        self.endpoint_url: str = endpoint_url
        self.prompt_style = prompt_style
        self.model_name: str = model_name
        self.system_prompt: str = system_prompt
        self.max_new_tokens = max_new_tokens
        self.api_token = hf_api_token()
        self.temperature = temperature

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        details = [
            f"Model: {self.model_name}",
            f"Endpoint: {self.endpoint_url}",
            f"Max tokens: {self.max_new_tokens}",
            f"Temperature: {self.temperature}",
            f"System prompt: {self.system_prompt}",
        ]
        return "; ".join(details)

    def _build_prompt(self, request_text: str) -> str:
        """
        Formats the request text according to the configured prompt style
        :param request_text: the current request text
        :return: the prompt to send to the endpoint
        :raises ValueError: if the configured prompt style is not known
        """
        template = self._PROMPT_TEMPLATES.get(self.prompt_style)
        if template is None:
            raise ValueError(f"Config error - Unknown prompt style: {self.prompt_style}")
        return template.format(system_prompt=self.system_prompt, request=request_text)

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Main processing method for this component. Formats the request using the configured
        prompt style, posts it to the configured endpoint URL, and sets the response element
        of the request to the generated text, or to a message describing the HTTP error.
        :param request: The request which is flowing down the pipeline
        """
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.api_token}",
            "Content-Type": "application/json"
        }
        payload = {
            "inputs": self._build_prompt(request.request),
            "parameters": {
                "temperature": self.temperature,
                "max_new_tokens": self.max_new_tokens
            }
        }
        llm_response = requests.post(self.endpoint_url, headers=headers, json=payload)
        if llm_response.status_code == 200:
            generated_text = llm_response.json()[0]['generated_text'].strip()
            request.response = generated_text
        elif llm_response.status_code == 502:
            request.response = "Received 502 error from LLM service - service initialising, try again shortly"
        else:
            request.response = f"Received {llm_response.status_code} - {llm_response.text}"
class ResponseTrimmer(ArchitectureComponent):
    """
    A concrete pipeline component which trims the response based on a regex match,
    then uppercases the first character of what is left.
    """
    description = "Trims the response based on a regex"

    def __init__(self, regexes: List[str]):
        """
        :param regexes: the patterns to remove from the response, compiled case insensitively
        """
        quoted_regexes = [f'"{r}"' for r in regexes]
        self.regex_display = f"[{', '.join(quoted_regexes)}]"  # Held for config_description only
        self.regexes = [re.compile(r, re.IGNORECASE) for r in regexes]

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Removes every match of each configured regex from the current response, in order,
        uppercases the first remaining character and sets the result as the new response.
        :param request: The request which is flowing down the pipeline
        """
        new_response = request.response
        for regex in self.regexes:
            new_response = regex.sub('', new_response)
        # Slicing rather than indexing so an empty string does not raise
        new_response = new_response[:1].upper() + new_response[1:]
        request.response = new_response

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        return f"Regexes: {self.regex_display}"
if __name__ == "__main__":
    # Manual smoke test - runs a single query through one configured architecture
    req = ArchitectureRequest("Testing")
    a = Architecture.get_architecture("1. Baseline LLM")
    a(req)
    print("Hold")