import datetime
import functools
import json
import os
import shelve
import sys
from typing import List, Optional
import re
import openai
import slack_sdk
from llama_index import Document, GPTListIndex, ServiceContext
from llama_index.llms import OpenAI
from ratelimit import limits, sleep_and_retry
import csv
from channel_id_mapper import ChannelIdMapper
from metadata_extracter import MetadataExtractor, ThreadMetadata
# Module-level configuration; everything below is constructed at import time.
# When True, thread summarization bypasses the LLM: the thread's first message is used as the
# summary and no metadata is extracted.
SKIP_AI: bool = False
# Single LLM instance (temperature=0) shared by the llama_index service context and the
# metadata extractor.
gpt_turbo: OpenAI = OpenAI(temperature=0, model="gpt-3.5-turbo")
# Service context (LLM + chunk_size) used when querying each thread's documents.
service_context = ServiceContext.from_defaults(llm=gpt_turbo, chunk_size=1024)
# Extracts positive/negative topics and overall sentiment from a thread summary.
metadata_extractor = MetadataExtractor(gpt_turbo)
def cached_shelve_read(key, filename="mydata.db"):
    """Return the value stored under ``key`` in the shelve database ``filename``, or ``None``.

    The database is opened for each call (``shelve.open``'s default flag creates it if missing).

    Args:
        key: shelf key (shelve requires ``str`` keys).
        filename: path of the shelve database; defaults to ``"mydata.db"`` in the working directory.

    Returns:
        The stored value, or ``None`` when ``key`` is absent.

    NOTE: shelve unpickles stored values - only open database files you trust.
    """
    with shelve.open(filename) as db:
        return db.get(key, None)
def cached_shelve_write(key, value, filename="mydata.db"):
    """Store ``value`` under ``key`` in the shelve database ``filename``, overwriting any old value.

    Args:
        key: shelf key (shelve requires ``str`` keys).
        value: any picklable object.
        filename: path of the shelve database; defaults to ``"mydata.db"`` in the working directory.
    """
    # The ``with`` block closes (and therefore syncs) the shelf before returning.
    with shelve.open(filename) as db:
        db[key] = value
def load_mapping_from_json(filepath):
    """Load a JSON object from ``filepath``; return an empty dict if the file does not exist.

    The file is decoded as UTF-8 explicitly, so non-ASCII names load the same way on every
    platform instead of depending on the locale's default encoding.
    """
    if not os.path.exists(filepath):
        return {}
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)
# Lookup used to translate user IDs into display names (presumably Slack user ID -> name, per the
# file name). The relative path is resolved against the current working directory; the mapping is
# empty when the file does not exist.
userIdMapping = load_mapping_from_json('user_id_to_name_mapping.json')
class SlackMessage:
    """A single Slack message: author, timestamp and text.

    ``user`` is passed through ``userIdMapping`` at construction time; values with no entry in
    the mapping are stored unchanged. Instances are cached via ``__dict__`` (JSON) by the channel
    reader, so the attribute set should stay limited to these three fields.
    """

    def __init__(self, user, timestamp, content):
        self.user = userIdMapping.get(user, user)
        # Slack ``ts`` string; converted with float() when computing thread time ranges.
        self.timestamp = timestamp
        self.content = content
class SlackThread:
    """Aggregated view of one summarized Slack thread (one row of the channel summary CSV)."""

    def __init__(self, min_ts: str, max_ts: str, user: str, unique_users: set[str], summary: str, message_count: int,
                 detected_metadata: Optional[ThreadMetadata], jira_tickets: Optional[list[str]] = None):
        """
        Args:
            min_ts: ISO timestamp of the earliest message.
            max_ts: ISO timestamp of the latest message.
            user: author of the first message (the thread originator).
            unique_users: distinct authors in the thread.
            summary: thread summary; raw user IDs in it are replaced with display names.
            message_count: number of messages in the thread.
            detected_metadata: topics/sentiment extracted from the summary, or ``None``.
            jira_tickets: ticket keys mentioned in the thread; defaults to an empty list.
        """
        self.min_ts = min_ts
        self.max_ts = max_ts
        self.user = user
        self.unique_users = unique_users
        self.unique_users_count = len(unique_users)
        self.summary = SlackChannelReader.replace_user_ids_with_names_in_summary(summary)
        self.message_count = message_count
        self.positive_topics: list[str] = detected_metadata.list_of_positive_topics if detected_metadata else []
        self.negative_topics: list[str] = detected_metadata.list_of_negative_topics if detected_metadata else []
        self.overall_sentiment: Optional[str] = detected_metadata.overall_sentiment if detected_metadata else None
        # Copy so later mutation of the caller's list cannot change this thread's data.
        self.jira_tickets: list[str] = list(jira_tickets) if jira_tickets else []
class SlackChannelReader:
    """Read one Slack channel's threads, cache them as CSV under ``csv/`` and summarize each thread.

    ``read_messages`` drives the pipeline:

    1. thread timestamps  -> ``csv/<channel_name>_threads.csv`` (one ts per line)
    2. replies per thread -> ``csv/<channel_name>_messages.csv`` (JSON list of messages per row)
    3. thread summaries   -> ``csv/<channel_name>.csv``

    The files from steps 1 and 2 are reused as-is when they already exist. They are written to a
    ``.tmp`` file and renamed on completion, so an interrupted run does not leave a truncated cache
    that a later run would mistake for a complete one.
    """

    def __init__(self, token, channel_id, channel_name, openai_api_key):
        """Create the Slack client for one channel.

        Args:
            token: Slack API token for ``slack_sdk.WebClient``.
            channel_id: ID of the channel to read.
            channel_name: name used in output file names and log messages.
            openai_api_key: assigned to the process-wide ``openai.api_key``.
        """
        self.client = slack_sdk.WebClient(token=token)
        self.channel_id = channel_id
        self.channel_name = channel_name
        # NOTE: global side effect - affects every OpenAI caller in this process.
        openai.api_key = openai_api_key

    @staticmethod
    def replace_user_ids_with_names(text):
        """Replace Slack user mentions (``<@U123>`` or ``<@U123|label>``) with mapped display names.

        Mentions whose ID is not in ``userIdMapping`` are left exactly as they were.
        """
        def replacer(match):
            return userIdMapping.get(match.group(1), match.group(0))

        return re.sub(r'<@(\w+)(?:\|[^>]*)?>', replacer, text)

    @staticmethod
    def replace_user_ids_with_names_in_summary(summary):
        """Replace bare user IDs that ended up in a summary with mapped display names.

        This is a workaround; replacing IDs before summarization would be better. Every run of word
        characters is looked up, so IDs adjacent to punctuation (``U123,``) are translated as well,
        and the summary's original whitespace and line breaks are preserved. Empty or ``None``
        input is returned unchanged.
        """
        if not summary:
            return summary
        return re.sub(r'\w+', lambda m: userIdMapping.get(m.group(0), m.group(0)), summary)

    @staticmethod
    def extract_jira_tickets(text: str):
        """Return every JIRA-style ticket key in ``text`` (e.g. ``ABC-123``, ``AB2-7``).

        Keys are matched as an uppercase letter, then uppercase letters/digits/underscores, a dash
        and a number. Results are in order of appearance, duplicates included.
        """
        return re.findall(r'\b[A-Z][A-Z0-9_]*-\d+\b', text)

    def save_messages(self, db: shelve.Shelf, msg_csv_file: str):
        """Write the first message of each thread in ``db`` to ``msg_csv_file`` (one row per thread).

        ``db`` values are expected to be sequences of ``SlackMessage``; empty ones are skipped.
        The ``jira_tickets`` column holds the ticket keys found in that message's text.
        """
        count = 0
        with open(msg_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['thread_id', 'timestamp', 'user', 'content', 'jira_tickets'])
            for thread_ts, thread_messages in db.items():
                if not thread_messages:
                    continue
                first = thread_messages[0]
                # SlackMessage has no jira_tickets attribute; derive the keys from the text instead.
                tickets = SlackChannelReader.extract_jira_tickets(str(first.content))
                writer.writerow([thread_ts, first.timestamp, first.user, first.content, tickets])
                count += 1
        print(f"Saved {count} messages for channel {self.channel_name} to csv")

    def summarize_threads(self, thread_id_set: set):
        """Fetch (or reuse cached) messages for ``thread_id_set`` and write per-thread summaries.

        Returns:
            Total number of messages read across all threads.
        """
        os.makedirs('csv', exist_ok=True)
        thread_messages_file = f'csv/{self.channel_name}_messages.csv'
        if not os.path.exists(thread_messages_file):
            self._collect_thread_messages(thread_id_set, thread_messages_file)
        return self._write_thread_summaries(thread_messages_file)

    def _collect_thread_messages(self, thread_id_set, thread_messages_file):
        """Fetch every thread's replies from Slack and cache them as JSON, one CSV row per thread."""
        print(f"Thread messages file not exists, collecting thread messages for channel {self.channel_name}")
        tmp_file = thread_messages_file + '.tmp'
        # UTF-8 explicitly: the file is read back with encoding='utf-8'.
        with open(tmp_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['thread_ts', 'messages_json'])
            for thread_count, thread_ts in enumerate(thread_id_set, start=1):
                thread_messages = self.read_thread_messages(self.channel_id, thread_ts)
                message_dicts = [msg.__dict__ for msg in thread_messages]
                writer.writerow([thread_ts, json.dumps(message_dicts)])
                if thread_count % 100 == 0:
                    print(f"Collected messages for {thread_count} threads")
        os.replace(tmp_file, thread_messages_file)
        print(f"Collecting thread messages completed for : {len(thread_id_set)} threads")

    def _write_thread_summaries(self, thread_messages_file):
        """Summarize each cached thread into ``csv/<channel_name>.csv``; return the message count."""
        messages_count = 0
        max_thread_size = 0
        # A long thread's JSON easily exceeds csv's default 131072-character field limit;
        # raise it while reading and restore the previous (process-global) value afterwards.
        previous_limit = csv.field_size_limit(2 ** 31 - 1)
        try:
            with open(f'csv/{self.channel_name}.csv', 'w', newline='', encoding='utf-8') as csvfile, \
                    open(thread_messages_file, 'r', newline='', encoding='utf-8') as csvfile_msgs:
                writer = csv.writer(csvfile)
                writer.writerow(['Start Time', 'Last Updated', 'Originator', 'Unique Users', 'Unique Users Count',
                                 'Summary', 'Message Count', 'Positive Topics', 'Negative Topics',
                                 'Overall Sentiment'])
                for row in csv.DictReader(csvfile_msgs):
                    thread_ts = str(row['thread_ts']).strip()
                    thread_messages: list[SlackMessage] = [
                        SlackMessage(d['user'], d['timestamp'], d['content'])
                        for d in json.loads(str(row['messages_json']))
                    ]
                    messages_count += len(thread_messages)
                    print(f"Summarizing thread {thread_ts} with {len(thread_messages)} messages")
                    slack_thread = self._get_thread_summary(thread_messages)
                    max_thread_size = max(max_thread_size, len(thread_messages))
                    if slack_thread:
                        writer.writerow([
                            slack_thread.min_ts,
                            slack_thread.max_ts,
                            slack_thread.user,
                            ', '.join(sorted(map(str, slack_thread.unique_users))),
                            slack_thread.unique_users_count,
                            slack_thread.summary,
                            slack_thread.message_count,
                            ', '.join(map(str, slack_thread.positive_topics)),
                            ', '.join(map(str, slack_thread.negative_topics)),
                            slack_thread.overall_sentiment,
                        ])
        finally:
            csv.field_size_limit(previous_limit)
        print(f"Max thread size: {max_thread_size}")
        return messages_count

    def read_messages(self, start_date):
        """Collect (or reuse cached) thread IDs since ``start_date``, then summarize those threads.

        Args:
            start_date: ``datetime``; only history newer than it is requested from Slack.

        Returns:
            ``(message_count, thread_count)`` for the channel.
        """
        os.makedirs('csv', exist_ok=True)
        threads_file = f'csv/{self.channel_name}_threads.csv'
        if not os.path.exists(threads_file):
            print(f"Threads file not exists, collecting thread ids for channel {self.channel_name}")
            thread_id_set = self._collect_thread_ids(start_date)
            print(f"Collected {len(thread_id_set)} thread ids for channel {self.channel_name}")
            tmp_file = threads_file + '.tmp'
            with open(tmp_file, 'w', encoding='utf-8') as f:
                f.writelines(f'{element}\n' for element in thread_id_set)
            os.replace(tmp_file, threads_file)
        with open(threads_file, encoding='utf-8') as f:
            # Strip the line terminator; otherwise "ts\n" is sent to conversations.replies.
            thread_id_set = {line.strip() for line in f if line.strip()}
        message_count = self.summarize_threads(thread_id_set)
        return message_count, len(thread_id_set)

    def _collect_thread_ids(self, start_date):
        """Page through the channel history since ``start_date``; return the set of ``thread_ts`` values."""
        start_timestamp = str(start_date.timestamp())
        cursor = None
        thread_id_set = set()
        while True:
            response_data = self.read_slack_conversations(cursor, start_timestamp).data
            thread_id_set.update(
                message["thread_ts"] for message in response_data["messages"] if "thread_ts" in message
            )
            if not response_data.get("has_more"):
                return thread_id_set
            cursor = response_data["response_metadata"]["next_cursor"]

    @sleep_and_retry
    @limits(calls=1, period=1)
    def read_slack_conversations(self, cursor, start_timestamp):
        """Fetch one page of channel history newer than ``start_timestamp`` (at most 1 call/second).

        ``sleep_and_retry`` makes callers wait when the limit is hit instead of raising
        ``RateLimitException``.
        """
        return self.client.conversations_history(channel=self.channel_id, cursor=cursor, oldest=start_timestamp)

    def read_thread_messages(self, channel_id, thread_ts) -> List[SlackMessage]:
        """Return every message of thread ``thread_ts`` in ``channel_id``, following pagination.

        Messages with a ``user`` are attributed to it. Messages with only a ``subtype`` are attributed
        to that subtype. Anything else, or a message missing ``ts``/``text``, is logged and skipped.
        """
        all_messages: list[SlackMessage] = []
        next_cursor = None
        while True:
            result = self.read_slack_thread_messages(channel_id, next_cursor, thread_ts)
            for message in result['messages']:
                try:
                    if 'user' in message:
                        all_messages.append(SlackMessage(message['user'], message['ts'], message['text']))
                    elif 'subtype' in message:
                        all_messages.append(SlackMessage(message['subtype'], message['ts'], message['text']))
                    else:
                        print(f"Unknown message type: {message}")
                except KeyError:
                    print(f"KeyError for message: {message}")
            if not result.get("has_more"):
                return all_messages
            next_cursor = result["response_metadata"]["next_cursor"]

    @sleep_and_retry
    @limits(calls=2, period=1)
    def read_slack_thread_messages(self, channel_id, next_cursor, thread_ts):
        """Fetch one page of replies for ``thread_ts`` (at most 2 calls/second; waits when exceeded)."""
        return self.client.conversations_replies(channel=channel_id, ts=thread_ts, cursor=next_cursor)

    @staticmethod
    def _get_thread_summary(thread_messages: List[SlackMessage]) -> Optional[SlackThread]:
        """Summarize a thread; return ``None`` for threads with fewer than two messages.

        When ``SKIP_AI`` is set, the first message is used as the summary and no metadata is
        extracted. Otherwise the LLM is asked for the thread's key insight.
        """
        if len(thread_messages) <= 1:
            return None
        documents = []
        unique_users = set()
        jira_tickets = []
        timestamps = []
        for slack_message in thread_messages:
            updated_msg = SlackChannelReader.replace_user_ids_with_names(str(slack_message.content))
            jira_tickets.extend(SlackChannelReader.extract_jira_tickets(updated_msg))
            documents.append(Document(text=f"{slack_message.user} : {updated_msg}"))
            unique_users.add(slack_message.user)
            timestamps.append(datetime.datetime.fromtimestamp(float(slack_message.timestamp)))
        if SKIP_AI:
            summary = thread_messages[0].content
        else:
            index = GPTListIndex.from_documents(documents, service_context=service_context)
            query_engine = index.as_query_engine(service_context=service_context)
            resp = query_engine.query(
                "What is the key insight from this discussion thread, describe in less than 50 words")
            summary = resp.response
        metadata: Optional[ThreadMetadata] = None if SKIP_AI else metadata_extractor.extract_metadata(summary)
        return SlackThread(
            min(timestamps).isoformat(),
            max(timestamps).isoformat(),
            thread_messages[0].user,
            unique_users,
            summary,
            len(thread_messages),
            metadata,
            # De-duplicate while keeping first-mention order.
            list(dict.fromkeys(jira_tickets)),
        )
def main():
    """Index every channel named in ``sys.argv[1]`` (comma-separated) from 2023-01-01 onward.

    Requires ``SLACK_API_TOKEN`` and ``OPENAI_API_KEY`` in the environment. Exits with status 1
    (after printing why) if either is missing or no channel list is given.
    """
    start_date = datetime.datetime(2023, 1, 1)
    token = os.environ.get('SLACK_API_TOKEN')
    openai_api_key = os.environ.get('OPENAI_API_KEY')
    if not token:
        print('Please set the SLACK_API_TOKEN environment variable.')
        sys.exit(1)
    if not openai_api_key:
        print('Please set the OPENAI_API_KEY environment variable.')
        sys.exit(1)
    if len(sys.argv) < 2:
        print(f'Usage: python3 {sys.argv[0]} channel1,channel2,channel3')
        sys.exit(1)
    channel_id_mapping = ChannelIdMapper(token)
    # Tolerate "a, b" and trailing commas.
    channels = [name.strip() for name in sys.argv[1].split(',') if name.strip()]
    for channel in channels:
        channel_id = channel_id_mapping.get_channel_id(channel)
        print(f'Indexing channel {channel} with id {channel_id} from: {start_date}')
        reader = SlackChannelReader(token, channel_id, channel, openai_api_key)
        message_count, thread_count = reader.read_messages(start_date)
        print(f'Indexed {thread_count} threads created from {message_count} messages for channel {channel}')