|
import datetime |
|
import functools |
|
import json |
|
import os |
|
import shelve |
|
import sys |
|
from typing import List, Optional |
|
import re |
|
import openai |
|
import slack_sdk |
|
from llama_index import Document, GPTListIndex, ServiceContext |
|
from llama_index.llms import OpenAI |
|
from ratelimit import limits, sleep_and_retry |
|
|
|
import csv |
|
from channel_id_mapper import ChannelIdMapper |
|
from metadata_extracter import MetadataExtractor, ThreadMetadata |
|
|
|
# When True, thread summarization makes no LLM calls: the first message of a
# thread is used as its summary and no metadata is extracted.
SKIP_AI = False

# One chat model instance (temperature 0) shared by the summarizing query
# engine and the metadata extractor.
gpt_turbo: OpenAI = OpenAI(model="gpt-3.5-turbo", temperature=0)

# llama_index service context handed to the query engine that summarizes threads.
service_context = ServiceContext.from_defaults(chunk_size=1024, llm=gpt_turbo)

# Project helper built on the same model; called on each generated summary.
metadata_extractor = MetadataExtractor(gpt_turbo)
|
|
|
|
|
@functools.lru_cache(maxsize=1024)
def cached_shelve_read(key):
    """Return the value stored under ``key`` in the ``mydata.db`` shelf.

    Results (including ``None`` for a missing key) are memoized per key by
    ``functools.lru_cache`` with room for 1024 entries.

    Args:
        key: Shelf key to look up.

    Returns:
        The stored value, or ``None`` when the key is absent.
    """
    shelf = shelve.open("mydata.db")
    try:
        stored = shelf.get(key)
    finally:
        # Always release the underlying dbm file, even if the lookup raises.
        shelf.close()
    return stored
|
|
|
|
|
def cached_shelve_write(key, value):
    """Store ``value`` under ``key`` in the ``mydata.db`` shelf.

    After a successful write the whole read cache is dropped so that
    ``cached_shelve_read`` cannot serve a stale value.

    Args:
        key: Shelf key to write.
        value: Object to store under ``key``.
    """
    shelf = shelve.open("mydata.db")
    try:
        shelf[key] = value
    finally:
        shelf.close()
    # Memoized reads may predate this write; invalidate all of them.
    cached_shelve_read.cache_clear()
|
|
|
|
|
def load_mapping_from_json(filepath):
    """Load a JSON document from ``filepath``.

    The file is decoded as UTF-8 (the encoding JSON documents are expected to
    use) instead of the platform default, so non-ASCII values load the same
    way on every OS.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The parsed JSON content, or an empty dict when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        # Open directly instead of checking for existence first: avoids a race
        # between the check and the open.
        with open(filepath, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError:
        return {}


# Slack user id -> display name; stays empty when the mapping file is absent.
userIdMapping = load_mapping_from_json('user_id_to_name_mapping.json')
|
|
|
|
|
class SlackMessage:
    """A single Slack message: author, timestamp and text."""

    def __init__(self, user, timestamp, content):
        """Build a message, resolving the author through ``userIdMapping``.

        Args:
            user: Author identifier; replaced by its mapped name when the
                mapping has an entry for it, kept unchanged otherwise.
            timestamp: Message timestamp as supplied by the caller.
            content: Message text as supplied by the caller.
        """
        display_name = userIdMapping.get(user, user)
        # Attribute order is kept (user, timestamp, content) because instances
        # are serialized through ``__dict__`` elsewhere in this file.
        self.user = display_name
        self.timestamp = timestamp
        self.content = content
|
|
|
|
|
class SlackThread:
    """Summary record of one Slack thread.

    The attributes are assigned in a fixed order because the CSV export in
    this file writes ``__dict__.values()`` as one row.
    """

    def __init__(self, min_ts: str, max_ts: str, user: str, unique_users: set[str], summary: str, message_count: int,
                 detected_metadata: Optional[ThreadMetadata], jira_tickets: list[str] = None):
        """Populate the record.

        Args:
            min_ts: Timestamp of the earliest message in the thread.
            max_ts: Timestamp of the latest message in the thread.
            user: Author of the first message.
            unique_users: Distinct authors that took part in the thread.
            summary: Thread summary; user ids found in it are replaced by names.
            message_count: Number of messages in the thread.
            detected_metadata: Extracted topics/sentiment, or a falsy value
                when none is available.
            jira_tickets: Ticket keys found in the thread; a falsy value is
                stored as an empty list.
        """
        translated_summary = SlackChannelReader.replace_user_ids_with_names_in_summary(summary)

        self.min_ts = min_ts
        self.max_ts = max_ts
        self.user = user
        self.unique_users = unique_users
        self.unique_users_count = len(unique_users)
        self.summary = translated_summary
        self.message_count = message_count

        if detected_metadata:
            self.positive_topics: list[str] = detected_metadata.list_of_positive_topics
            self.negative_topics: list[str] = detected_metadata.list_of_negative_topics
            self.overall_sentiment: str = detected_metadata.overall_sentiment
        else:
            # No metadata: empty topic lists and no sentiment.
            self.positive_topics: list[str] = []
            self.negative_topics: list[str] = []
            self.overall_sentiment: str = None

        self.jira_tickets: list[str] = jira_tickets or []
|
|
|
|
|
class SlackChannelReader:
    """Read the threads of one Slack channel and write per-thread summaries to CSV.

    Intermediate results are cached as files under ``csv/`` and reused on the
    next run when they already exist:

    * ``csv/<channel>_threads.csv``  - one thread id per line
    * ``csv/<channel>_messages.csv`` - one row per thread with its messages as JSON
    * ``csv/<channel>.csv``          - the final per-thread summaries
    """

    # Upper bound handed to ``csv.field_size_limit``; kept within a 32-bit C
    # long so the call cannot overflow on any platform.
    _CSV_FIELD_LIMIT = 2 ** 31 - 1

    def __init__(self, token, channel_id, channel_name, openai_api_key):
        """Create a reader bound to a single channel.

        Args:
            token: Slack API token handed to ``slack_sdk.WebClient``.
            channel_id: Id of the channel to read.
            channel_name: Channel name; used to build the ``csv/`` file names.
            openai_api_key: Stored in the module-global ``openai.api_key``.
        """
        self.client = slack_sdk.WebClient(token=token)
        self.channel_id = channel_id
        self.channel_name = channel_name
        # NOTE: this is a process-wide setting, not a per-instance one.
        openai.api_key = openai_api_key

    @staticmethod
    def replace_user_ids_with_names(text):
        """Replace ``<@ID>`` mentions in ``text`` with names from ``userIdMapping``.

        Mentions whose id has no mapping entry are left untouched.
        """
        def replacer(match):
            return userIdMapping.get(match.group(1), match.group(0))

        return re.sub(r'<@(\w+)>', replacer, text)

    @staticmethod
    def replace_user_ids_with_names_in_summary(summary):
        """Replace user ids that appear as words in ``summary`` with mapped names.

        Whitespace runs are collapsed to single spaces. Besides exact word
        matches, ids wrapped in a ``<@ID>`` mention or glued to punctuation
        (``"U123,"``, ``"U123's"``) are resolved as well.

        Args:
            summary: Summary text; ``None`` or an empty string yields ``''``.

        Returns:
            The summary with known user ids replaced by names.
        """
        if not summary:
            return ''

        def translate_token(match):
            token = match.group(0)
            return userIdMapping.get(token, token)

        def translate_word(word):
            if word in userIdMapping:
                return userIdMapping[word]
            # Not an exact match: look at the alphanumeric tokens inside the
            # word so adjacent punctuation does not hide an id.
            return re.sub(r'\w+', translate_token, word)

        without_mentions = SlackChannelReader.replace_user_ids_with_names(str(summary))
        return ' '.join(translate_word(word) for word in without_mentions.split())

    @staticmethod
    def extract_jira_tickets(text: str):
        """Return every ``KEY-123`` style token (capital letters, dash, digits) in ``text``."""
        return re.findall(r'\b[A-Z]+-\d+\b', text)

    def save_messages(self, db: shelve.Shelf, msg_csv_file: str):
        """Write the first message of every thread in ``db`` to a CSV file.

        Args:
            db: Mapping of thread id to the list of that thread's messages.
            msg_csv_file: Path of the CSV file to (over)write.
        """
        count = 0
        with open(msg_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # Header and rows now agree: five columns each.
            writer.writerow(['thread_id', 'timestamp', 'user', 'content', 'jira_tickets'])
            for thread_ts, thread_messages in db.items():
                if not thread_messages:
                    continue
                first = thread_messages[0]
                # SlackMessage carries no ticket list; derive it from the text
                # unless the stored object already provides one.
                tickets = getattr(first, 'jira_tickets', None)
                if tickets is None:
                    tickets = SlackChannelReader.extract_jira_tickets(str(first.content))
                writer.writerow([thread_ts, first.timestamp, first.user, first.content, tickets])
                count += 1
        print(f"Saved {count} messages for channel {self.channel_name} to csv")

    def _collect_thread_messages(self, thread_id_set, thread_messages_file):
        """Fetch every thread from Slack and cache the messages in ``thread_messages_file``."""
        # Write to a side file and rename at the end, so an interrupted run
        # never leaves a truncated cache that the next run would trust.
        partial_file = thread_messages_file + '.partial'
        with open(partial_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['thread_ts', 'messages_json'])
            for thread_count, raw_thread_ts in enumerate(thread_id_set, start=1):
                thread_ts = str(raw_thread_ts).strip()
                thread_messages = self.read_thread_messages(self.channel_id, thread_ts)
                message_dicts = [msg.__dict__ for msg in thread_messages]
                writer.writerow([thread_ts, json.dumps(message_dicts)])
                if thread_count % 100 == 0:
                    print(f"Collected messages for {thread_count} threads")
        os.replace(partial_file, thread_messages_file)
        print(f"Collecting thread messages completed for : {len(thread_id_set)} threads")

    def summarize_threads(self, thread_id_set: set):
        """Summarize every thread and write the result to ``csv/<channel>.csv``.

        Thread messages are read from ``csv/<channel>_messages.csv``; that file
        is created from Slack first when it does not exist yet.

        Args:
            thread_id_set: Ids (``thread_ts``) of the threads to collect.

        Returns:
            Total number of messages across all threads read from the cache.
        """
        os.makedirs('csv', exist_ok=True)
        thread_messages_file = f'csv/{self.channel_name}_messages.csv'
        if not os.path.exists(thread_messages_file):
            print(f"Thread messages file not exists, collecting thread messages for channel {self.channel_name}")
            self._collect_thread_messages(thread_id_set, thread_messages_file)

        # A whole thread is stored as JSON in one CSV field, which can exceed
        # the csv module's default field size limit.
        csv.field_size_limit(self._CSV_FIELD_LIMIT)

        messages_count = 0
        max_thread_size = 0
        with open(f'csv/{self.channel_name}.csv', 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # One header per SlackThread attribute, in attribute order.
            writer.writerow(
                ['Start Time', 'Last Updated', 'Originator', 'Unique Users', 'Unique Users Count', 'Summary',
                 'Message Count',
                 'Positive Topics',
                 'Negative Topics', 'Overall Sentiment', 'Jira Tickets'])
            with open(thread_messages_file, 'r', newline='', encoding='utf-8') as csvfile_msgs:
                for row in csv.DictReader(csvfile_msgs):
                    thread_ts = str(row['thread_ts']).strip()
                    thread_messages: list[SlackMessage] = [
                        SlackMessage(d['user'], d['timestamp'], d['content'])
                        for d in json.loads(str(row['messages_json']))
                    ]
                    messages_count += len(thread_messages)
                    print(f"Summarizing thread {thread_ts} with {len(thread_messages)} messages")
                    slack_thread = self._get_thread_summary(thread_messages)
                    max_thread_size = max(max_thread_size, len(thread_messages))
                    if slack_thread:
                        writer.writerow(slack_thread.__dict__.values())
        print(f"Max thread size: {max_thread_size}")
        return messages_count

    def read_messages(self, start_date):
        """Collect the channel's thread ids since ``start_date`` and summarize them.

        Thread ids are cached in ``csv/<channel>_threads.csv``; Slack is only
        queried for them when that file does not exist.

        Args:
            start_date: ``datetime`` marking the oldest message to consider.

        Returns:
            Tuple ``(message_count, thread_count)``.
        """
        os.makedirs('csv', exist_ok=True)
        threads_file = f'csv/{self.channel_name}_threads.csv'
        if not os.path.exists(threads_file):
            print(f"Threads file not exists, collecting thread ids for channel {self.channel_name}")
            start_timestamp = str(start_date.timestamp())
            cursor = None
            has_more = True
            collected_ids = set()
            while has_more:
                response_data = self.read_slack_conversations(cursor, start_timestamp).data
                for message in response_data["messages"]:
                    if "thread_ts" in message:
                        collected_ids.add(message["thread_ts"])
                has_more = response_data.get("has_more", False)
                if has_more:
                    cursor = response_data["response_metadata"]["next_cursor"]
            print(f"Collected {len(collected_ids)} thread ids for channel {self.channel_name}")
            with open(threads_file, 'w') as f:
                f.writelines(thread_id + '\n' for thread_id in collected_ids)
        with open(threads_file) as f:
            # Strip the line terminator: the ids are sent to the Slack API and
            # must not carry a trailing newline. Blank lines are ignored.
            thread_id_set = {line.strip() for line in f if line.strip()}
        message_count = self.summarize_threads(thread_id_set)
        return message_count, len(thread_id_set)

    @sleep_and_retry
    @limits(calls=1, period=1)
    def read_slack_conversations(self, cursor, start_timestamp):
        """Fetch one page of channel history (rate limited to 1 call per second).

        Args:
            cursor: Pagination cursor from the previous page, or ``None``.
            start_timestamp: Passed to Slack as ``oldest``.

        Returns:
            The ``conversations_history`` response.
        """
        return self.client.conversations_history(
            channel=self.channel_id,
            limit=1000,
            oldest=start_timestamp,
            cursor=cursor
        )

    def read_thread_messages(self, channel_id, thread_ts) -> List[SlackMessage]:
        """Return all messages of one thread, following pagination.

        Messages with a ``user`` field are attributed to that user; messages
        without one but with a ``subtype`` are attributed to the subtype.
        Anything else, or a message missing ``ts``/``text``, is reported on
        stdout and skipped.
        """
        all_messages: list[SlackMessage] = []
        next_cursor = None
        has_more = True
        while has_more:
            result = self.read_slack_thread_messages(channel_id, next_cursor, thread_ts)
            for message in result['messages']:
                try:
                    if 'user' in message:
                        all_messages.append(SlackMessage(message['user'], message['ts'], message['text']))
                    elif 'subtype' in message:
                        all_messages.append(SlackMessage(message['subtype'], message['ts'], message['text']))
                    else:
                        print(f"Unknown message type: {message}")
                except KeyError:
                    print(f"KeyError for message: {message}")
            has_more = result["has_more"]
            if has_more:
                next_cursor = result["response_metadata"]["next_cursor"]
        return all_messages

    @sleep_and_retry
    @limits(calls=2, period=1)
    def read_slack_thread_messages(self, channel_id, next_cursor, thread_ts):
        """Fetch one page of a thread's replies (rate limited to 2 calls per second)."""
        return self.client.conversations_replies(
            channel=channel_id,
            ts=thread_ts,
            cursor=next_cursor
        )

    @staticmethod
    def _get_thread_summary(thread_messages: List[SlackMessage]) -> Optional[SlackThread]:
        """Build the ``SlackThread`` summary record for one thread.

        Args:
            thread_messages: Messages of the thread, first message first.

        Returns:
            The summary record, or ``None`` for a thread with fewer than two
            messages (nothing to summarize).
        """
        # ``<= 1`` also covers an empty list, which would otherwise fail on
        # ``thread_messages[0]`` below.
        if len(thread_messages) <= 1:
            return None

        documents = []
        unique_users = set()
        jira_tickets = []
        timestamps = []
        for slack_message in thread_messages:
            unique_users.add(slack_message.user)
            updated_msg = SlackChannelReader.replace_user_ids_with_names(str(slack_message.content))
            documents.append(Document(text=f"{slack_message.user} : {updated_msg}"))
            jira_tickets.extend(SlackChannelReader.extract_jira_tickets(updated_msg))
            timestamps.append(datetime.datetime.fromtimestamp(float(slack_message.timestamp)))

        # Derive the range from the messages themselves rather than seeding the
        # minimum with "now", which would win over any later-stamped message.
        min_timestamp = min(timestamps)
        max_timestamp = max(timestamps)

        if SKIP_AI:
            summary = thread_messages[0].content
            metadata: Optional[ThreadMetadata] = None
        else:
            qe = GPTListIndex.from_documents(documents).as_query_engine(service_context=service_context)
            resp = qe.query(
                "What is the key insight from this discussion thread, describe in less than 50 words"
            )
            summary = resp.response
            metadata = metadata_extractor.extract_metadata(summary)

        return SlackThread(min_timestamp.isoformat(), max_timestamp.isoformat(), thread_messages[0].user, unique_users,
                           summary,
                           len(thread_messages), metadata, jira_tickets)
|
|
|
|
|
def main():
    """Index the Slack channels named on the command line.

    Reads ``SLACK_API_TOKEN`` and ``OPENAI_API_KEY`` from the environment and
    a comma-separated channel list from ``sys.argv[1]``. Each channel is
    indexed from the hard-coded start date of 2023-01-01.

    Raises:
        SystemExit: With status 1 when an environment variable or the channel
            argument is missing.
    """
    start_date = datetime.datetime(2023, 1, 1)
    token = os.environ.get('SLACK_API_TOKEN')
    openai_api_key = os.environ.get('OPENAI_API_KEY')
    # ``sys.exit`` rather than the ``exit`` helper, which is injected by the
    # ``site`` module and is not guaranteed to exist (e.g. ``python -S``).
    if not token:
        print('Please set the SLACK_API_TOKEN environment variable.')
        sys.exit(1)
    if not openai_api_key:
        print('Please set the OPENAI_API_KEY environment variable.')
        sys.exit(1)
    if len(sys.argv) < 2:
        print('Usage: python3 index_slack.py channel1,channel2,channel3')
        sys.exit(1)

    channel_id_mapping = ChannelIdMapper(token)
    # Tolerate "a, b" and trailing commas in the channel list.
    channels = [name.strip() for name in sys.argv[1].split(',') if name.strip()]
    for channel in channels:
        # Resolve the id once and reuse it for both the log line and the reader.
        channel_id = channel_id_mapping.get_channel_id(channel)
        print(f'Indexing channel {channel} with id {channel_id} from: {start_date}')
        reader = SlackChannelReader(token, channel_id, channel, openai_api_key)
        message_count, thread_count = reader.read_messages(start_date)
        print(f'Indexed {thread_count} threads created from {message_count} messages for channel {channel}')


# Run only when executed as a script, so importing this module has no side effects.
if __name__ == "__main__":
    main()
|
|