slackdemo / slack_summary.py
svummidi's picture
POC for passive monitoring
a31ba66
raw
history blame
No virus
12.9 kB
import datetime
import functools
import json
import os
import shelve
import sys
from typing import List, Optional
import re
import openai
import slack_sdk
from llama_index import Document, GPTListIndex, ServiceContext
from llama_index.llms import OpenAI
from ratelimit import limits, sleep_and_retry
import csv
from channel_id_mapper import ChannelIdMapper
from metadata_extracter import MetadataExtractor, ThreadMetadata
# When True, no LLM calls are made: a thread's summary falls back to its first
# message and no metadata (topics/sentiment) is extracted.
SKIP_AI = False
# Shared LLM handle (temperature 0, model "gpt-3.5-turbo"); it backs both the
# service context and the metadata extractor below.
gpt_turbo: OpenAI = OpenAI(temperature=0, model="gpt-3.5-turbo")
# llama_index service context built around gpt_turbo with a chunk size of 1024.
service_context = ServiceContext.from_defaults(llm=gpt_turbo, chunk_size=1024)
# Extractor whose extract_metadata() is called on each thread summary.
metadata_extractor = MetadataExtractor(gpt_turbo)
@functools.lru_cache(maxsize=1024)
def cached_shelve_read(key):
    """Return the value stored under *key* in the ``mydata.db`` shelf.

    Yields ``None`` when the key is absent. Results, misses included, are
    memoized by ``lru_cache`` for up to 1024 distinct keys.
    """
    store = shelve.open("mydata.db")
    try:
        return store.get(key)
    finally:
        # Always release the shelf, even if the lookup raised.
        store.close()
def cached_shelve_write(key, value):
    """Persist *value* under *key* in the ``mydata.db`` shelf.

    After a successful write the memoized results of ``cached_shelve_read``
    are discarded so later reads observe the new value.
    """
    store = shelve.open("mydata.db")
    try:
        store[key] = value
    finally:
        store.close()
    # Drop every memoized read; any cached entry may now be stale.
    cached_shelve_read.cache_clear()
def load_mapping_from_json(filepath):
    """Load a JSON document from *filepath* and return the decoded object.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        # Decode explicitly as UTF-8 so non-ASCII names do not depend on the
        # platform's locale encoding.
        with open(filepath, 'r', encoding='utf-8') as file:
            return json.load(file)
    except (FileNotFoundError, NotADirectoryError):
        # A missing mapping file is expected; callers get an empty mapping.
        return {}
# Lookup table used to turn Slack user ids into names; loaded once at import
# time and empty when 'user_id_to_name_mapping.json' does not exist.
userIdMapping = load_mapping_from_json('user_id_to_name_mapping.json')
class SlackMessage:
    """One Slack message: who wrote it, when, and what it says."""

    def __init__(self, user, timestamp, content):
        # Resolve the author through the id->name table; identifiers with no
        # entry are stored unchanged.
        resolved_user = userIdMapping.get(user, user)
        self.user = resolved_user
        self.timestamp = timestamp
        self.content = content
class SlackThread:
    """Summary record for a single Slack thread.

    Attributes are assigned in a fixed order; anything that serializes
    ``__dict__`` emits the fields in exactly this order.
    """

    def __init__(self, min_ts: str, max_ts: str, user: str, unique_users: set[str], summary: str, message_count: int,
                 detected_metadata: Optional[ThreadMetadata], jira_tickets: list[str] = None):
        # Pull topic/sentiment data up front; without metadata fall back to
        # empty topic lists and no sentiment.
        if detected_metadata:
            positive = detected_metadata.list_of_positive_topics
            negative = detected_metadata.list_of_negative_topics
            sentiment = detected_metadata.overall_sentiment
        else:
            positive, negative, sentiment = [], [], None

        self.min_ts = min_ts
        self.max_ts = max_ts
        self.user = user
        self.unique_users = unique_users
        self.unique_users_count = len(unique_users)
        # The summary may still contain raw user ids; translate them to names.
        self.summary = SlackChannelReader.replace_user_ids_with_names_in_summary(summary)
        self.message_count = message_count
        self.positive_topics: list[str] = positive
        self.negative_topics: list[str] = negative
        self.overall_sentiment: str = sentiment
        self.jira_tickets: list[str] = jira_tickets or []
class SlackChannelReader:
    """Collect the threads of one Slack channel and summarize them into CSV files.

    Working files are kept under ``csv/`` and double as caches between runs:

    * ``csv/<channel>_threads.csv``  - one thread id per line
    * ``csv/<channel>_messages.csv`` - per thread, its messages as a JSON blob
    * ``csv/<channel>.csv``          - one summary row per multi-message thread
    """

    # The csv module rejects fields longer than 131072 characters by default;
    # the JSON blob of a long thread can exceed that when it is read back.
    _MAX_CSV_FIELD_SIZE = 2 ** 31 - 1

    def __init__(self, token, channel_id, channel_name, openai_api_key):
        """Create a reader for one channel.

        Args:
            token: Slack API token handed to ``slack_sdk.WebClient``.
            channel_id: Slack id of the channel to read.
            channel_name: Channel name, used for file names and log output.
            openai_api_key: Key assigned to ``openai.api_key``.
        """
        self.client = slack_sdk.WebClient(token=token)
        self.channel_id = channel_id
        self.channel_name = channel_name
        # NOTE: this sets the module-level OpenAI key, not a per-instance one.
        openai.api_key = openai_api_key

    @staticmethod
    def replace_user_ids_with_names(text):
        """Replace ``<@ID>`` mentions in *text* with names from ``userIdMapping``.

        Mentions whose id is not in the mapping are left untouched.
        """
        def replacer(match):
            return userIdMapping.get(match.group(1), match.group(0))

        return re.sub(r'<@(\w+)>', replacer, text)

    @staticmethod
    def replace_user_ids_with_names_in_summary(summary):
        """Translate whitespace-separated tokens of *summary* that are known user ids.

        This is a workaround, better to replace ahead of time. Tokens are
        matched exactly, so an id with punctuation attached is not translated,
        and runs of whitespace collapse to single spaces.
        """
        return ' '.join(userIdMapping.get(word, word) for word in summary.split())

    @staticmethod
    def extract_jira_tickets(text: str):
        """Return every JIRA-style ticket key (e.g. ``ABC-123``) found in *text*.

        Assumes that project names consist of upper-case letters only.
        """
        return re.findall(r'\b[A-Z]+-\d+\b', text)

    def save_messages(self, db: shelve.Shelf, msg_csv_file: str):
        """Write the first message of every thread stored in *db* to a CSV file.

        Args:
            db: Mapping of thread id to that thread's list of messages.
            msg_csv_file: Path of the CSV file to (over)write.
        """
        count = 0
        # UTF-8 so that emoji and non-ASCII text never hit a locale codec error.
        with open(msg_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['thread_id', 'timestamp', 'user', 'content', 'jira_tickets'])
            for thread_ts, thread_messages in db.items():
                if not thread_messages:
                    # Nothing to report for an empty thread.
                    continue
                first = thread_messages[0]
                # Messages carry no ticket list of their own; derive it from the text.
                tickets = self.extract_jira_tickets(str(first.content))
                writer.writerow([thread_ts, first.timestamp, first.user, first.content, tickets])
                count += 1
        print(f"Saved {count} messages for channel {self.channel_name} to csv")

    def _collect_thread_messages(self, thread_id_set, thread_messages_file):
        """Fetch the messages of every thread and store them as JSON rows in *thread_messages_file*."""
        # Write to a side file and rename at the end, so an interrupted run
        # never leaves a truncated cache that later runs would silently reuse.
        partial_file = thread_messages_file + '.partial'
        with open(partial_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['thread_ts', 'messages_json'])
            for thread_count, thread_ts in enumerate(thread_id_set, start=1):
                thread_messages = self.read_thread_messages(self.channel_id, thread_ts)
                message_dicts = [msg.__dict__ for msg in thread_messages]
                writer.writerow([thread_ts, json.dumps(message_dicts)])
                if thread_count % 100 == 0:
                    print(f"Collected messages for {thread_count} threads")
        os.replace(partial_file, thread_messages_file)
        print(f"Collecting thread messages completed for : {len(thread_id_set)} threads")

    def summarize_threads(self, thread_id_set: set):
        """Summarize every thread in *thread_id_set* into ``csv/<channel>.csv``.

        Thread messages are fetched from Slack only when the cached
        ``csv/<channel>_messages.csv`` file does not exist yet.

        Returns:
            The total number of messages read from the cached threads.
        """
        os.makedirs('csv', exist_ok=True)
        thread_messages_file = f'csv/{self.channel_name}_messages.csv'
        if not os.path.exists(thread_messages_file):
            print(f"Thread messages file not exists, collecting thread messages for channel {self.channel_name}")
            self._collect_thread_messages(thread_id_set, thread_messages_file)

        # From file read messages and generate threads to another CSV
        csv.field_size_limit(self._MAX_CSV_FIELD_SIZE)
        messages_count = 0
        max_thread_size = 0
        with open(f'csv/{self.channel_name}.csv', 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(
                ['Start Time', 'Last Updated', 'Originator', 'Unique Users', 'Unique Users Count', 'Summary',
                 'Message Count',
                 'Positive Topics',
                 'Negative Topics', 'Overall Sentiment', 'Jira Tickets'])
            with open(thread_messages_file, 'r', newline='', encoding='utf-8') as csvfile_msgs:
                for row in csv.DictReader(csvfile_msgs):
                    thread_ts = str(row['thread_ts']).strip()
                    thread_messages_dict = json.loads(str(row['messages_json']))
                    thread_messages: list[SlackMessage] = [
                        SlackMessage(d['user'], d['timestamp'], d['content']) for d in thread_messages_dict]
                    messages_count += len(thread_messages)
                    print(f"Summarizing thread {thread_ts} with {len(thread_messages)} messages")
                    slack_thread = self._get_thread_summary(thread_messages)
                    max_thread_size = max(max_thread_size, len(thread_messages))
                    if slack_thread:
                        # Spell the columns out so rows always line up with the header.
                        writer.writerow([
                            slack_thread.min_ts, slack_thread.max_ts, slack_thread.user,
                            slack_thread.unique_users, slack_thread.unique_users_count,
                            slack_thread.summary, slack_thread.message_count,
                            slack_thread.positive_topics, slack_thread.negative_topics,
                            slack_thread.overall_sentiment, slack_thread.jira_tickets])
        print(f"Max thread size: {max_thread_size}")
        return messages_count

    def read_messages(self, start_date):
        """Collect the channel's thread ids since *start_date* and summarize the threads.

        Thread ids are fetched from Slack only when ``csv/<channel>_threads.csv``
        does not exist; otherwise the cached file is used.

        Args:
            start_date: ``datetime`` marking the oldest message to consider.

        Returns:
            Tuple ``(message_count, thread_count)``.
        """
        os.makedirs('csv', exist_ok=True)
        threads_file = f'csv/{self.channel_name}_threads.csv'
        if not os.path.exists(threads_file):
            print(f"Threads file not exists, collecting thread ids for channel {self.channel_name}")
            start_timestamp = str(start_date.timestamp())
            cursor = None
            has_more = True
            collected_ids = set()
            while has_more:
                response_data = self.read_slack_conversations(cursor, start_timestamp).data
                for response in response_data["messages"]:
                    # Only messages that belong to a thread carry "thread_ts".
                    if "thread_ts" in response:
                        collected_ids.add(response["thread_ts"])
                has_more = response_data["has_more"]
                if has_more:
                    cursor = response_data["response_metadata"]["next_cursor"]
            print(f"Collected {len(collected_ids)} thread ids for channel {self.channel_name}")
            with open(threads_file, 'w', encoding='utf-8') as f:
                f.writelines(f"{thread_id}\n" for thread_id in collected_ids)
        with open(threads_file, encoding='utf-8') as f:
            # Strip line terminators: ids are passed verbatim to the Slack API,
            # and a trailing newline would corrupt the thread timestamp.
            thread_id_set = {line.strip() for line in f if line.strip()}
        message_count = self.summarize_threads(thread_id_set)
        return message_count, len(thread_id_set)

    @sleep_and_retry
    @limits(calls=1, period=1)
    def read_slack_conversations(self, cursor, start_timestamp):
        """Fetch one page of channel history; limited to one call per second."""
        return self.client.conversations_history(
            channel=self.channel_id,
            limit=1000,
            oldest=start_timestamp,
            cursor=cursor
        )

    def read_thread_messages(self, channel_id, thread_ts) -> List[SlackMessage]:
        """Return all messages of the thread *thread_ts*, following pagination.

        Messages with neither a ``user`` nor a ``subtype`` field, or lacking
        ``ts``/``text``, are reported on stdout and skipped.
        """
        all_messages: list[SlackMessage] = []
        next_cursor = None
        has_more = True
        while has_more:
            # Call the conversations.replies method using the WebClient
            result = self.read_slack_thread_messages(channel_id, next_cursor, thread_ts)
            for message in result['messages']:
                if 'user' in message:
                    author = message['user']
                elif 'subtype' in message:
                    # No user on this message; its subtype stands in as the author.
                    author = message['subtype']
                else:
                    print(f"Unknown message type: {message}")
                    continue
                try:
                    all_messages.append(SlackMessage(author, message['ts'], message['text']))
                except KeyError:
                    print(f"KeyError for message: {message}")
            # Treat a missing flag as "no further pages" instead of crashing.
            has_more = result.get("has_more", False)
            if has_more:
                next_cursor = result["response_metadata"]["next_cursor"]
        return all_messages

    @sleep_and_retry
    @limits(calls=2, period=1)
    def read_slack_thread_messages(self, channel_id, next_cursor, thread_ts):
        """Fetch one page of thread replies; limited to two calls per second."""
        return self.client.conversations_replies(
            channel=channel_id,
            ts=thread_ts,
            cursor=next_cursor
        )

    @staticmethod
    def _get_thread_summary(thread_messages: List[SlackMessage]) -> Optional[SlackThread]:
        """Build a ``SlackThread`` summary from the messages of one thread.

        Returns:
            ``None`` for threads with fewer than two messages, otherwise a
            ``SlackThread``. With ``SKIP_AI`` set, the first message's content
            is used as the summary and no metadata is extracted.
        """
        # Empty and single-message threads are not summarized.
        if len(thread_messages) <= 1:
            return None
        documents = []
        unique_users = set()
        jira_tickets = []
        timestamps = []
        for slack_message in thread_messages:
            unique_users.add(slack_message.user)
            updated_msg = SlackChannelReader.replace_user_ids_with_names(str(slack_message.content))
            msg = str(slack_message.user) + " : " + str(updated_msg)
            documents.append(Document(text=msg))
            jira_tickets.extend(SlackChannelReader.extract_jira_tickets(updated_msg))
            timestamps.append(datetime.datetime.fromtimestamp(float(slack_message.timestamp)))
        if SKIP_AI:
            summary = thread_messages[0].content
        else:
            qe = GPTListIndex.from_documents(documents).as_query_engine(service_context=service_context)
            resp = qe.query(
                "What is the key insight from this discussion thread, describe in less than 50 words"
            )
            summary = resp.response
        metadata: Optional[ThreadMetadata] = metadata_extractor.extract_metadata(summary) if not SKIP_AI else None
        return SlackThread(min(timestamps).isoformat(), max(timestamps).isoformat(), thread_messages[0].user,
                           unique_users,
                           summary,
                           len(thread_messages), metadata, jira_tickets)
def main():
    """Command-line entry point: summarize every channel named in ``sys.argv[1]``.

    Reads the Slack token and OpenAI key from the ``SLACK_API_TOKEN`` and
    ``OPENAI_API_KEY`` environment variables and expects one argument, a
    comma-separated list of channel names.

    Raises:
        SystemExit: With status 1 when a variable or the argument is missing.
    """
    start_date = datetime.datetime(2023, 1, 1)
    token = os.environ.get('SLACK_API_TOKEN')
    openai_api_key = os.environ.get('OPENAI_API_KEY')
    # sys.exit rather than the site-provided exit(), which is not guaranteed
    # to exist (e.g. when Python runs with -S).
    if not token:
        print('Please set the SLACK_API_TOKEN environment variable.')
        sys.exit(1)
    if not openai_api_key:
        print('Please set the OPENAI_API_KEY environment variable.')
        sys.exit(1)
    if len(sys.argv) < 2:
        # Report the name the script was actually invoked with.
        print(f'Usage: python3 {os.path.basename(sys.argv[0])} channel1,channel2,channel3')
        sys.exit(1)
    channel_id_mapping = ChannelIdMapper(token)
    # Tolerate spaces and stray commas in the channel list.
    channels = [name.strip() for name in sys.argv[1].split(',') if name.strip()]
    for channel in channels:
        # Resolve the id once and reuse it for both the log line and the reader.
        channel_id = channel_id_mapping.get_channel_id(channel)
        print(f'Indexing channel {channel} with id {channel_id} from: {start_date}')
        reader = SlackChannelReader(token, channel_id, channel, openai_api_key)
        message_count, thread_count = reader.read_messages(start_date)
        print(f'Indexed {thread_count} threads created from {message_count} messages for channel {channel}')


# Run only when executed as a script, so importing this module has no side effects.
if __name__ == "__main__":
    main()