File size: 18,241 Bytes
95bc5d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
"""
Details : BardCoder Library is code genrator for bard. It is used to generate code from bard response.
its using Bard API to interact with bard and refine the results for coding purpose.
The main purpose of this is to integrate bard with any projects and make code generation easy.
Language : Python
Author : HeavenHM.
License : MIT
Date : 21-05-2023
"""

# import libraries
import json
import logging
import os
import json
from bardapi import Bard
import traceback
import subprocess
import time
from os import path
import lib.extensions_map as extensions_map
from lib.extensions_map import get_file_extesion
import inspect

class BardCoder:
    global bard
    global logger
    bard_init = False
    logs_enabled = False
    logs_file = "bardcoder.log"
    response_id, conversation_id, content, factuality_queries, text_query, code_choices, code_extension = None, None, None, None, None, None, None
    
    # Initial setup
    def __init__(self,api_key=None,timeout=10,enable_logs=False):
        try:
            BardCoder.write_log("Starting to initialize BardCoder.")
            # Dont initialize if api key is None.
            if api_key is None or api_key == "" or '.' not in api_key:
                BardCoder.write_log("BardCoder API key is missing or is invalid Skipping init")
                self.bard = None
                BardCoder.bard_init = False
                return None
            
            # Setting up the api key.
            BardCoder.write_log("Setting up the api key")
            if api_key and '.' in api_key:
                self.set_api_key(api_key)
            
            BardCoder.write_log("Setting up the prompt")
            # Setting up Bard from BardAPI.
            self.bard = Bard(timeout=timeout)  # Set timeout in seconds
            
            if not self.bard:
                BardCoder.write_log("BardCoder not initialized...Skipping init.")
                self.bard = None
                BardCoder.bard_init = False
                return None

            # Enable logs
            if enable_logs:
                self.enable_logs()
            
            BardCoder.write_log("Setting up the logger")
            # Setups the logging.
            self.logger = self.setup_logger(self.logs_file)
            
            BardCoder.bard_init = True
            BardCoder.write_log("BardCoder initialized successfully.")
            
        except Exception as e:
            self.add_log(str(e))
            stack_trace = traceback.format_exc()
            self.add_log(stack_trace)
            self.bard = None
    
    def write_log(data:str=None):
        if data is None:
            raise ValueError("Data cannot be None")
        with open("bardcoder.log",'a') as f:
            # Get the name of the calling function
            caller_name = inspect.stack()[1][3]            
            f.write(f"{time.strftime('%d-%m-%Y %H:%M:%S')} {caller_name}: {data}\n")

    # Set the api key
    def set_api_key(self, api_key):
        if api_key:
            os.environ['_BARD_API_KEY'] = api_key
    
    # Set the prompt for bard
    def set_prompt(self, prompt):
        try:
            # Get the response from the prompt.
            response = self.get_response(prompt)
            if response:
                data = json.dumps(response, indent=4)

                if data:
                    # Getting the data from the response.
                    json_data = json.loads(data)
                    if json_data:
                        self.content = json_data['content']
                        self.add_log("Content: " + self.content)

                        # Saving the response to file.
                        self.add_log("Saving response to file.")
                        self.save_file("response/response.json",json.dumps(response, indent=4))
                        self.save_file("response/content.md", self.content)

                        # Getting the content from the response.
                        self.conversation_id = json_data['conversation_id']
                        if self.conversation_id:
                            self.add_log(f"Conversation ID: {self.conversation_id}")

                        # Getting the conversation ID from the response.
                        self.response_id = json_data['response_id']
                        if self.response_id:
                            self.add_log(f"Response ID: {self.response_id}")

                        # Get the factuality queries from the response.
                        self.factuality_queries = json_data['factualityQueries']
                        if self.factuality_queries:
                            for factualityQuery in self.factuality_queries:
                                self.add_log(f"Factuality Query: {factualityQuery}")
                            # Get the links from the response.
                            links = self.get_links()
                            self.add_log(f"Links: {links}")

                        # Get the text query from the response.
                        self.text_query = json_data['textQuery']
                        if self.text_query:
                            self.add_log(f"Text Query: {self.text_query}")

                        # Getting the code choices from the response.
                        self.code_choices = json_data['choices']
                        self.add_log(f"Code Choices: {self.code_choices}")
                        if self.code_choices:
                            for code_choice in self.code_choices:
                                self.add_log(f"Code Choice: {code_choice}")

                        # Mark end of init. - Success
                        self.add_log("Success.")
                        return True,"Success."
                    else:
                        self.add_log("Json data is empty.")
                        return False,"Json data is empty."
                else:
                    self.add_log("Data is empty.")
                    return False,"Data is empty."
            else:
                self.add_log("Response is empty.\nCheck if the API key is valid.")
                return False,"Response is empty.\nCheck if the API key is valid."

        except Exception as e:
            # show stack trace
            stack_trace = traceback.format_exc()
            self.add_log(stack_trace)
            return False,str(e)

    # get the response from bard
    def get_response(self, prompt: str):
        if not prompt:
            self.add_log("Prompt is empty.")
            return ""
        if self.bard:
            self.add_log("Getting response from bard.")
            response = self.bard.get_answer(prompt)
        else:
            self.add_log("Bard is not initialized.")
            return None

        # get response from bard
        return response

    # get multiple responses from bard
    def get_code_choice(self, index):
        if index < len(self.code_choices):
            choice_content = self.code_choices[index]['content'][0]
            start_index = choice_content.find('```') + 3
            end_index = choice_content.find('```', start_index)
            if start_index != -1 and end_index != -1:
                extracted_data = choice_content[start_index:end_index]
                result = extracted_data.strip()
                # Remove the code language identifier
                result = result[result.find('\n') + 1:]
                return result
            else:
                return None
        else:
            return None

    # setting the logger
    def setup_logger(self, filename: str, level=logging.INFO):
        # Remove existing handlers from the root logger
        for handler in logging.root.handlers[:]:
            logging.root.removeHandler(handler)

        # Set up a file handler to write logs to a file
        file_handler = logging.FileHandler(filename)
        formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%d-%m-%y %H:%M:%S')
        file_handler.setFormatter(formatter)
        logging.root.addHandler(file_handler)
        logging.root.setLevel(level)

        return logging.getLogger(__name__)

    # get the code from bard response
    def get_code(self):
        try:
            if self.content:
                self.add_log("Getting code from content.")
                
                data = self.content
                start_index = data.find("```")
                if start_index == -1:
                    return None
                start_index += 3
                end_index = data.find("```", start_index)
                if end_index == -1:
                    return None
                extracted_data = data[start_index:end_index]
                result = extracted_data.strip()
                
                # Remove the code language identifier
                result = result[result.find('\n') + 1:]
                self.add_log(f"Code: {result}")
                return result
        except Exception as e:
            self.add_log(str(e))
            stack_trace = traceback.format_exc()
            self.add_log(stack_trace)
            
    def save_code(self, filename="code.txt"):
        code = self.get_code()
        self.code_extenstion = '.' + self.get_code_extension()
        if code:
            code = code.replace("\\n", "\n").replace("\\t", "\t")
            self.add_log(f"Saving code with filename: {filename} and extension: {self.code_extenstion} and code: {code}")

            # Add extension to filename
            extension = extensions_map.get_file_extesion(self.code_extenstion) or self.code_extenstion
            filename = filename + extension

            with open(filename, 'w') as f:
                f.write(code)
                self.add_log(f"{filename} saved.")
            return filename

    def save_code(self, filename="code.txt", code='self.add_log("Hello World")'):
        self.add_log(f"Saving code with filename: {filename}")
        extension = self.get_code_extension()
        if extension:
            self.code_extenstion = '.' + extension
            #code = self.get_code()
            if code:
                code = code.replace("\\n", "\n").replace("\\t", "\t")
                self.add_log(f"Saving code with filename: {filename} and extension: {self.code_extenstion} and code: {code}")

                # Add extension to filename
                extension = extensions_map.get_file_extesion(self.code_extenstion) or self.code_extenstion
                filename = filename + extension

                with open(filename, 'w') as f:
                    f.write(code)
                    self.add_log(f"{filename} saved.")
                return filename

    # save multiple codes from bard response
    def save_code_choices(self, filename):
        self.add_log(f"Saving code choices with filename: {filename}")
        extension = self.get_code_extension()
        if extension:
            self.code_extension = '.' + extension
            self.code_extension = extensions_map.get_file_extesion(self.code_extenstion) or self.code_extenstion

            for index, choice in enumerate(self.code_choices):
                choice_content = self.get_code_choice(index)
                self.add_log(f"Enumurated Choice content: {choice}")
                self.save_file("codes/"+filename+'_'+str(index+1) +
                               self.code_extension, choice_content)
                
    # execute code from bard response using locally installed compilers.
    # a support for online compilers will be added soon.
    def execute_code(self, filename):
        if filename:
            self.add_log(f"Running {filename}")
            output = self.run_code_exec(filename)
            self.add_log(f"Output: {output}")
            return output
        return None
    
    # execute code from bard response using locally installed compilers.
    def run_code_exec(self, filename: str, debug: bool = False, cpp_version: str = "c++17"):
        compiler_map = {
            ".c": ("gcc", "c"),
            ".cpp": ("g++", "c++"),
            ".java": ("java", "java"),
            ".go": ("go run", "go"),
            ".cs": ("csc", "csharp"),
            ".swift": ("swift", "swift"),
            ".py": ("python3", "python"),
            ".js": ("node", "javascript"),
            ".rs": ("rustc", "rust")
        }

        _, extension = os.path.splitext(filename)
        self.add_log(f"Extension: {extension}")
        if extension not in compiler_map:
            self.add_log(f"Extension {extension} not supported.")
            return

        compiler, language = compiler_map[extension]
        self.add_log(f"Compiler: {compiler}")

        if language == "c++" and cpp_version.startswith("c++"):
            version = cpp_version[3:]
            if version in ["17", "14", "11", "0x"]:
                cpp_version = f"c++{version}"
                self.add_log(f":C++ Version: {cpp_version}")

        if debug:
            if language == "c++":
                 self.add_log(f"Compiling {filename} with {compiler} (C++ {cpp_version})...")
            else:
                 self.add_log(f"Compiling {filename} with {compiler}...")

        output = ""
        try:
            if language == "c":
                output = subprocess.check_output([compiler, filename, "-o", f"{filename[:-len(extension)]}"], stderr=subprocess.STDOUT).decode('utf-8')
            elif language == "c++":
                output = subprocess.check_output([compiler, filename, f"-std={cpp_version}", "-o", f"{filename[:-len(extension)]}"], stderr=subprocess.STDOUT).decode('utf-8')
            elif language == "java":
                output = subprocess.check_output([compiler, filename], stderr=subprocess.STDOUT).decode('utf-8')
            elif language in ["go", "swift", "python", "javascript","java"]:
                output = subprocess.check_output([compiler, filename], stderr=subprocess.STDOUT).decode('utf-8')
            elif language == "csharp":
                output = subprocess.check_output([compiler, f"/out:{filename[:-len(extension)]}.exe", filename], stderr=subprocess.STDOUT).decode('utf-8')
            elif language == "rust":
                output = subprocess.check_output([compiler, filename], stderr=subprocess.STDOUT).decode('utf-8')
            else:
                self.add_log("Error: Unsupported file type")
                return
            self.add_log(f"Output: {output}")
        except subprocess.CalledProcessError as e:
            output += e.output.decode('utf-8')
            self.add_log(f"Error: {output}")

        if debug:
            self.add_log(f"Running {filename[:-len(extension)]}...")

        # Checking further output for syntax ./path/filename to run the executable
        try:
            # run C# with mono command. like this mono ./path/filename.exe
            if language == "csharp":
                output_file_exec = f"{filename[:-len(extension)]}.exe"
                output += subprocess.check_output(['mono',output_file_exec], stderr=subprocess.STDOUT).decode('utf-8')
            
            else:
                output_file_exec = f"./{filename[:-len(extension)]}"
                # checking if file exists output_file_exec
                if os.path.isfile(output_file_exec):
                    output += subprocess.check_output([output_file_exec], stderr=subprocess.STDOUT).decode('utf-8')
        except (subprocess.CalledProcessError, Exception) as e:
            if isinstance(e, subprocess.CalledProcessError):
                output += '\n' + e.output.decode('utf-8')
            else:
                output += '\n' + str(e)
            self.add_log(f"Error: {output}")
        

        if debug:
            self.add_log(f"Finished running {filename[:-len(extension)]}")

        self.add_log(f"Output: {output}")
        return output

    # execute all the code choices from bard response using locally installed compilers.
    def execute_code_choices(self):
        self.add_log("Running codes")
        codes_choices_output = list()
        for filename in os.listdir('codes'):
            filepath = path.join('codes', filename)
            self.add_log(f"Running {filepath}")
            output = self.execute_code(filepath)
            if output:
                codes_choices_output.append(output)
            time.sleep(5)
        return codes_choices_output

    # get the code extension from bard response - automatically detects the language from bard response.
    def get_code_extension(self):
        try:
            code_content = self.content
            if code_content and not code_content in "can't help":
                self.code_extension = code_content.split('```')[1].split('\n')[0]
                self.add_log(f"Code extension: {self.code_extension}")
                return self.code_extension
        except Exception as e:
            stack_trace = traceback.format_exc()
            self.add_log(stack_trace)
        return None

    # get the links from bard response
    def get_links(self):
        data = self.factuality_queries
        links = []
        self.add_log("Data: " + str(data))
        if data is None or len(data) == 0:
            self.add_log("Data is None.")
            return links
        try:
            for inner_list in data[0]:
                link = inner_list[2][0]
                if link:
                    links.append(link)
        except Exception as e:
            stack_trace = traceback.format_exc()
            self.add_log(stack_trace)
            return links
        self.add_log("Links: " + str(links))
        return links

    def save_file(self, filename, data):
        with open(filename, 'w') as f:
            f.write(data)

    def read_file(self, filename):
        with open(filename, 'r') as f:
            return f.read()

    def add_log(self, log, level=logging.INFO):
        log_msg = inspect.stack()[1][3] + ": " + log
        if self.logs_enabled:
            self.logger.log(level, log_msg)
        else:
            self.logger = self.setup_logger(self.logs_file)
            self.logger.log(level, log_msg)

    def enable_logs(self):
        self.logs_enabled = True