kateforsberg committed on
Commit
5d4cc46
·
1 Parent(s): d22663e

updated for truth with uw-quiz-generator

Browse files
uw_programmatic/base_machine.py CHANGED
@@ -15,7 +15,6 @@ from griptape.configs import Defaults
15
  from griptape.configs.drivers import (
16
  OpenAiDriversConfig,
17
  )
18
- from griptape.configs.logging import TruncateLoggingFilter
19
  from griptape.drivers import (
20
  GriptapeCloudVectorStoreDriver,
21
  LocalStructureRunDriver,
@@ -38,16 +37,12 @@ from griptape.rules import Rule, Ruleset
38
  from griptape.structures import Agent, Workflow
39
  from griptape.tasks import CodeExecutionTask, StructureRunTask, ToolTask
40
  from griptape.tools import RagTool
 
41
  from statemachine import State, StateMachine
42
  from statemachine.factory import StateMachineMetaclass
43
 
44
- from griptape_statemachine.parsers.uw_config_parser import UWConfigParser
45
-
46
  logger = logging.getLogger(__name__)
47
- logger.setLevel(logging.ERROR)
48
- logger.addFilter(TruncateLoggingFilter(max_log_length=100))
49
- logger2 = logging.getLogger(Defaults.logging_config.logger_name).setLevel(logging.ERROR)
50
- #logging.getLogger("griptape").setLevel(logging.ERROR)
51
 
52
  if TYPE_CHECKING:
53
  from griptape.structures import Structure
@@ -56,6 +51,7 @@ if TYPE_CHECKING:
56
 
57
  load_dotenv()
58
 
 
59
  Defaults.drivers_config = OpenAiDriversConfig(
60
  prompt_driver=OpenAiChatPromptDriver(model="gpt-4o", max_tokens=4096)
61
  )
@@ -91,8 +87,11 @@ class UWBaseMachine(StateMachine):
91
  self.page_range: tuple = ()
92
  self.question_number: int = 0
93
  self.taxonomy: list = []
 
94
  self.give_up_count = 0
95
  self.current_question_count = 0
 
 
96
 
97
  self.state_status: dict[str, bool] = {}
98
 
@@ -286,42 +285,30 @@ class UWBaseMachine(StateMachine):
286
  for ruleset_config in ruleset_configs
287
  ]
288
 
289
- def get_prompt_by_structure(self, structure_id: str) -> str | None:
290
- try:
291
- state_structure_config = self._current_state_config.get(
292
- "structures", {}
293
- ).get(structure_id, {})
294
- global_structure_config = self.config["structures"][structure_id]
295
- except KeyError:
296
- return None
297
- prompt_id = None
298
- if "prompt_id" in global_structure_config:
299
- prompt_id = global_structure_config["prompt_id"]
300
- elif "prompt_id" in state_structure_config:
301
- prompt_id = state_structure_config["prompt_id"]
 
302
  else:
303
- return None
304
- return self.config["prompts"][prompt_id]["prompt"]
305
 
306
- def get_prompt_by_id(self, prompt_id: str) -> str | None:
307
- prompt_config = self.config["prompts"]
308
- if prompt_id in prompt_config:
309
- return prompt_config[prompt_id]["prompt"]
310
- return None
311
-
312
- # ALL METHODS RELATING TO THE WORKFLOW AND PIPELINE
313
- def end_workflow(self, task: CodeExecutionTask) -> ListArtifact:
314
- parent_outputs = task.parent_outputs
315
- questions = []
316
- for outputs in parent_outputs.values():
317
- if outputs.type == "InfoArtifact":
318
- continue
319
- questions.append(outputs)
320
- return ListArtifact(questions)
321
 
 
322
  def get_questions_workflow(self) -> Workflow:
323
  workflow = Workflow(id="create_question_workflow")
324
- # How many questions still need to be created
325
  for _ in range(self.question_number - len(self.question_list)):
326
  task = StructureRunTask(
327
  structure_run_driver=LocalStructureRunDriver(
@@ -329,57 +316,23 @@ class UWBaseMachine(StateMachine):
329
  ),
330
  child_ids=["end_task"],
331
  )
 
332
  workflow.add_task(task)
333
  end_task = CodeExecutionTask(id="end_task", on_run=self.end_workflow)
334
  workflow.add_task(end_task)
335
  return workflow
336
 
337
- def single_question_last_task(self, task: CodeExecutionTask) -> TextArtifact:
338
- parent_outputs = task.parent_outputs
339
- wrong_answers = parent_outputs["wrong_answers"].value # Output is a list
340
- wrong_answers = wrong_answers.split("\n")
341
- question_and_answer = parent_outputs["get_question"].value # Output is a json
342
- try:
343
- question_and_answer = json.loads(question_and_answer)
344
- except:
345
- question_and_answer = question_and_answer.split("\n")[1:]
346
- question_and_answer = "".join(question_and_answer)
347
- question_and_answer = json.loads(question_and_answer)
348
- inputs = task.input.value.split(",")
349
- question = {
350
- "Question": question_and_answer["Question"],
351
- "Answer": question_and_answer["Answer"],
352
- "Wrong Answers": wrong_answers,
353
- "Page": inputs[0],
354
- "Taxonomy": inputs[1],
355
- }
356
- return TextArtifact(question)
357
-
358
- def get_question_for_wrong_answers(self, task: CodeExecutionTask) -> TextArtifact:
359
- parent_outputs = task.parent_outputs
360
- question = parent_outputs["get_question"].value
361
- question = json.loads(question)["Question"]
362
- return TextArtifact(question)
363
-
364
- def get_separated_answer_for_wrong_answers(
365
- self, task: CodeExecutionTask
366
- ) -> TextArtifact:
367
  parent_outputs = task.parent_outputs
368
- answer = parent_outputs["get_question"].value
369
- print(answer)
370
- answer = json.loads(answer)["Answer"]
371
- return TextArtifact(answer)
372
-
373
- def make_rag_structure(
374
- self, vector_store: GriptapeCloudVectorStoreDriver
375
- ) -> Structure:
376
- if vector_store:
377
- tool = self.build_rag_tool(self.build_rag_engine(vector_store))
378
- use_rag_task = ToolTask(tool=tool)
379
- return Agent(tasks=[use_rag_task])
380
- errormsg = "No Vector Store"
381
- raise ValueError(errormsg)
382
 
 
383
  def get_single_question(self) -> Workflow:
384
  question_generator = Workflow(id="single_question")
385
  taxonomy = random.choice(self.taxonomy)
@@ -443,42 +396,65 @@ class UWBaseMachine(StateMachine):
443
  )
444
  return question_generator
445
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
446
  def get_vector_store_id_from_page(
447
  self,
448
  ) -> tuple[str, GriptapeCloudVectorStoreDriver]:
449
- base_url = "https://cloud.griptape.ai/api/"
450
- kb_url = f"{base_url}/knowledge-bases"
451
- headers = {"Authorization": f"Bearer {os.getenv('GT_CLOUD_API_KEY')}"}
452
- # TODO: This needs to change when I have my own bucket. Right now, I'm doing the 10 most recently made KBs
453
- response = requests.get(url=kb_url, headers=headers)
454
- response.raise_for_status()
455
- if response.status_code == 200:
456
- data = response.json()
457
- possible_kbs = {}
458
- for kb in data["knowledge_bases"]:
459
- name = kb["name"]
460
- if "KB_section" not in name:
461
- continue
462
- page_nums = name.split("p")[1:]
463
- start_page = int(page_nums[0].split("-")[0])
464
- end_page = int(page_nums[1])
465
- if end_page <= self.page_range[1] and start_page >= self.page_range[0]:
466
- possible_kbs[kb["knowledge_base_id"]] = f"{start_page}-{end_page}"
467
- kb_id = random.choice(list(possible_kbs.keys()))
468
- page_value = possible_kbs[kb_id] # TODO: This won't help at all actually
469
- return page_value, GriptapeCloudVectorStoreDriver(
470
- api_key=os.getenv("GT_CLOUD_API_KEY", ""),
471
- knowledge_base_id=kb_id,
472
- )
473
- else:
474
- raise ValueError(response.status_code)
475
-
476
- def get_taxonomy_vs(self) -> GriptapeCloudVectorStoreDriver:
477
- return GriptapeCloudVectorStoreDriver(
478
  api_key=os.getenv("GT_CLOUD_API_KEY", ""),
479
- knowledge_base_id="2c3a6f19-51a8-43c3-8445-c7fbe06bf460",
480
  )
481
 
 
482
  def build_rag_engine(
483
  self, vector_store_driver: GriptapeCloudVectorStoreDriver
484
  ) -> RagEngine:
@@ -500,3 +476,13 @@ class UWBaseMachine(StateMachine):
500
  description="Contains information about the textbook. Use it ONLY for context.",
501
  rag_engine=engine,
502
  )
 
 
 
 
 
 
 
 
 
 
 
15
  from griptape.configs.drivers import (
16
  OpenAiDriversConfig,
17
  )
 
18
  from griptape.drivers import (
19
  GriptapeCloudVectorStoreDriver,
20
  LocalStructureRunDriver,
 
37
  from griptape.structures import Agent, Workflow
38
  from griptape.tasks import CodeExecutionTask, StructureRunTask, ToolTask
39
  from griptape.tools import RagTool
40
+ from parsers import UWConfigParser
41
  from statemachine import State, StateMachine
42
  from statemachine.factory import StateMachineMetaclass
43
 
 
 
44
  logger = logging.getLogger(__name__)
45
+ logging.getLogger("griptape").setLevel(logging.ERROR)
 
 
 
46
 
47
  if TYPE_CHECKING:
48
  from griptape.structures import Structure
 
51
 
52
  load_dotenv()
53
 
54
+ # Sets max tokens and OpenAI as the driver.
55
  Defaults.drivers_config = OpenAiDriversConfig(
56
  prompt_driver=OpenAiChatPromptDriver(model="gpt-4o", max_tokens=4096)
57
  )
 
87
  self.page_range: tuple = ()
88
  self.question_number: int = 0
89
  self.taxonomy: list = []
90
+ # To track give up
91
  self.give_up_count = 0
92
  self.current_question_count = 0
93
+ # To keep vector stores on track
94
+ self.kb_ids = {}
95
 
96
  self.state_status: dict[str, bool] = {}
97
 
 
285
  for ruleset_config in ruleset_configs
286
  ]
287
 
288
+ def retrieve_vector_stores(self) -> None:
289
+ base_url = "https://cloud.griptape.ai/api/"
290
+ kb_url = f"{base_url}/knowledge-bases"
291
+ headers = {"Authorization": f"Bearer {os.getenv('GT_CLOUD_API_KEY')}"}
292
+ response = requests.get(url=kb_url, headers=headers)
293
+ response.raise_for_status()
294
+ all_kbs = {}
295
+ if response.status_code == 200:
296
+ data = response.json()
297
+ for kb in data["knowledge_bases"]:
298
+ name = kb["name"]
299
+ kb_id = kb["knowledge_base_id"]
300
+ if "KB_section" in name:
301
+ all_kbs[name] = kb_id
302
  else:
303
+ raise ValueError(response.status_code)
304
+ self.kb_ids = all_kbs
305
 
306
+ # ALL METHODS RELATING TO THE WORKFLOW AND PIPELINE ARE BELOW THIS LINE
 
 
 
 
 
 
 
 
 
 
 
 
 
 
307
 
308
+ # This is the overarching workflow. Creates a workflow with get_single_question x amount of times.
309
  def get_questions_workflow(self) -> Workflow:
310
  workflow = Workflow(id="create_question_workflow")
311
+ # How many questions still need to be created?
312
  for _ in range(self.question_number - len(self.question_list)):
313
  task = StructureRunTask(
314
  structure_run_driver=LocalStructureRunDriver(
 
316
  ),
317
  child_ids=["end_task"],
318
  )
319
+ # Create X amount of workflows to run for X amount of questions needed.
320
  workflow.add_task(task)
321
  end_task = CodeExecutionTask(id="end_task", on_run=self.end_workflow)
322
  workflow.add_task(end_task)
323
  return workflow
324
 
325
+ # Ends the get_questions_workflow. Compiles all workflow outputs into one output.
326
+ def end_workflow(self, task: CodeExecutionTask) -> ListArtifact:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
327
  parent_outputs = task.parent_outputs
328
+ questions = []
329
+ for outputs in parent_outputs.values():
330
+ if outputs.type == "InfoArtifact":
331
+ continue
332
+ questions.append(outputs)
333
+ return ListArtifact(questions)
 
 
 
 
 
 
 
 
334
 
335
+ # Generates one workflow to create a single question.
336
  def get_single_question(self) -> Workflow:
337
  question_generator = Workflow(id="single_question")
338
  taxonomy = random.choice(self.taxonomy)
 
396
  )
397
  return question_generator
398
 
399
+ # Task to separate the Question into a string
400
+ def get_question_for_wrong_answers(self, task: CodeExecutionTask) -> TextArtifact:
401
+ parent_outputs = task.parent_outputs
402
+ question = parent_outputs["get_question"].value
403
+ question = json.loads(question)["Question"]
404
+ return TextArtifact(question)
405
+
406
+ # Task to separate the Answer into a string
407
+ def get_separated_answer_for_wrong_answers(
408
+ self, task: CodeExecutionTask
409
+ ) -> TextArtifact:
410
+ parent_outputs = task.parent_outputs
411
+ answer = parent_outputs["get_question"].value
412
+ print(answer)
413
+ answer = json.loads(answer)["Answer"]
414
+ return TextArtifact(answer)
415
+
416
+ # Combines all the outputs into one dictionary that represents the question
417
+ def single_question_last_task(self, task: CodeExecutionTask) -> TextArtifact:
418
+ parent_outputs = task.parent_outputs
419
+ wrong_answers = parent_outputs["wrong_answers"].value # Output is a list
420
+ wrong_answers = wrong_answers.split("\n")
421
+ question_and_answer = parent_outputs["get_question"].value # Output is a json
422
+ try:
423
+ question_and_answer = json.loads(question_and_answer)
424
+ except:
425
+ question_and_answer = question_and_answer.split("\n")[1:]
426
+ question_and_answer = "".join(question_and_answer)
427
+ question_and_answer = json.loads(question_and_answer)
428
+ inputs = task.input.value.split(",")
429
+ question = {
430
+ "Question": question_and_answer["Question"],
431
+ "Answer": question_and_answer["Answer"],
432
+ "Wrong Answers": wrong_answers,
433
+ "Page": inputs[0],
434
+ "Taxonomy": inputs[1],
435
+ }
436
+ return TextArtifact(question)
437
+
438
+ # These are helper methods
439
+ # Picks the KB from the dictionary
440
  def get_vector_store_id_from_page(
441
  self,
442
  ) -> tuple[str, GriptapeCloudVectorStoreDriver]:
443
+ possible_kbs = {}
444
+ for name, kb_id in self.kb_ids.items():
445
+ page_nums = name.split("p")[1:]
446
+ start_page = int(page_nums[0].split("-")[0])
447
+ end_page = int(page_nums[1])
448
+ if end_page <= self.page_range[1] and start_page >= self.page_range[0]:
449
+ possible_kbs[kb_id] = f"{start_page}-{end_page}"
450
+ kb_id = random.choice(list(possible_kbs.keys()))
451
+ page_value = possible_kbs[kb_id]
452
+ return page_value, GriptapeCloudVectorStoreDriver(
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
453
  api_key=os.getenv("GT_CLOUD_API_KEY", ""),
454
+ knowledge_base_id=kb_id,
455
  )
456
 
457
+ # Uses this and all below to build the Rag Tool to get information from the KB
458
  def build_rag_engine(
459
  self, vector_store_driver: GriptapeCloudVectorStoreDriver
460
  ) -> RagEngine:
 
476
  description="Contains information about the textbook. Use it ONLY for context.",
477
  rag_engine=engine,
478
  )
479
+
480
+ def make_rag_structure(
481
+ self, vector_store: GriptapeCloudVectorStoreDriver
482
+ ) -> Structure:
483
+ if vector_store:
484
+ tool = self.build_rag_tool(self.build_rag_engine(vector_store))
485
+ use_rag_task = ToolTask(tool=tool)
486
+ return Agent(tasks=[use_rag_task])
487
+ errormsg = "No Vector Store"
488
+ raise ValueError(errormsg)
uw_programmatic/question_pipeline.py DELETED
@@ -1,300 +0,0 @@
1
- from __future__ import annotations
2
- import ast
3
- import json
4
- import os
5
- import random
6
- import logging
7
-
8
- import requests
9
- from dotenv import load_dotenv
10
- from griptape.artifacts import ListArtifact, TextArtifact
11
- from griptape.configs import Defaults
12
- from griptape.configs.drivers import OpenAiDriversConfig
13
- from griptape.drivers import (
14
- LocalStructureRunDriver,
15
- OpenAiChatPromptDriver,
16
- GriptapeCloudVectorStoreDriver,
17
- )
18
- from griptape.artifacts import ListArtifact, TextArtifact
19
- from griptape.rules import Ruleset, Rule
20
-
21
- import json
22
- import requests
23
- import random
24
- import os
25
- from dotenv import load_dotenv
26
-
27
- from griptape.engines.rag import RagEngine
28
- from griptape.engines.rag.modules import (
29
- VectorStoreRetrievalRagModule,
30
- TextChunksResponseRagModule,
31
- )
32
- from griptape.engines.rag.stages import ResponseRagStage, RetrievalRagStage
33
- from griptape.tools import RagTool
34
- from griptape.configs.logging import TruncateLoggingFilter
35
-
36
- from griptape_statemachine.parsers.uw_csv_parser import CsvParser
37
-
38
- load_dotenv()
39
-
40
- # openai default config pass in a new openai driver
41
- Defaults.drivers_config = OpenAiDriversConfig(
42
- prompt_driver=OpenAiChatPromptDriver(model="gpt-4o", max_tokens=4096)
43
- )
44
- # logger = logging.getLogger(Defaults.logging_config.logger_name)
45
- # logger.setLevel(logging.ERROR)
46
- # logger.addFilter(TruncateLoggingFilter(max_log_length=5000))
47
-
48
-
49
- # ALL METHODS RELATING TO THE WORKFLOW AND PIPELINE
50
- def end_workflow(task: CodeExecutionTask) -> ListArtifact:
51
- parent_outputs = task.parent_outputs
52
- questions = []
53
- for output in parent_outputs.values():
54
- output = output.value
55
- try:
56
- output = ast.literal_eval(output)
57
- question = {output["Question"]: output}
58
- questions.append(TextArtifact(question))
59
- except SyntaxError:
60
- pass
61
- return ListArtifact(questions)
62
-
63
-
64
- def get_questions_workflow() -> Workflow:
65
- workflow = Workflow(id="create_question_workflow")
66
- # How many questions still need to be created
67
- for _ in range(10):
68
- task = StructureRunTask(
69
- driver=LocalStructureRunDriver(create_structure=get_single_question),
70
- child_ids=["end_task"],
71
- )
72
- workflow.add_task(task)
73
- end_task = CodeExecutionTask(id="end_task", on_run=end_workflow)
74
- workflow.add_task(end_task)
75
- return workflow
76
-
77
-
78
- def single_question_last_task(task: CodeExecutionTask) -> TextArtifact:
79
- parent_outputs = task.parent_outputs
80
- print(f"PARENT OUTPUTS ARE: {parent_outputs}")
81
- wrong_answers = parent_outputs["wrong_answers"].value # Output is a list
82
- wrong_answers = wrong_answers.split("\n")
83
- question_and_answer = parent_outputs["get_question"].value # Output is a json
84
- question_and_answer = json.loads(question_and_answer)
85
- inputs = task.input.value.split(",")
86
- question = {
87
- "Question": question_and_answer["Question"],
88
- "Answer": question_and_answer["Answer"],
89
- "Wrong Answers": wrong_answers,
90
- "Page": int(inputs[0]),
91
- "Taxonomy": inputs[1],
92
- }
93
- return TextArtifact(question)
94
-
95
-
96
- def get_question_for_wrong_answers(task: CodeExecutionTask) -> TextArtifact:
97
- parent_outputs = task.parent_outputs
98
- question = parent_outputs["get_question"].value
99
- print(question)
100
- question = json.loads(question)["Question"]
101
- return TextArtifact(question)
102
-
103
-
104
- def get_single_question() -> Workflow:
105
- question_generator = Workflow()
106
- page_number = random.choice(list(range(1, 9)))
107
- taxonomy = random.choice(["Knowledge", "Comprehension", "Application"])
108
- taxonomyprompt = {
109
- "Knowledge": "Generate a quiz question based ONLY on this information: {{parent_outputs['information_task']}}, then write the answer to the question. The interrogative verb for the question should be one of 'define', 'list', 'state', 'identify', or 'label'.",
110
- "Comprehension": "Generate a quiz question based ONLY on this information: {{parent_outputs['information_task']}}, then write the answer to the question. The interrogative verb for the question should be one of 'explain', 'predict', 'interpret', 'infer', 'summarize', 'convert', or 'give an example of x'.",
111
- "Application": "Generate a quiz question based ONLY on this information: {{parent_outputs['information_task']}}, then write the answer to the question. The structure of the question should be one of 'How could x be used to y?' or 'How would you show/make use of/modify/demonstrate/solve/apply x to conditions y?'",
112
- }
113
- # Get KBs and select it, assign it to the structure or create the structure right here.
114
- # Rules for subject matter expert: return only a json with question and answer as keys.
115
- generate_q_task = StructureRunTask(
116
- id="get_question",
117
- input=taxonomyprompt[taxonomy],
118
- driver=LocalStructureRunDriver(
119
- create_structure=lambda: get_structure("subject_matter_expert", page_number)
120
- ),
121
- )
122
-
123
- get_question_code_task = CodeExecutionTask(
124
- id="get_only_question",
125
- on_run=get_question_for_wrong_answers,
126
- parent_ids=["get_question"],
127
- child_ids=["wrong_answers"],
128
- )
129
- # This will use the same KB as the previous task
130
- generate_wrong_answers = StructureRunTask(
131
- id="wrong_answers",
132
- input="""Write and return three incorrect answers for this question: {{parent_outputs['get_only_question']}} with this context: {{parent_outputs['information_task']}}""",
133
- structure_run_driver=LocalStructureRunDriver(
134
- create_structure=lambda: get_structure("wrong_answers_generator")
135
- ),
136
- parent_ids=["get_only_question"],
137
- )
138
- compile_task = CodeExecutionTask(
139
- id="compile_task",
140
- input=f"{page_number}, {taxonomy})",
141
- on_run=single_question_last_task,
142
- parent_ids=["wrong_answers", "get_question"],
143
- )
144
- question_generator.add_tasks(
145
- generate_q_task,
146
- get_question_code_task,
147
- generate_wrong_answers,
148
- compile_task,
149
- )
150
- return question_generator
151
-
152
-
153
- def get_structure(structure_id: str, page_number=0) -> Structure:
154
- match structure_id:
155
- case "subject_matter_expert":
156
- rulesets = Ruleset(
157
- name="specific_question_creator",
158
- rules=[
159
- Rule(
160
- "Return ONLY a json with 'Question' and 'Answer' as keys. No markdown, no commentary, no code, no backticks."
161
- ),
162
- Rule(
163
- "Query to knowledge base should always be 'find information for quiz question'"
164
- ),
165
- Rule("Use ONLY information from your knowledge base"),
166
- Rule(
167
- "Question should be a question based on the knowledge base. Answer should be from knowledge base."
168
- ),
169
- Rule(
170
- "The answer to the question should be short, but should not omit important information."
171
- ),
172
- Rule("Answer length should be 10 words maximum, 5 words minimum"),
173
- ],
174
- )
175
- structure = Agent(
176
- id="subject_matter_expert",
177
- prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"),
178
- rulesets=[rulesets],
179
- tools=[tool],
180
- )
181
- case "taxonomy_expert":
182
- rulesets = Ruleset(
183
- name="KB Rules",
184
- rules=[
185
- Rule(
186
- "Use only your knowledge base. Do not make up any additional information."
187
- ),
188
- Rule("Maximum 10 words."),
189
- Rule(
190
- "Return information an AI chatbot could use to write a question on a subject."
191
- ),
192
- ],
193
- )
194
- kb_driver = get_taxonomy_vs()
195
- tool = build_rag_tool(build_rag_engine(kb_driver))
196
- structure = Agent(
197
- id="taxonomy_expert",
198
- prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"),
199
- tools=[tool],
200
- )
201
- case "wrong_answers_generator":
202
- rulesets = Ruleset(
203
- name="incorrect_answers_creator",
204
- rules=[
205
- Rule(
206
- "Return ONLY a list of 3 incorrect answers. No markdown, no commentary, no backticks."
207
- ),
208
- Rule(
209
- "All incorrect answers should be different, but plausible answers to the question."
210
- ),
211
- Rule(
212
- "Incorrect answers may reference material from the knowledge base, but must not be correct answers to the question"
213
- ),
214
- Rule(
215
- "Length of incorrect answers should be 10 words max, 5 words minimum"
216
- ),
217
- ],
218
- )
219
- kb_driver = get_vector_store_id_from_page(page_number)
220
- tool = build_rag_tool(build_rag_engine(kb_driver))
221
- structure = Agent(
222
- id="wrong_answers_generator",
223
- prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"),
224
- rulesets=[rulesets],
225
- tools=[tool],
226
- )
227
- case _:
228
- structure = Agent(prompt_driver=OpenAiChatPromptDriver(model="gpt-4o"))
229
- return structure
230
-
231
-
232
- def get_vector_store_id_from_page(page: int) -> GriptapeCloudVectorStoreDriver | None:
233
- base_url = "https://cloud.griptape.ai/api/"
234
- kb_url = f"{base_url}/knowledge-bases"
235
- headers = {"Authorization": f"Bearer {os.getenv('GT_CLOUD_API_KEY')}"}
236
- # TODO: This needs to change when I have my own bucket. Right now, I'm doing the 10 most recently made KBs
237
- response = requests.get(url=kb_url, headers=headers)
238
- response = requests.get(
239
- url=kb_url,
240
- headers=headers,
241
- )
242
- response.raise_for_status()
243
- if response.status_code == 200:
244
- data = response.json()
245
- for kb in data["knowledge_bases"]:
246
- name = kb["name"]
247
- if "KB_section" not in name:
248
- continue
249
- page_nums = name.split("pg")[1].split("-")
250
- start_page = int(page_nums[0])
251
- end_page = int(page_nums[1])
252
- if end_page <= 40 and start_page >= 1:
253
- possible_kbs[kb["knowledge_base_id"]] = f"{start_page}-{end_page}"
254
- kb_id = random.choice(list(possible_kbs.keys()))
255
- page_value = possible_kbs[kb_id]
256
- return page_value, GriptapeCloudVectorStoreDriver(
257
- api_key=os.getenv("GT_CLOUD_API_KEY", ""),
258
- knowledge_base_id=kb_id,
259
- )
260
- else:
261
- raise ValueError(response.status_code)
262
- return None
263
-
264
-
265
- def get_taxonomy_vs() -> GriptapeCloudVectorStoreDriver:
266
- return GriptapeCloudVectorStoreDriver(
267
- api_key=os.getenv("GT_CLOUD_API_KEY", ""),
268
- knowledge_base_id="2c3a6f19-51a8-43c3-8445-c7fbe06bf460",
269
- )
270
-
271
-
272
- def build_rag_engine(vector_store_driver) -> RagEngine:
273
- return RagEngine(
274
- retrieval_stage=RetrievalRagStage(
275
- retrieval_modules=[
276
- VectorStoreRetrievalRagModule(
277
- vector_store_driver=vector_store_driver,
278
- query_params={
279
- "count": 100,
280
- },
281
- )
282
- ],
283
- ),
284
- response_stage=ResponseRagStage(
285
- response_modules=[TextChunksResponseRagModule()]
286
- ),
287
- )
288
-
289
-
290
- def build_rag_tool(engine) -> RagTool:
291
- return RagTool(
292
- description="Contains information about the textbook. Use it to answer any related questions.",
293
- rag_engine=engine,
294
- )
295
-
296
-
297
- if __name__ == "__main__":
298
- # workflow = get_questions_workflow()
299
- # workflow.run()
300
- CsvParser("uw_programmatic").csv_parser()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
uw_programmatic/uw_machine.py CHANGED
@@ -24,8 +24,11 @@ class UWMachine(UWBaseMachine):
24
  """Starts the machine."""
25
  # Clear input history.
26
  # Clear csv file
 
27
  self.send("enter_first_state")
28
 
 
 
29
  def on_event_gather_parameters(self, event_: dict) -> None:
30
  event_source = event_["type"]
31
  event_value = event_["value"]
@@ -42,21 +45,23 @@ class UWMachine(UWBaseMachine):
42
  err_msg = f"Unexpected Transition Event ID: {event_value}."
43
  raise ValueError(err_msg)
44
 
 
 
45
  def on_enter_evaluate_q_count(self) -> None:
46
- # Check if the number of questions has incremented
47
  if len(self.question_list) <= self.current_question_count:
48
  self.give_up_count += 1
49
  else:
50
  self.current_question_count = len(self.question_list)
51
  self.give_up_count = 0
52
  if self.give_up_count >= 3:
53
- self.send("finish_state")
54
  return
55
  if len(self.question_list) >= self.question_number:
56
  self.send("finish_state") # go to output questions
57
  else:
58
  self.send("next_state") # go to need more questions
59
 
 
60
  def on_event_evaluate_q_count(self, event_: dict) -> None:
61
  pass
62
 
@@ -64,6 +69,8 @@ class UWMachine(UWBaseMachine):
64
  # Create the entire workflow to create another question.
65
  self.get_questions_workflow().run()
66
 
 
 
67
  def on_event_need_more_q(self, event_: dict) -> None:
68
  event_source = event_["type"]
69
  event_value = event_["value"]
@@ -81,7 +88,7 @@ class UWMachine(UWBaseMachine):
81
  for question in values
82
  ]
83
  self.most_recent_questions = (
84
- questions # This is a ListArtifact I'm pretty sure
85
  )
86
  self.send("next_state")
87
  case _:
@@ -89,13 +96,13 @@ class UWMachine(UWBaseMachine):
89
  case _:
90
  print(f"Unexpected: {event_}")
91
 
 
92
  def on_enter_assess_generated_q(self) -> None:
93
- # TODO: Should it append it to the list already and remove duplicates? or not?
94
- # TODO: Merge incoming lists
95
  merged_list = [*self.question_list, *self.most_recent_questions]
96
  prompt = f"{merged_list}"
97
  self.get_structure("similarity_auditor").run(prompt)
98
 
 
99
  def on_event_assess_generated_q(self, event_: dict) -> None:
100
  event_source = event_["type"]
101
  event_value = event_["value"]
@@ -114,11 +121,12 @@ class UWMachine(UWBaseMachine):
114
  new_question_list = json.loads(
115
  new_question_list
116
  ) # This must be in that JSON format
117
- except:
118
  new_question_list = self.question_list
119
  self.question_list = new_question_list
120
- self.send("next_state") # move on
121
 
 
122
  def on_enter_output_q(self) -> None:
123
  with Path(Path.cwd().joinpath("outputs/professor_guide.csv")).open(
124
  "w", newline=""
@@ -126,85 +134,26 @@ class UWMachine(UWBaseMachine):
126
  writer = csv.writer(file)
127
  for question in range(len(self.question_list)):
128
  new_row = ["MC", "", 1]
129
- try:
130
- new_row.append(self.question_list[question]["Question"])
131
- wrong_answers = list(self.question_list[question]["Wrong Answers"])
132
- column = random.randint(1, len(wrong_answers) + 1)
133
- new_row.append(column)
134
- for i in range(1, len(wrong_answers) + 2):
135
- if i == column:
136
- new_row.append(self.question_list[question]["Answer"])
137
- else:
138
- wrong_answer = wrong_answers.pop()
139
- if not wrong_answer:
140
- wrong_answer = ""
141
- new_row.append(wrong_answer)
142
- new_row.append(self.question_list[question]["Page"])
143
- new_row.append(self.question_list[question]["Taxonomy"])
144
- writer.writerow(new_row)
145
- except KeyError:
146
- new_row.append(self.question_list["Question"])
147
- wrong_answers = list(self.question_list["Wrong Answers"])
148
- column = random.randint(1, len(wrong_answers) + 1)
149
- new_row.append(column)
150
- for i in range(1, len(wrong_answers) + 2):
151
- if i == column:
152
- new_row.append(self.question_list["Answer"])
153
- else:
154
- new_row.append(wrong_answers.pop())
155
- new_row.append(self.question_list["Page"])
156
- new_row.append(self.question_list["Taxonomy"])
157
- writer.writerow(new_row)
158
- if self.give_up_count == 3:
159
- writer.writerow(["Failed to generate more questions.",])
160
- self.send("next_state")
161
-
162
- def on_event_output_q(self, event_: dict) -> None:
163
- pass
164
-
165
- def on_exit_output_q(self) -> None:
166
- # Reset the state machine values
167
- self.question_list = []
168
- self.most_recent_questions = []
169
-
170
- if __name__ == "__main__":
171
-
172
- question_list = [
173
- {
174
- "Page": "1-2",
175
- "Taxonomy": "Knowledge",
176
- "Question": "What is Python?",
177
- "Answer": "A programming language",
178
- "Wrong Answers": ["A snake", "A car brand", "A fruit"],
179
- },
180
- {
181
- "Page": "3-4",
182
- "Taxonomy": "Comprehension",
183
- "Question": "What does HTML stand for?",
184
- "Answer": "HyperText Markup Language",
185
- "Wrong Answers": [
186
- "High Text Machine Language",
187
- "Hyperlink Text Mode Language",
188
- "None of the above",
189
- ],
190
- },
191
- ]
192
-
193
- with Path(Path.cwd().joinpath("outputs/professor_guide.csv")).open(
194
- "w", newline=""
195
- ) as file:
196
- writer = csv.writer(file)
197
- for question in range(len(question_list)):
198
- # TODO: Shuffle answers according to row, keep correct answer in random section. Answer column is a number.
199
- new_row = [question_list[question]["Question"]]
200
- wrong_answers = list(question_list[question]["Wrong Answers"])
201
  column = random.randint(1, len(wrong_answers) + 1)
202
  new_row.append(column)
203
  for i in range(1, len(wrong_answers) + 2):
204
  if i == column:
205
- new_row.append(question_list[question]["Answer"])
206
  else:
207
  new_row.append(wrong_answers.pop())
208
- new_row.append(question_list[question]["Page"])
209
- new_row.append(question_list[question]["Taxonomy"])
210
  writer.writerow(new_row)
 
 
 
 
 
 
 
 
 
 
 
 
24
  """Starts the machine."""
25
  # Clear input history.
26
  # Clear csv file
27
+ self.retrieve_vector_stores()
28
  self.send("enter_first_state")
29
 
30
+ # The first state: Listens for Gradio and then gives us the parameters to search for.
31
+ # Reinitializes the Give Up counter.
32
  def on_event_gather_parameters(self, event_: dict) -> None:
33
  event_source = event_["type"]
34
  event_value = event_["value"]
 
45
  err_msg = f"Unexpected Transition Event ID: {event_value}."
46
  raise ValueError(err_msg)
47
 
48
+ # Checks if there have not been any new questions generated 3 tries in a row
49
+ # If # of questions is the same as the # of questions required - sends to end.
50
  def on_enter_evaluate_q_count(self) -> None:
 
51
  if len(self.question_list) <= self.current_question_count:
52
  self.give_up_count += 1
53
  else:
54
  self.current_question_count = len(self.question_list)
55
  self.give_up_count = 0
56
  if self.give_up_count >= 3:
57
+ self.send("finish_state") # go to output questions
58
  return
59
  if len(self.question_list) >= self.question_number:
60
  self.send("finish_state") # go to output questions
61
  else:
62
  self.send("next_state") # go to need more questions
63
 
64
+ # Necessary for state machine to not throw errors
65
  def on_event_evaluate_q_count(self, event_: dict) -> None:
66
  pass
67
 
 
69
  # Create the entire workflow to create another question.
70
  self.get_questions_workflow().run()
71
 
72
+ # Returns the output of the workflow - a ListArtifact of TextArtifacts of questions.
73
+ # Question, Answer, Wrong Answers, Taxonomy, Page Number
74
  def on_event_need_more_q(self, event_: dict) -> None:
75
  event_source = event_["type"]
76
  event_value = event_["value"]
 
88
  for question in values
89
  ]
90
  self.most_recent_questions = (
91
+ questions # This is a ListArtifact
92
  )
93
  self.send("next_state")
94
  case _:
 
96
  case _:
97
  print(f"Unexpected: {event_}")
98
 
99
+ # Merges the existing and new questions and sends to similarity auditor to get rid of similar questions.
100
  def on_enter_assess_generated_q(self) -> None:
 
 
101
  merged_list = [*self.question_list, *self.most_recent_questions]
102
  prompt = f"{merged_list}"
103
  self.get_structure("similarity_auditor").run(prompt)
104
 
105
+ # Sets the returned question list (with similar questions wiped) equal to self.question_list
106
  def on_event_assess_generated_q(self, event_: dict) -> None:
107
  event_source = event_["type"]
108
  event_value = event_["value"]
 
121
  new_question_list = json.loads(
122
  new_question_list
123
  ) # This must be in that JSON format
124
+ except: # If not in JSON decode format
125
  new_question_list = self.question_list
126
  self.question_list = new_question_list
127
+ self.send("next_state") # go to Evaluate Q Count
128
 
129
+ # Writes and saves a csv in the correct format to outputs/professor_guide.csv
130
  def on_enter_output_q(self) -> None:
131
  with Path(Path.cwd().joinpath("outputs/professor_guide.csv")).open(
132
  "w", newline=""
 
134
  writer = csv.writer(file)
135
  for question in range(len(self.question_list)):
136
  new_row = ["MC", "", 1]
137
+ new_row.append(self.question_list[question]["Question"])
138
+ wrong_answers = list(self.question_list[question]["Wrong Answers"])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
139
  column = random.randint(1, len(wrong_answers) + 1)
140
  new_row.append(column)
141
  for i in range(1, len(wrong_answers) + 2):
142
  if i == column:
143
+ new_row.append(self.question_list[question]["Answer"])
144
  else:
145
  new_row.append(wrong_answers.pop())
146
+ new_row.append(self.question_list[question]["Page"])
147
+ new_row.append(self.question_list[question]["Taxonomy"])
148
  writer.writerow(new_row)
149
+ if self.give_up_count == 3:
150
+ writer.writerow(
151
+ [
152
+ "Failed to generate more questions.",
153
+ ]
154
+ )
155
+ self.send("next_state") # back to gather_parameters
156
+
157
+ # Necessary to prevent errors being thrown from state machine
158
+ def on_event_output_q(self, event_: dict) -> None:
159
+ pass