File size: 12,896 Bytes
a31ba66 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
import datetime
import functools
import json
import os
import shelve
import sys
from typing import List, Optional
import re
import openai
import slack_sdk
from llama_index import Document, GPTListIndex, ServiceContext
from llama_index.llms import OpenAI
from ratelimit import limits, sleep_and_retry
import csv
from channel_id_mapper import ChannelIdMapper
from metadata_extracter import MetadataExtractor, ThreadMetadata
SKIP_AI = False
gpt_turbo: OpenAI = OpenAI(temperature=0, model="gpt-3.5-turbo")
service_context = ServiceContext.from_defaults(llm=gpt_turbo, chunk_size=1024)
metadata_extractor = MetadataExtractor(gpt_turbo)
@functools.lru_cache(maxsize=1024)
def cached_shelve_read(key):
with shelve.open("mydata.db") as db:
return db.get(key, None)
def cached_shelve_write(key, value):
with shelve.open("mydata.db") as db:
db[key] = value
cached_shelve_read.cache_clear()
def load_mapping_from_json(filepath):
if not os.path.exists(filepath):
return {}
with open(filepath, 'r') as file:
return json.load(file)
userIdMapping = load_mapping_from_json('user_id_to_name_mapping.json')
class SlackMessage:
def __init__(self, user, timestamp, content):
self.user = userIdMapping.get(user, user)
self.timestamp = timestamp
self.content = content
class SlackThread:
def __init__(self, min_ts: str, max_ts: str, user: str, unique_users: set[str], summary: str, message_count: int,
detected_metadata: Optional[ThreadMetadata], jira_tickets: list[str] = None):
self.min_ts = min_ts
self.max_ts = max_ts
self.user = user
self.unique_users = unique_users
self.unique_users_count = len(unique_users)
self.summary = SlackChannelReader.replace_user_ids_with_names_in_summary(summary)
self.message_count = message_count
self.positive_topics: list[str] = detected_metadata.list_of_positive_topics if detected_metadata else []
self.negative_topics: list[str] = detected_metadata.list_of_negative_topics if detected_metadata else []
self.overall_sentiment: str = detected_metadata.overall_sentiment if detected_metadata else None
self.jira_tickets: list[str] = jira_tickets if jira_tickets else []
class SlackChannelReader:
def __init__(self, token, channel_id, channel_name, openai_api_key):
self.client = slack_sdk.WebClient(token=token)
self.channel_id = channel_id
self.channel_name = channel_name
openai.api_key = openai_api_key
@staticmethod
def replace_user_ids_with_names(text):
def replacer(match):
user_id = match.group(1)
return userIdMapping.get(user_id, match.group(0))
return re.sub(r'<@(\w+)>', replacer, text)
@staticmethod
def replace_user_ids_with_names_in_summary(summary):
# This is a workaround, better to replace ahead of time
words = summary.split()
# Translate each word if it exists in the dictionary, otherwise keep the word as-is
translated_words = [userIdMapping.get(word, word) for word in words]
# Join the translated words back into a single string
translated_text = ' '.join(translated_words)
return translated_text
@staticmethod
def extract_jira_tickets(text: str):
# Define the pattern for JIRA tickets. Assuming that project names do not contain numbers.
jira_ticket_pattern = r'\b[A-Z]+-\d+\b'
# Use findall to get all occurrences of the pattern
tickets = re.findall(jira_ticket_pattern, text)
return tickets
def save_messages(self, db: shelve.Shelf, msg_csv_file: str):
count = 0
with open(msg_csv_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(
['thread_id', 'timestamp', 'user', 'content'])
for thread_ts, thread_messages in db.items():
writer.writerow([thread_ts, thread_messages[0].timestamp, thread_messages[0].user,
thread_messages[0].content, thread_messages[0].jira_tickets])
count += 1
print(f"Saved {count} messages for channel {self.channel_name} to csv")
def summarize_threads(self, thread_id_set: set):
thread_messages_file = f'csv/{self.channel_name}_messages.csv'
if not os.path.exists(thread_messages_file):
print(f"Thread messages file not exists, collecting thread messages for channel {self.channel_name}")
with open(thread_messages_file, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(
['thread_ts', 'messages_json'])
thread_count = 0
for thread_ts in thread_id_set:
thread_messages = self.read_thread_messages(self.channel_id, thread_ts)
message_dicts = [msg.__dict__ for msg in thread_messages]
writer.writerow([thread_ts, json.dumps(message_dicts)])
thread_count += 1
if thread_count % 100 == 0:
print(f"Collected messages for {thread_count} threads")
print(f"Collecting thread messages completed for : {len(thread_id_set)} threads")
# From file read messages and generate threads to another CSV
messages_count = 0
with open(f'csv/{self.channel_name}.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(
['Start Time', 'Last Updated', 'Originator', 'Unique Users', 'Unique Users Count', 'Summary',
'Message Count',
'Positive Topics',
'Negative Topics', 'Overall Sentiment'])
max_thread_size = 0
thread_count = 0
with open(thread_messages_file, 'r', newline='', encoding='utf-8') as csvfile_msgs:
csv_reader = csv.DictReader(csvfile_msgs)
for row in csv_reader:
thread_ts = str(row['thread_ts']).strip()
thread_messages_dict = json.loads(str(row['messages_json']))
thread_messages: list[SlackMessage] = [SlackMessage(d['user'], d['timestamp'], d['content']) for d
in thread_messages_dict]
messages_count += len(thread_messages)
print(f"Summarizing thread {thread_ts} with {len(thread_messages)} messages")
slack_thread = self._get_thread_summary(thread_messages)
max_thread_size = max(max_thread_size, len(thread_messages))
if slack_thread:
writer.writerow(slack_thread.__dict__.values())
thread_count += 1
print(f"Max thread size: {max_thread_size}")
return messages_count
def read_messages(self, start_date):
threads_file = f'csv/{self.channel_name}_threads.csv'
if not os.path.exists(threads_file):
print(f"Threads file not exists, collecting thread ids for channel {self.channel_name}")
start_timestamp = str(start_date.timestamp())
cursor = None
has_more = True
thread_id_set = set()
while has_more:
response_data = self.read_slack_conversations(cursor, start_timestamp).data
for response in response_data["messages"]:
if "thread_ts" in response:
thread_id_set.add(response["thread_ts"])
has_more = response_data["has_more"]
if has_more:
cursor = response_data["response_metadata"]["next_cursor"]
print(f"Collected {len(thread_id_set)} thread ids for channel {self.channel_name}")
with open(threads_file, 'w') as f:
for element in thread_id_set:
f.write(element + '\n')
with open(threads_file) as f:
thread_id_set = set(f.readlines())
message_count = self.summarize_threads(thread_id_set)
return message_count, len(thread_id_set)
@sleep_and_retry
@limits(calls=1, period=1)
def read_slack_conversations(self, cursor, start_timestamp):
response = self.client.conversations_history(
channel=self.channel_id,
limit=1000,
oldest=start_timestamp,
cursor=cursor
)
return response
def read_thread_messages(self, channel_id, thread_ts) -> List[SlackMessage]:
all_messages: list[SlackMessage] = []
next_cursor = None
has_more = True
while has_more:
# Call the conversations.replies method using the WebClient
result = self.read_slack_thread_messages(channel_id, next_cursor, thread_ts)
messages = result['messages']
for message in messages:
try:
if 'user' in message:
all_messages.append(SlackMessage(message['user'], message['ts'], message['text']))
elif 'subtype' in message:
all_messages.append(SlackMessage(message['subtype'], message['ts'], message['text']))
else:
print(f"Unknown message type: {message}")
except KeyError:
print(f"KeyError for message: {message}")
has_more = result["has_more"]
if has_more:
next_cursor = result["response_metadata"]["next_cursor"]
return all_messages
@sleep_and_retry
@limits(calls=2, period=1)
def read_slack_thread_messages(self, channel_id, next_cursor, thread_ts):
result = self.client.conversations_replies(
channel=channel_id,
ts=thread_ts,
cursor=next_cursor
)
return result
@staticmethod
def _get_thread_summary(thread_messages: List[SlackMessage]) -> Optional[SlackThread]:
if len(thread_messages) == 1:
return None
documents = []
unique_users = set()
jira_tickets = []
min_timestamp = datetime.datetime.now()
max_timestamp = datetime.datetime(1970, 1, 1)
for slack_message in thread_messages:
unique_users.add(slack_message.user)
updated_msg = SlackChannelReader.replace_user_ids_with_names(str(slack_message.content))
msg = str(slack_message.user) + " : " + str(updated_msg)
documents.append(Document(text=msg))
tickets = SlackChannelReader.extract_jira_tickets(updated_msg)
if tickets:
jira_tickets.extend(tickets)
min_timestamp = min(min_timestamp, datetime.datetime.fromtimestamp(float(slack_message.timestamp)))
max_timestamp = max(max_timestamp, datetime.datetime.fromtimestamp(float(slack_message.timestamp)))
if SKIP_AI or (len(thread_messages) == 1 and len(thread_messages[0].content) < 50):
summary = thread_messages[0].content
else:
qe = GPTListIndex.from_documents(documents).as_query_engine(service_context=service_context)
resp = qe.query(
"What is the key insight from this discussion thread, describe in less than 50 words"
)
summary = resp.response
metadata: Optional[ThreadMetadata] = metadata_extractor.extract_metadata(summary) if not SKIP_AI else None
return SlackThread(min_timestamp.isoformat(), max_timestamp.isoformat(), thread_messages[0].user, unique_users,
summary,
len(thread_messages), metadata, jira_tickets)
def main():
start_date = datetime.datetime(2023, 1, 1)
token = os.environ.get('SLACK_API_TOKEN')
openai_api_key = os.environ.get('OPENAI_API_KEY')
if not token:
print('Please set the SLACK_API_TOKEN environment variable.')
exit(1)
if not openai_api_key:
print('Please set the OPENAI_API_KEY environment variable.')
exit(1)
if len(sys.argv) < 2:
print('Usage: python3 index_slack.py channel1,channel2,channel3')
exit(1)
channel_id_mapping = ChannelIdMapper(token)
channels = sys.argv[1].split(',')
for channel in channels:
print(f'Indexing channel {channel} with id {channel_id_mapping.get_channel_id(channel)} from: {start_date}')
reader = SlackChannelReader(token, channel_id_mapping.get_channel_id(channel), channel, openai_api_key)
message_count, thread_count = reader.read_messages(start_date)
print(f'Indexed {thread_count} threads created from {message_count} messages for channel {channel}')
main()
|