""" | |
This file contains all the code which defines architectures and | |
architecture components. | |
""" | |
import chromadb | |
import json | |
import os | |
import regex as re | |
import traceback | |
from abc import ABC, abstractmethod | |
from enum import Enum | |
from time import time | |
from typing import List, Optional | |
from better_profanity import profanity | |
from src.common import config_dir, data_dir, hf_api_token, escape_dollars | |
from src.models import HFLlamaChatModel | |
class ArchitectureRequest:
    """
    This class represents a request (chat query) from a user which can then be built up or
    modified through the pipeline process. It also holds the response to the request, which
    again is a stack which can be modified through its life.
    """

    def __init__(self, query: str):
        self._request: List[str] = [query]  # Stack for the request text as it evolves down the pipeline
        self._response: List[str] = []  # Stack for the response text as it evolves down the pipeline
        self.early_exit: bool = False
        self.early_exit_message: Optional[str] = None

    @property
    def request(self) -> str:
        return self._request[-1]

    @request.setter
    def request(self, value: str):
        self._request.append(value)

    @property
    def response(self) -> Optional[str]:
        if len(self._response) > 0:
            return self._response[-1]
        return None

    @response.setter
    def response(self, value: str):
        self._response.append(value)

    def as_markdown(self) -> str:
        """
        Returns a markdown representation for display / testing
        :return: str - the markdown
        """
        md = "- **Request evolution**"
        for r in self._request:
            md += f"\n  - {r}"
        md += "\n- **Response evolution**"
        for r in self._response:
            md += f"\n  - {r}"
        return escape_dollars(md)
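
# Illustrative usage (a hedged sketch, not executed on import; the query and
# response text are invented for the example). Assignments push onto the
# internal stacks, so the full evolution of the request stays available:
#
#   req = ArchitectureRequest("What products do you sell?")
#   req.request = f"QUESTION: {req.request}"   # push an augmented version
#   req.request                                # -> "QUESTION: What products do you sell?"
#   req.response = "We sell electronics."      # push a response
#   print(req.as_markdown())                   # shows both evolution stacks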
class ArchitectureTraceOutcome(Enum):
    """
    Class representing the outcome of a component step in an architecture
    """
    NONE = 0
    SUCCESS = 1
    EARLY_EXIT = 2
    EXCEPTION = 3
class ArchitectureTraceStep:
    """
    Class to hold the details of a single trace step
    """

    def __init__(self, name: str):
        self.name = name
        self.start_ms = int(time() * 1000)
        self.end_ms = None
        self.outcome = ArchitectureTraceOutcome.NONE
        self._exception: Optional[str] = None
        self.early_exit_message: Optional[str] = None

    def end(self, outcome: ArchitectureTraceOutcome):
        self.end_ms = int(time() * 1000)
        self.outcome = outcome

    @property
    def exception(self) -> Optional[str]:
        return self._exception

    @exception.setter
    def exception(self, value: Exception):
        self._exception = f'{value}'  # Hold any exception as a string in the trace

    def as_markdown(self) -> str:
        """
        Converts the trace step to markdown for simple display purposes
        :return: a string of markdown
        """
        md = f"- **Step**: {self.name} \n"
        md += f"  - **Start**: {self.start_ms}; **End**: {self.end_ms} \n"
        md += f"  - **Elapsed time**: {self.end_ms - self.start_ms}ms \n"
        outcome = "None"
        if self.outcome == ArchitectureTraceOutcome.SUCCESS:
            outcome = "Success"
        elif self.outcome == ArchitectureTraceOutcome.EARLY_EXIT:
            outcome = f"Early Exit ({self.early_exit_message})"
        elif self.outcome == ArchitectureTraceOutcome.EXCEPTION:
            outcome = f"Exception ({self._exception})"
        md += f"  - **Outcome**: {outcome}"
        return escape_dollars(md)
class ArchitectureTrace:
    """
    This class represents the system instrumentation / trace for a request. It holds the name
    of each component called, the start and end time of the component processing and the
    outcome of the step.
    """

    def __init__(self):
        self.steps: List[ArchitectureTraceStep] = []

    def start_trace(self, name: str):
        self.steps.append(ArchitectureTraceStep(name=name))

    def end_trace(self, outcome: ArchitectureTraceOutcome, early_exit_message: Optional[str] = None):
        assert len(self.steps) > 0
        assert self.steps[-1].outcome == ArchitectureTraceOutcome.NONE
        self.steps[-1].end(outcome=outcome)
        if early_exit_message is not None:
            self.steps[-1].early_exit_message = early_exit_message

    def as_markdown(self) -> str:
        """
        Converts the trace to markdown for simple display purposes
        :return: a string of markdown
        """
        md = ' \n'.join([s.as_markdown() for s in self.steps])
        return md
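
# Illustrative usage (a hedged sketch, not executed on import; the component
# name is invented). The trace is driven by paired start/end calls per step:
#
#   trace = ArchitectureTrace()
#   trace.start_trace(name="InputRequestScreener")
#   trace.end_trace(outcome=ArchitectureTraceOutcome.SUCCESS)
#   print(trace.as_markdown())   # one markdown bullet per recorded step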
class ArchitectureComponent(ABC):
    description = "Components should override a description"

    @abstractmethod
    def process_request(self, request: ArchitectureRequest) -> None:
        """
        The principal method that concrete implementations of a component must implement.
        They should signal anything to the pipeline through direct modification of the provided
        request (i.e. amending the request text or response text, or setting the early_exit flag).
        :param request: The request which is flowing down the pipeline
        :return: None
        """
        pass

    def config_description(self) -> str:
        """
        Optional method to override for providing a string of description in markdown format
        for display purposes for the component
        :return: a markdown string (defaulting to empty in the base class)
        """
        return ""
class Architecture:
    """
    An architecture is built as a callable pipeline of steps. An
    ArchitectureRequest object is passed down the pipeline sequentially
    to each component. A component can modify the request if needed, update the response
    or signal an early exit. The Architecture framework also provides trace timing
    and logging, plus exception handling so an individual request cannot
    crash the system.
    """

    architectures = None

    @classmethod
    def load_architectures(cls, force_reload: bool = False) -> None:
        """
        Class method to load the configuration file and try to set up an architecture for each
        config entry (a named sequence of components with optional setup params).
        :param force_reload: A bool of whether to force a reload, defaults to False.
        """
        if cls.architectures is None or force_reload:
            config_file = os.path.join(config_dir, "architectures.json")
            with open(config_file, "r") as f:
                configs = json.load(f)['architectures']
            archs = []
            for c in configs:
                arch_name = c['name']
                arch_description = c['description']
                arch_img = None
                if 'img' in c:
                    arch_img = c['img']
                arch_comps = []
                for s in c['steps']:
                    component_class_name = s['class']
                    component_init_params = {}
                    if 'params' in s:
                        component_init_params = s['params']
                    arch_comps.append(globals()[component_class_name](**component_init_params))
                arch = Architecture(name=arch_name, description=arch_description, steps=arch_comps, img=arch_img)
                archs.append(arch)
            cls.architectures = archs
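
    # For reference, a minimal config entry consistent with the keys read above
    # (this exact entry is hypothetical; the real architectures.json may differ):
    #
    #   {"architectures": [
    #       {"name": "RAG Pipeline",
    #        "description": "Retrieval augmented generation",
    #        "steps": [
    #            {"class": "InputRequestScreener"},
    #            {"class": "RetrievalAugmentor", "params": {"vector_store": "products"}}
    #        ]}
    #   ]}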
    @classmethod
    def get_architecture(cls, name: str):
        """
        Look up an architecture by name
        :param name: The name of the architecture to look up
        :return: The architecture object
        """
        if cls.architectures is None:
            cls.load_architectures()
        for a in cls.architectures:
            if a.name == name:
                return a
        raise ValueError(f"Could not find an architecture named {name}")

    def __init__(self,
                 name: str,
                 description: str,
                 steps: List[ArchitectureComponent],
                 img: Optional[str] = None,
                 exception_text: str = "Sorry, an internal technical error occurred.",
                 no_response_text: str = "Sorry, I can't answer that."):
        self.name = name
        self.description = description
        self.steps = steps
        self.img = img
        self.exception_text = exception_text
        self.no_response_text = no_response_text
    def __call__(self, request: ArchitectureRequest) -> ArchitectureTrace:
        """
        The main entry point to call the pipeline. Passes the request through each pipeline step
        in sequence, allowing them to amend the request or early exit the processing. Also captures
        exceptions and generates the trace, plus saves the request/response and the trace to a store
        for analysis.
        :param request: The ArchitectureRequest to process
        :return: The ArchitectureTrace recording the outcome of each step
        """
        trace = ArchitectureTrace()
        for component in self.steps:
            trace.start_trace(name=component.__class__.__name__)
            try:
                component.process_request(request)
                if request.early_exit:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.EARLY_EXIT,
                                    early_exit_message=request.early_exit_message)
                    break
                else:
                    trace.end_trace(outcome=ArchitectureTraceOutcome.SUCCESS)
            except Exception as err:
                trace.end_trace(outcome=ArchitectureTraceOutcome.EXCEPTION)
                trace.steps[-1].exception = err
                traceback.print_exc()
                break
        # TODO - save the request / response
        # TODO - save the trace
        return trace
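
# Illustrative end-to-end usage (a hedged sketch, not executed on import; the
# architecture name and query are hypothetical and depend on architectures.json):
#
#   arch = Architecture.get_architecture("RAG Pipeline")
#   req = ArchitectureRequest("What products do you sell?")
#   trace = arch(req)
#   print(req.response)          # final response, or None if nothing was produced
#   print(trace.as_markdown())   # timing and outcome per component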
class InputRequestScreener(ArchitectureComponent):
    description = "Simplistic input screener for demonstration. Screens inputs for profanity."

    def process_request(self, request: ArchitectureRequest) -> None:
        if profanity.contains_profanity(request.request):
            request.response = "Sorry - I cannot answer this question. Please try to rephrase it."
            request.early_exit = True
            request.early_exit_message = "Profanity detected in request"
class OutputResponseScreener(ArchitectureComponent):
    description = "Screens outputs for offensive responses."

    def process_request(self, request: ArchitectureRequest) -> None:
        system_prompt = "You are screening for offensive content. In a single word (yes or no), is the response offensive?"
        llm = HFLlamaChatModel.for_model('meta-llama/Llama-2-7b-chat-hf')
        if llm is None:
            raise ValueError('Screener model "meta-llama/Llama-2-7b-chat-hf" not set up')
        response = llm(request.response, system_prompt=system_prompt)
        if response[0:2].lower() != 'no':  # Lean cautious even if the model fails to return yes/no
            request.response = "Sorry - I cannot answer this question. Please try to rephrase it."
            request.early_exit = True
class RetrievalAugmentor(ArchitectureComponent):
    description = "Retrieves appropriate documents from the store and then augments the request."

    def __init__(self, vector_store: str, doc_count: int = 5):
        chroma_db = os.path.join(data_dir, 'vector_stores', f'{vector_store}_chroma')
        self.vector_store = chroma_db
        client = chromadb.PersistentClient(path=chroma_db)
        self.collection = client.get_collection(name='products')
        self.doc_count = doc_count

    def process_request(self, request: ArchitectureRequest) -> None:
        # Get the doc_count nearest documents from the doc store
        input_query = request.request
        results = self.collection.query(query_texts=[input_query], n_results=self.doc_count)
        documents = results['documents'][0]  # Index 0 as we are always asking one question
        # Update the request to include the retrieved documents
        # (an earlier format prefixed each document with "FACT:" rather than using JSON)
        new_query = '{"background": ['
        new_query += ', '.join([f'"{d}"' for d in documents])
        new_query += ']}\n\nQUESTION: '
        new_query += input_query
        # Put the request back into the architecture request
        request.request = new_query

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        desc = f"Vector Store: {self.vector_store}; "
        desc += f"Max docs: {self.doc_count}"
        return desc
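
# For illustration, a request of "What is the warranty?" with two retrieved
# documents would be rewritten to something like the following (the document
# texts here are hypothetical):
#
#   {"background": ["All items carry a 2-year warranty.", "Returns accepted within 30 days."]}
#
#   QUESTION: What is the warranty?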
class HFLlamaHttpRequestor(ArchitectureComponent):
    """
    A concrete pipeline component which sends the user text to a given llama chat based
    model on hugging face.
    """
    description = "Passes the request to a model hosted on hugging face hub"

    def __init__(self, model: str, system_prompt: str, max_tokens: int, temperature: float = 1.0):
        self.model: str = model
        self.system_prompt: str = system_prompt
        self.max_tokens = max_tokens
        self.api_token = hf_api_token()
        self.temperature = temperature

    def config_description(self) -> str:
        """
        Custom config details as markdown
        """
        desc = f"Model: {self.model}; "
        desc += f"Max tokens: {self.max_tokens}; "
        desc += f"Temperature: {self.temperature}; "
        desc += f"System prompt: {self.system_prompt}"
        return desc

    def process_request(self, request: ArchitectureRequest) -> None:
        """
        Main processing method for this component. Looks up the model by name, calls it
        with the request text, and then adds the result to the response element of the
        request.
        """
        llm = HFLlamaChatModel.for_model(self.model)
        if llm is None:
            raise ValueError(f'No model {self.model} configured in the environment')
        response = llm(request.request, system_prompt=self.system_prompt, max_new_tokens=self.max_tokens, temperature=self.temperature)
        request.response = response
class ResponseTrimmer(ArchitectureComponent):
    """
    A concrete pipeline component which trims the response based on a regex match,
    then uppercases the first character of what is left.
    """
    description = "Trims the response based on a regex"

    def __init__(self, regexes: List[str]):
        quoted_regexes = [f'"{r}"' for r in regexes]
        self.regex_display = f"[{', '.join(quoted_regexes)}]"
        self.regexes = [re.compile(r, re.IGNORECASE) for r in regexes]

    def process_request(self, request: ArchitectureRequest):
        new_response = request.response
        for regex in self.regexes:
            new_response = regex.sub('', new_response)
        new_response = new_response[:1].upper() + new_response[1:]
        request.response = new_response

    def config_description(self) -> str:
        return f"Regexes: {self.regex_display}"