Spaces:
Runtime error
Runtime error
""" | |
Details : BardCoder Library is code genrator for bard. It is used to generate code from bard response. | |
its using Bard API to interact with bard and refine the results for coding purpose. | |
The main purpose of this is to integrate bard with any projects and make code generation easy. | |
Language : Python | |
Author : HeavenHM. | |
License : MIT | |
Date : 21-05-2023 | |
""" | |
# import libraries | |
import json | |
import logging | |
import os | |
import json | |
from bardapi import Bard | |
import traceback | |
import subprocess | |
import time | |
from os import path | |
import lib.extensions_map as extensions_map | |
from lib.extensions_map import get_file_extesion | |
import inspect | |
class BardCoder: | |
global bard | |
global logger | |
bard_init = False | |
logs_enabled = False | |
logs_file = "bardcoder.log" | |
response_id, conversation_id, content, factuality_queries, text_query, code_choices, code_extension = None, None, None, None, None, None, None | |
# Initial setup | |
def __init__(self,api_key=None,timeout=10,enable_logs=False): | |
try: | |
BardCoder.write_log("Starting to initialize BardCoder.") | |
# Dont initialize if api key is None. | |
if api_key is None or api_key == "" or '.' not in api_key: | |
BardCoder.write_log("BardCoder API key is missing or is invalid Skipping init") | |
self.bard = None | |
BardCoder.bard_init = False | |
return None | |
# Setting up the api key. | |
BardCoder.write_log("Setting up the api key") | |
if api_key and '.' in api_key: | |
self.set_api_key(api_key) | |
BardCoder.write_log("Setting up the prompt") | |
# Setting up Bard from BardAPI. | |
self.bard = Bard(timeout=timeout) # Set timeout in seconds | |
if not self.bard: | |
BardCoder.write_log("BardCoder not initialized...Skipping init.") | |
self.bard = None | |
BardCoder.bard_init = False | |
return None | |
# Enable logs | |
if enable_logs: | |
self.enable_logs() | |
BardCoder.write_log("Setting up the logger") | |
# Setups the logging. | |
self.logger = self.setup_logger(self.logs_file) | |
BardCoder.bard_init = True | |
BardCoder.write_log("BardCoder initialized successfully.") | |
except Exception as e: | |
self.add_log(str(e)) | |
stack_trace = traceback.format_exc() | |
self.add_log(stack_trace) | |
self.bard = None | |
def write_log(data:str=None): | |
if data is None: | |
raise ValueError("Data cannot be None") | |
with open("bardcoder.log",'a') as f: | |
# Get the name of the calling function | |
caller_name = inspect.stack()[1][3] | |
f.write(f"{time.strftime('%d-%m-%Y %H:%M:%S')} {caller_name}: {data}\n") | |
# Set the api key | |
def set_api_key(self, api_key): | |
if api_key: | |
os.environ['_BARD_API_KEY'] = api_key | |
# Set the prompt for bard | |
def set_prompt(self, prompt): | |
try: | |
# Get the response from the prompt. | |
response = self.get_response(prompt) | |
if response: | |
data = json.dumps(response, indent=4) | |
if data: | |
# Getting the data from the response. | |
json_data = json.loads(data) | |
if json_data: | |
self.content = json_data['content'] | |
self.add_log("Content: " + self.content) | |
# Saving the response to file. | |
self.add_log("Saving response to file.") | |
self.save_file("response/response.json",json.dumps(response, indent=4)) | |
self.save_file("response/content.md", self.content) | |
# Getting the content from the response. | |
self.conversation_id = json_data['conversation_id'] | |
if self.conversation_id: | |
self.add_log(f"Conversation ID: {self.conversation_id}") | |
# Getting the conversation ID from the response. | |
self.response_id = json_data['response_id'] | |
if self.response_id: | |
self.add_log(f"Response ID: {self.response_id}") | |
# Get the factuality queries from the response. | |
self.factuality_queries = json_data['factualityQueries'] | |
if self.factuality_queries: | |
for factualityQuery in self.factuality_queries: | |
self.add_log(f"Factuality Query: {factualityQuery}") | |
# Get the links from the response. | |
links = self.get_links() | |
self.add_log(f"Links: {links}") | |
# Get the text query from the response. | |
self.text_query = json_data['textQuery'] | |
if self.text_query: | |
self.add_log(f"Text Query: {self.text_query}") | |
# Getting the code choices from the response. | |
self.code_choices = json_data['choices'] | |
self.add_log(f"Code Choices: {self.code_choices}") | |
if self.code_choices: | |
for code_choice in self.code_choices: | |
self.add_log(f"Code Choice: {code_choice}") | |
# Mark end of init. - Success | |
self.add_log("Success.") | |
return True,"Success." | |
else: | |
self.add_log("Json data is empty.") | |
return False,"Json data is empty." | |
else: | |
self.add_log("Data is empty.") | |
return False,"Data is empty." | |
else: | |
self.add_log("Response is empty.\nCheck if the API key is valid.") | |
return False,"Response is empty.\nCheck if the API key is valid." | |
except Exception as e: | |
# show stack trace | |
stack_trace = traceback.format_exc() | |
self.add_log(stack_trace) | |
return False,str(e) | |
# get the response from bard | |
def get_response(self, prompt: str): | |
if not prompt: | |
self.add_log("Prompt is empty.") | |
return "" | |
if self.bard: | |
self.add_log("Getting response from bard.") | |
response = self.bard.get_answer(prompt) | |
else: | |
self.add_log("Bard is not initialized.") | |
return None | |
# get response from bard | |
return response | |
# get multiple responses from bard | |
def get_code_choice(self, index): | |
if index < len(self.code_choices): | |
choice_content = self.code_choices[index]['content'][0] | |
start_index = choice_content.find('```') + 3 | |
end_index = choice_content.find('```', start_index) | |
if start_index != -1 and end_index != -1: | |
extracted_data = choice_content[start_index:end_index] | |
result = extracted_data.strip() | |
# Remove the code language identifier | |
result = result[result.find('\n') + 1:] | |
return result | |
else: | |
return None | |
else: | |
return None | |
# setting the logger | |
def setup_logger(self, filename: str, level=logging.INFO): | |
# Remove existing handlers from the root logger | |
for handler in logging.root.handlers[:]: | |
logging.root.removeHandler(handler) | |
# Set up a file handler to write logs to a file | |
file_handler = logging.FileHandler(filename) | |
formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%d-%m-%y %H:%M:%S') | |
file_handler.setFormatter(formatter) | |
logging.root.addHandler(file_handler) | |
logging.root.setLevel(level) | |
return logging.getLogger(__name__) | |
# get the code from bard response | |
def get_code(self): | |
try: | |
if self.content: | |
self.add_log("Getting code from content.") | |
data = self.content | |
start_index = data.find("```") | |
if start_index == -1: | |
return None | |
start_index += 3 | |
end_index = data.find("```", start_index) | |
if end_index == -1: | |
return None | |
extracted_data = data[start_index:end_index] | |
result = extracted_data.strip() | |
# Remove the code language identifier | |
result = result[result.find('\n') + 1:] | |
self.add_log(f"Code: {result}") | |
return result | |
except Exception as e: | |
self.add_log(str(e)) | |
stack_trace = traceback.format_exc() | |
self.add_log(stack_trace) | |
def save_code(self, filename="code.txt"): | |
code = self.get_code() | |
self.code_extenstion = '.' + self.get_code_extension() | |
if code: | |
code = code.replace("\\n", "\n").replace("\\t", "\t") | |
self.add_log(f"Saving code with filename: {filename} and extension: {self.code_extenstion} and code: {code}") | |
# Add extension to filename | |
extension = extensions_map.get_file_extesion(self.code_extenstion) or self.code_extenstion | |
filename = filename + extension | |
with open(filename, 'w') as f: | |
f.write(code) | |
self.add_log(f"{filename} saved.") | |
return filename | |
def save_code(self, filename="code.txt", code='self.add_log("Hello World")'): | |
self.add_log(f"Saving code with filename: {filename}") | |
extension = self.get_code_extension() | |
if extension: | |
self.code_extenstion = '.' + extension | |
#code = self.get_code() | |
if code: | |
code = code.replace("\\n", "\n").replace("\\t", "\t") | |
self.add_log(f"Saving code with filename: {filename} and extension: {self.code_extenstion} and code: {code}") | |
# Add extension to filename | |
extension = extensions_map.get_file_extesion(self.code_extenstion) or self.code_extenstion | |
filename = filename + extension | |
with open(filename, 'w') as f: | |
f.write(code) | |
self.add_log(f"{filename} saved.") | |
return filename | |
# save multiple codes from bard response | |
def save_code_choices(self, filename): | |
self.add_log(f"Saving code choices with filename: {filename}") | |
extension = self.get_code_extension() | |
if extension: | |
self.code_extension = '.' + extension | |
self.code_extension = extensions_map.get_file_extesion(self.code_extenstion) or self.code_extenstion | |
for index, choice in enumerate(self.code_choices): | |
choice_content = self.get_code_choice(index) | |
self.add_log(f"Enumurated Choice content: {choice}") | |
self.save_file("codes/"+filename+'_'+str(index+1) + | |
self.code_extension, choice_content) | |
# execute code from bard response using locally installed compilers. | |
# a support for online compilers will be added soon. | |
def execute_code(self, filename): | |
if filename: | |
self.add_log(f"Running {filename}") | |
output = self.run_code_exec(filename) | |
self.add_log(f"Output: {output}") | |
return output | |
return None | |
# execute code from bard response using locally installed compilers. | |
def run_code_exec(self, filename: str, debug: bool = False, cpp_version: str = "c++17"): | |
compiler_map = { | |
".c": ("gcc", "c"), | |
".cpp": ("g++", "c++"), | |
".java": ("java", "java"), | |
".go": ("go run", "go"), | |
".cs": ("csc", "csharp"), | |
".swift": ("swift", "swift"), | |
".py": ("python3", "python"), | |
".js": ("node", "javascript"), | |
".rs": ("rustc", "rust") | |
} | |
_, extension = os.path.splitext(filename) | |
self.add_log(f"Extension: {extension}") | |
if extension not in compiler_map: | |
self.add_log(f"Extension {extension} not supported.") | |
return | |
compiler, language = compiler_map[extension] | |
self.add_log(f"Compiler: {compiler}") | |
if language == "c++" and cpp_version.startswith("c++"): | |
version = cpp_version[3:] | |
if version in ["17", "14", "11", "0x"]: | |
cpp_version = f"c++{version}" | |
self.add_log(f":C++ Version: {cpp_version}") | |
if debug: | |
if language == "c++": | |
self.add_log(f"Compiling {filename} with {compiler} (C++ {cpp_version})...") | |
else: | |
self.add_log(f"Compiling {filename} with {compiler}...") | |
output = "" | |
try: | |
if language == "c": | |
output = subprocess.check_output([compiler, filename, "-o", f"{filename[:-len(extension)]}"], stderr=subprocess.STDOUT).decode('utf-8') | |
elif language == "c++": | |
output = subprocess.check_output([compiler, filename, f"-std={cpp_version}", "-o", f"{filename[:-len(extension)]}"], stderr=subprocess.STDOUT).decode('utf-8') | |
elif language == "java": | |
output = subprocess.check_output([compiler, filename], stderr=subprocess.STDOUT).decode('utf-8') | |
elif language in ["go", "swift", "python", "javascript","java"]: | |
output = subprocess.check_output([compiler, filename], stderr=subprocess.STDOUT).decode('utf-8') | |
elif language == "csharp": | |
output = subprocess.check_output([compiler, f"/out:{filename[:-len(extension)]}.exe", filename], stderr=subprocess.STDOUT).decode('utf-8') | |
elif language == "rust": | |
output = subprocess.check_output([compiler, filename], stderr=subprocess.STDOUT).decode('utf-8') | |
else: | |
self.add_log("Error: Unsupported file type") | |
return | |
self.add_log(f"Output: {output}") | |
except subprocess.CalledProcessError as e: | |
output += e.output.decode('utf-8') | |
self.add_log(f"Error: {output}") | |
if debug: | |
self.add_log(f"Running {filename[:-len(extension)]}...") | |
# Checking further output for syntax ./path/filename to run the executable | |
try: | |
# run C# with mono command. like this mono ./path/filename.exe | |
if language == "csharp": | |
output_file_exec = f"{filename[:-len(extension)]}.exe" | |
output += subprocess.check_output(['mono',output_file_exec], stderr=subprocess.STDOUT).decode('utf-8') | |
else: | |
output_file_exec = f"./{filename[:-len(extension)]}" | |
# checking if file exists output_file_exec | |
if os.path.isfile(output_file_exec): | |
output += subprocess.check_output([output_file_exec], stderr=subprocess.STDOUT).decode('utf-8') | |
except (subprocess.CalledProcessError, Exception) as e: | |
if isinstance(e, subprocess.CalledProcessError): | |
output += '\n' + e.output.decode('utf-8') | |
else: | |
output += '\n' + str(e) | |
self.add_log(f"Error: {output}") | |
if debug: | |
self.add_log(f"Finished running {filename[:-len(extension)]}") | |
self.add_log(f"Output: {output}") | |
return output | |
# execute all the code choices from bard response using locally installed compilers. | |
def execute_code_choices(self): | |
self.add_log("Running codes") | |
codes_choices_output = list() | |
for filename in os.listdir('codes'): | |
filepath = path.join('codes', filename) | |
self.add_log(f"Running {filepath}") | |
output = self.execute_code(filepath) | |
if output: | |
codes_choices_output.append(output) | |
time.sleep(5) | |
return codes_choices_output | |
# get the code extension from bard response - automatically detects the language from bard response. | |
def get_code_extension(self): | |
try: | |
code_content = self.content | |
if code_content and not code_content in "can't help": | |
self.code_extension = code_content.split('```')[1].split('\n')[0] | |
self.add_log(f"Code extension: {self.code_extension}") | |
return self.code_extension | |
except Exception as e: | |
stack_trace = traceback.format_exc() | |
self.add_log(stack_trace) | |
return None | |
# get the links from bard response | |
def get_links(self): | |
data = self.factuality_queries | |
links = [] | |
self.add_log("Data: " + str(data)) | |
if data is None or len(data) == 0: | |
self.add_log("Data is None.") | |
return links | |
try: | |
for inner_list in data[0]: | |
link = inner_list[2][0] | |
if link: | |
links.append(link) | |
except Exception as e: | |
stack_trace = traceback.format_exc() | |
self.add_log(stack_trace) | |
return links | |
self.add_log("Links: " + str(links)) | |
return links | |
def save_file(self, filename, data): | |
with open(filename, 'w') as f: | |
f.write(data) | |
def read_file(self, filename): | |
with open(filename, 'r') as f: | |
return f.read() | |
def add_log(self, log, level=logging.INFO): | |
log_msg = inspect.stack()[1][3] + ": " + log | |
if self.logs_enabled: | |
self.logger.log(level, log_msg) | |
else: | |
self.logger = self.setup_logger(self.logs_file) | |
self.logger.log(level, log_msg) | |
def enable_logs(self): | |
self.logs_enabled = True | |