Spaces:
Runtime error
Runtime error
from dotenv import load_dotenv, find_dotenv | |
from textwrap import dedent | |
import io, string | |
from contextlib import redirect_stdout | |
load_dotenv(find_dotenv()) | |
from agents import CryptoAnalysisAgents | |
from tasks import CryptoAnalysisTasks | |
from crewai import Crew | |
class CryptoCrew:
    """Assemble and run a two-agent crew that analyses one cryptocurrency.

    The crew consists of a crypto analyst agent (research task) and a
    content writer agent (recommendation task), both built from the
    project's ``CryptoAnalysisAgents`` / ``CryptoAnalysisTasks`` factories.
    """

    def __init__(self, coin):
        """Store the cryptocurrency to analyse.

        Args:
            coin: Identifier of the cryptocurrency; passed unchanged to the
                research and recommendation task factories.
        """
        self.coin = coin

    def run(self, logging=False):
        """Build the crew, execute it, and return its result.

        Args:
            logging: When true, everything the crew prints to stdout during
                execution is captured and prepended to the report. (The
                name shadows the stdlib ``logging`` module inside this
                method; it is kept for interface compatibility.)

        Returns:
            With ``logging`` false, whatever ``Crew.kickoff()`` returns.
            With ``logging`` true, a ``str`` made of the captured output
            (restricted to ``string.printable`` characters), the heading
            ``"## Here is the Report"``, and the kickoff result as text.
        """
        agents = CryptoAnalysisAgents()
        tasks = CryptoAnalysisTasks()

        crypto_analyst = agents.crypto_analyst()
        content_writer = agents.content_writer()

        research_task = tasks.research(crypto_analyst, self.coin)
        recommend_task = tasks.recommend(content_writer, self.coin)

        crew = Crew(
            agents=[
                crypto_analyst,
                content_writer,
            ],
            tasks=[
                research_task,
                recommend_task,
            ],
            verbose=True,
        )

        if not logging:
            return crew.kickoff()

        # Capture the crew's verbose stdout output so it can be returned
        # alongside the final report.
        captured = io.StringIO()
        with redirect_stdout(captured):
            result = crew.kickoff()

        # Keep only printable characters (drops e.g. the ESC byte of
        # terminal colour sequences). Build the lookup set once instead of
        # scanning the string.printable string for every character.
        printable = set(string.printable)
        log = "".join(ch for ch in captured.getvalue() if ch in printable)

        # kickoff() may return a non-str result object (e.g. CrewOutput in
        # newer crewai releases); coerce explicitly so the concatenation
        # cannot raise TypeError. For a plain str this is a no-op.
        return log + "## Here is the Report\n\n" + str(result)
# if __name__ == "__main__": | |
# print("## Welcome to Crypto Analysis Crew") | |
# print('-------------------------------') | |
# company = input( | |
# dedent(""" | |
# Which cryptocurrency are you looking to delve into? | |
# """)) | |
# crypto_crew = CryptoCrew(company) | |
# result = crypto_crew.run() | |
# print("\n\n########################") | |
# print("## Here is the Report") | |
# print("########################\n") | |
# print(result) | |