"""Collect Slack channel threads, summarize them with an LLM, and export CSVs.

Usage: ``python3 index_slack.py channel1,channel2,channel3``

Requires the ``SLACK_API_TOKEN`` and ``OPENAI_API_KEY`` environment variables.
Intermediate and final files are written under the ``csv/`` directory.
"""
import csv
import datetime
import functools
import json
import os
import re
import shelve
import sys
from typing import List, Optional

import openai
import slack_sdk
from llama_index import Document, GPTListIndex, ServiceContext
from llama_index.llms import OpenAI
from ratelimit import limits, sleep_and_retry

from channel_id_mapper import ChannelIdMapper
from metadata_extracter import MetadataExtractor, ThreadMetadata

# When True, no LLM query or metadata extraction is performed; the first
# message of a thread is used as its summary instead.
SKIP_AI = False

gpt_turbo: OpenAI = OpenAI(temperature=0, model="gpt-3.5-turbo")
service_context = ServiceContext.from_defaults(llm=gpt_turbo, chunk_size=1024)
metadata_extractor = MetadataExtractor(gpt_turbo)


@functools.lru_cache(maxsize=1024)
def cached_shelve_read(key):
    """Return the value stored under *key* in ``mydata.db``, or None if absent.

    Results are memoized; ``cached_shelve_write`` clears the memo on write.
    """
    with shelve.open("mydata.db") as db:
        return db.get(key, None)


def cached_shelve_write(key, value):
    """Store *value* under *key* in ``mydata.db`` and invalidate the read cache."""
    with shelve.open("mydata.db") as db:
        db[key] = value
    # Cached reads may now be stale (including cached "missing" results).
    cached_shelve_read.cache_clear()


def load_mapping_from_json(filepath):
    """Load a JSON file and return its content; return ``{}`` if it is missing."""
    if not os.path.exists(filepath):
        return {}
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)


# Slack user id -> display name; empty when the mapping file does not exist.
userIdMapping = load_mapping_from_json('user_id_to_name_mapping.json')


class SlackMessage:
    """A single Slack message with its author resolved to a name if known."""

    def __init__(self, user, timestamp, content):
        # Fall back to the raw value when the id is not in the mapping.
        self.user = userIdMapping.get(user, user)
        self.timestamp = timestamp
        self.content = content


class SlackThread:
    """Summary record for one Slack thread.

    NOTE: the attribute assignment order below defines the column order of the
    CSV row written by ``SlackChannelReader.summarize_threads`` (which dumps
    ``__dict__.values()``); keep it in sync with that method's header row.
    """

    def __init__(self, min_ts: str, max_ts: str, user: str, unique_users: set[str], summary: str,
                 message_count: int,
                 detected_metadata: Optional[ThreadMetadata],
                 jira_tickets: Optional[list[str]] = None):
        self.min_ts = min_ts
        self.max_ts = max_ts
        self.user = user
        self.unique_users = unique_users
        self.unique_users_count = len(unique_users)
        self.summary = SlackChannelReader.replace_user_ids_with_names_in_summary(summary)
        self.message_count = message_count
        self.positive_topics: list[str] = detected_metadata.list_of_positive_topics if detected_metadata else []
        self.negative_topics: list[str] = detected_metadata.list_of_negative_topics if detected_metadata else []
        self.overall_sentiment: Optional[str] = detected_metadata.overall_sentiment if detected_metadata else None
        self.jira_tickets: list[str] = jira_tickets if jira_tickets else []


class SlackChannelReader:
    """Reads the threads of one Slack channel and writes summaries to CSV."""

    def __init__(self, token, channel_id, channel_name, openai_api_key):
        self.client = slack_sdk.WebClient(token=token)
        self.channel_id = channel_id
        self.channel_name = channel_name
        openai.api_key = openai_api_key

    @staticmethod
    def replace_user_ids_with_names(text):
        """Replace ``<@USERID>`` mentions in *text* with names from ``userIdMapping``.

        Mentions whose id is not in the mapping are left untouched.
        """

        def replacer(match):
            user_id = match.group(1)
            return userIdMapping.get(user_id, match.group(0))

        return re.sub(r'<@(\w+)>', replacer, text)

    @staticmethod
    def replace_user_ids_with_names_in_summary(summary):
        """Replace whitespace-separated words of *summary* that are known user ids.

        This is a workaround; it would be better to replace ids ahead of time.
        Words are re-joined with single spaces, so original whitespace runs
        (including newlines) are collapsed.
        """
        return ' '.join(userIdMapping.get(word, word) for word in summary.split())

    @staticmethod
    def extract_jira_tickets(text: str):
        """Return all JIRA-style ticket keys (e.g. ``ABC-123``) found in *text*.

        Assumes project names consist of upper-case letters only (no digits).
        """
        jira_ticket_pattern = r'\b[A-Z]+-\d+\b'
        return re.findall(jira_ticket_pattern, text)

    def save_messages(self, db: shelve.Shelf, msg_csv_file: str):
        """Write the first message of every thread in *db* to *msg_csv_file*.

        *db* is iterated as ``thread_ts -> list of messages``. Each row matches
        the header ``thread_id, timestamp, user, content``. Threads with no
        messages are skipped.
        """
        count = 0
        with open(msg_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['thread_id', 'timestamp', 'user', 'content'])
            for thread_ts, thread_messages in db.items():
                if not thread_messages:
                    continue
                first = thread_messages[0]
                # SlackMessage has no ``jira_tickets`` attribute, so only the
                # four header columns are written.
                writer.writerow([thread_ts, first.timestamp, first.user, first.content])
                count += 1
        print(f"Saved {count} messages for channel {self.channel_name} to csv")

    def summarize_threads(self, thread_id_set: set):
        """Fetch messages for every thread id, summarize them, and write CSVs.

        Step 1 (skipped when ``csv/<channel>_messages.csv`` already exists):
        fetch each thread's messages and store them as JSON, one row per thread.
        Step 2: read that file back, summarize each thread and write one row
        per summarized thread to ``csv/<channel>.csv``.

        Returns the total number of messages read from the messages file.
        """
        os.makedirs('csv', exist_ok=True)
        thread_messages_file = f'csv/{self.channel_name}_messages.csv'
        if not os.path.exists(thread_messages_file):
            print(f"Thread messages file not exists, collecting thread messages for channel {self.channel_name}")
            # NOTE(review): if this loop is interrupted, a partial file is left
            # behind and will be treated as complete on the next run.
            # Written as UTF-8 because step 2 reads the file back as UTF-8.
            with open(thread_messages_file, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['thread_ts', 'messages_json'])
                for thread_count, thread_ts in enumerate(thread_id_set, start=1):
                    thread_messages = self.read_thread_messages(self.channel_id, thread_ts)
                    message_dicts = [msg.__dict__ for msg in thread_messages]
                    writer.writerow([thread_ts, json.dumps(message_dicts)])
                    if thread_count % 100 == 0:
                        print(f"Collected messages for {thread_count} threads")
            print(f"Collecting thread messages completed for : {len(thread_id_set)} threads")

        # From file read messages and generate threads to another CSV
        messages_count = 0
        max_thread_size = 0
        with open(f'csv/{self.channel_name}.csv', 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # One header per SlackThread attribute, in assignment order.
            writer.writerow(
                ['Start Time', 'Last Updated', 'Originator', 'Unique Users', 'Unique Users Count', 'Summary',
                 'Message Count', 'Positive Topics', 'Negative Topics', 'Overall Sentiment', 'Jira Tickets'])
            with open(thread_messages_file, 'r', newline='', encoding='utf-8') as csvfile_msgs:
                for row in csv.DictReader(csvfile_msgs):
                    thread_ts = str(row['thread_ts']).strip()
                    thread_messages_dict = json.loads(str(row['messages_json']))
                    thread_messages: list[SlackMessage] = [
                        SlackMessage(d['user'], d['timestamp'], d['content'])
                        for d in thread_messages_dict
                    ]
                    messages_count += len(thread_messages)
                    print(f"Summarizing thread {thread_ts} with {len(thread_messages)} messages")
                    slack_thread = self._get_thread_summary(thread_messages)
                    max_thread_size = max(max_thread_size, len(thread_messages))
                    if slack_thread:
                        writer.writerow(slack_thread.__dict__.values())
        print(f"Max thread size: {max_thread_size}")
        return messages_count

    def read_messages(self, start_date):
        """Collect thread ids of the channel since *start_date* and summarize them.

        Thread ids are cached in ``csv/<channel>_threads.csv``; when that file
        exists the Slack history is not queried again.

        Returns a ``(message_count, thread_count)`` tuple.
        """
        os.makedirs('csv', exist_ok=True)
        threads_file = f'csv/{self.channel_name}_threads.csv'
        if not os.path.exists(threads_file):
            print(f"Threads file not exists, collecting thread ids for channel {self.channel_name}")
            start_timestamp = str(start_date.timestamp())
            cursor = None
            has_more = True
            thread_id_set = set()
            while has_more:
                response_data = self.read_slack_conversations(cursor, start_timestamp).data
                for response in response_data["messages"]:
                    if "thread_ts" in response:
                        thread_id_set.add(response["thread_ts"])
                has_more = response_data["has_more"]
                if has_more:
                    cursor = response_data["response_metadata"]["next_cursor"]
            print(f"Collected {len(thread_id_set)} thread ids for channel {self.channel_name}")
            with open(threads_file, 'w', encoding='utf-8') as f:
                f.writelines(element + '\n' for element in thread_id_set)

        with open(threads_file, encoding='utf-8') as f:
            # Strip the line terminators so clean ids reach the Slack API and
            # the CSV output; ignore blank lines.
            thread_id_set = {line.strip() for line in f if line.strip()}
        message_count = self.summarize_threads(thread_id_set)
        return message_count, len(thread_id_set)

    @sleep_and_retry
    @limits(calls=1, period=1)
    def read_slack_conversations(self, cursor, start_timestamp):
        """Fetch one page of channel history (rate limited: calls=1, period=1)."""
        response = self.client.conversations_history(
            channel=self.channel_id,
            limit=1000,
            oldest=start_timestamp,
            cursor=cursor
        )
        return response

    def read_thread_messages(self, channel_id, thread_ts) -> List[SlackMessage]:
        """Return all messages of the thread *thread_ts*, following pagination.

        Messages without a ``user`` key use their ``subtype`` as the author;
        messages with neither are reported and skipped.
        """
        all_messages: list[SlackMessage] = []
        next_cursor = None
        has_more = True
        while has_more:
            # Call the conversations.replies method using the WebClient
            result = self.read_slack_thread_messages(channel_id, next_cursor, thread_ts)
            for message in result['messages']:
                try:
                    if 'user' in message:
                        author = message['user']
                    elif 'subtype' in message:
                        author = message['subtype']
                    else:
                        print(f"Unknown message type: {message}")
                        continue
                    all_messages.append(SlackMessage(author, message['ts'], message['text']))
                except KeyError:
                    # Best effort: a message lacking 'ts' or 'text' is skipped.
                    print(f"KeyError for message: {message}")
            has_more = result["has_more"]
            if has_more:
                next_cursor = result["response_metadata"]["next_cursor"]
        return all_messages

    @sleep_and_retry
    @limits(calls=2, period=1)
    def read_slack_thread_messages(self, channel_id, next_cursor, thread_ts):
        """Fetch one page of thread replies (rate limited: calls=2, period=1)."""
        result = self.client.conversations_replies(
            channel=channel_id,
            ts=thread_ts,
            cursor=next_cursor
        )
        return result

    @staticmethod
    def _get_thread_summary(thread_messages: List[SlackMessage]) -> Optional[SlackThread]:
        """Build a ``SlackThread`` summary for *thread_messages*.

        Returns None for threads with fewer than two messages. When ``SKIP_AI``
        is set, the first message's content is used as the summary and no
        metadata is extracted; otherwise the LLM is queried for both.
        """
        if len(thread_messages) <= 1:
            return None

        documents = []
        unique_users = set()
        jira_tickets = []
        timestamps = []
        for slack_message in thread_messages:
            unique_users.add(slack_message.user)
            updated_msg = SlackChannelReader.replace_user_ids_with_names(str(slack_message.content))
            documents.append(Document(text=f"{slack_message.user} : {updated_msg}"))
            jira_tickets.extend(SlackChannelReader.extract_jira_tickets(updated_msg))
            timestamps.append(datetime.datetime.fromtimestamp(float(slack_message.timestamp)))

        # Use the real extremes rather than now()/epoch sentinels.
        min_timestamp = min(timestamps)
        max_timestamp = max(timestamps)

        if SKIP_AI:
            summary = thread_messages[0].content
            metadata: Optional[ThreadMetadata] = None
        else:
            qe = GPTListIndex.from_documents(documents).as_query_engine(service_context=service_context)
            resp = qe.query(
                "What is the key insight from this discussion thread, describe in less than 50 words"
            )
            summary = resp.response
            metadata = metadata_extractor.extract_metadata(summary)

        return SlackThread(min_timestamp.isoformat(),
                           max_timestamp.isoformat(),
                           thread_messages[0].user,
                           unique_users,
                           summary,
                           len(thread_messages),
                           metadata,
                           jira_tickets)


def main():
    """Index every channel named in the comma-separated first CLI argument."""
    start_date = datetime.datetime(2023, 1, 1)
    token = os.environ.get('SLACK_API_TOKEN')
    openai_api_key = os.environ.get('OPENAI_API_KEY')

    if not token:
        print('Please set the SLACK_API_TOKEN environment variable.')
        sys.exit(1)
    if not openai_api_key:
        print('Please set the OPENAI_API_KEY environment variable.')
        sys.exit(1)
    if len(sys.argv) < 2:
        print('Usage: python3 index_slack.py channel1,channel2,channel3')
        sys.exit(1)

    channel_id_mapping = ChannelIdMapper(token)
    for channel in sys.argv[1].split(','):
        channel_id = channel_id_mapping.get_channel_id(channel)
        print(f'Indexing channel {channel} with id {channel_id} from: {start_date}')
        reader = SlackChannelReader(token, channel_id, channel, openai_api_key)
        message_count, thread_count = reader.read_messages(start_date)
        print(f'Indexed {thread_count} threads created from {message_count} messages for channel {channel}')


if __name__ == "__main__":
    main()