File size: 12,853 Bytes
a31ba66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218f81b
a31ba66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218f81b
 
a31ba66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218f81b
 
a31ba66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218f81b
 
a31ba66
 
 
 
 
 
 
 
 
 
218f81b
a31ba66
218f81b
a31ba66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218f81b
a31ba66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import datetime
import functools
import json
import os
import shelve
import sys
from typing import List, Optional
import re
import openai
import slack_sdk
from llama_index import Document, GPTListIndex, ServiceContext
from llama_index.llms import OpenAI
from ratelimit import limits, sleep_and_retry

import csv
from channel_id_mapper import ChannelIdMapper
from data_models import GenericMessage
from metadata_extracter import MetadataExtractor, ThreadMetadata

# When True, no LLM work is done: the thread summary falls back to the first
# message's content and metadata extraction is skipped.
SKIP_AI = False
# Single LLM handle (temperature 0, gpt-3.5-turbo) shared by the service context
# and the metadata extractor defined just below.
gpt_turbo: OpenAI = OpenAI(temperature=0, model="gpt-3.5-turbo")
# llama_index service context wrapping the LLM above, with chunk_size=1024.
service_context = ServiceContext.from_defaults(llm=gpt_turbo, chunk_size=1024)
# Extractor that is fed each thread summary to derive topics/sentiment metadata.
metadata_extractor = MetadataExtractor(gpt_turbo)


@functools.lru_cache(maxsize=1024)
def cached_shelve_read(key):
    """Look up ``key`` in the ``mydata.db`` shelf, returning None when it is absent.

    Results, including misses, are memoized by the surrounding LRU cache
    (up to 1024 distinct keys), so repeated reads do not reopen the shelf.
    """
    store = shelve.open("mydata.db")
    try:
        return store.get(key, None)
    finally:
        # Always release the shelf, whether or not the lookup succeeded.
        store.close()


def cached_shelve_write(key, value):
    """Persist ``value`` under ``key`` in the ``mydata.db`` shelf.

    After a successful write the whole read cache is cleared so that
    ``cached_shelve_read`` cannot serve a stale value.
    """
    store = shelve.open("mydata.db")
    try:
        store[key] = value
    finally:
        store.close()
    # Invalidate every memoized read, not only the one for this key.
    cached_shelve_read.cache_clear()


def load_mapping_from_json(filepath):
    """Load a JSON document from ``filepath``.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        The decoded JSON content, or an empty dict when ``filepath`` does
        not exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    if not os.path.exists(filepath):
        return {}
    # Decode explicitly as UTF-8 so non-ASCII names load the same way on every
    # platform instead of depending on the locale's default encoding.
    with open(filepath, 'r', encoding='utf-8') as file:
        return json.load(file)


# User-id lookup table loaded once at import time from a JSON file in the
# working directory; it is an empty dict when that file does not exist.
# Presumably maps Slack user ids to display names — confirm against the JSON.
userIdMapping = load_mapping_from_json('user_id_to_name_mapping.json')

def map_user(user_id):
    """Translate ``user_id`` through ``userIdMapping``; unknown ids come back unchanged."""
    if user_id in userIdMapping:
        return userIdMapping[user_id]
    return user_id

class SlackThread:
    """Summary record describing a single Slack thread.

    NOTE: the order in which attributes are assigned is significant. CSV rows
    are produced elsewhere in this file from ``__dict__.values()``, so the
    assignment order below is the column order.
    """

    def __init__(self, min_ts: str, max_ts: str, user: str, unique_users: set[str], summary: str, message_count: int,
                 detected_metadata: Optional[ThreadMetadata], jira_tickets: list[str] = None):
        self.min_ts, self.max_ts = min_ts, max_ts
        self.user = user
        self.unique_users = unique_users
        self.unique_users_count = len(unique_users)
        # User ids that appear as stand-alone words in the summary are swapped for names.
        self.summary = SlackChannelReader.replace_user_ids_with_names_in_summary(summary)
        self.message_count = message_count

        # Without metadata the topic lists are empty and the sentiment is None.
        if detected_metadata:
            positive = detected_metadata.list_of_positive_topics
            negative = detected_metadata.list_of_negative_topics
            sentiment = detected_metadata.overall_sentiment
        else:
            positive, negative, sentiment = [], [], None
        self.positive_topics: list[str] = positive
        self.negative_topics: list[str] = negative
        self.overall_sentiment: str = sentiment
        self.jira_tickets: list[str] = jira_tickets or []


class SlackChannelReader:
    def __init__(self, token, channel_id, channel_name, openai_api_key):
        """Bind a Slack Web API client to one channel and register the OpenAI key.

        The OpenAI key is assigned to the module-global ``openai.api_key``, so
        it is shared process-wide rather than stored on this instance.
        """
        self.client = slack_sdk.WebClient(token=token)
        self.channel_id, self.channel_name = channel_id, channel_name
        openai.api_key = openai_api_key

    @staticmethod
    def replace_user_ids_with_names(text):
        """Rewrite ``<@ID>`` mentions in ``text`` using ``userIdMapping``.

        A mention whose id is in the mapping is replaced, angle brackets and
        all, by the mapped value; any other mention is left exactly as written.
        """
        return re.sub(
            r'<@(\w+)>',
            lambda mention: userIdMapping.get(mention.group(1), mention.group(0)),
            text,
        )

    @staticmethod
    def replace_user_ids_with_names_in_summary(summary):
        """Replace whitespace-delimited words of ``summary`` that are keys of ``userIdMapping``.

        The text is split on whitespace and re-joined with single spaces, so
        runs of whitespace (including newlines) collapse to one space. A word
        is only translated when it matches a mapping key exactly; an id with
        punctuation attached is left as-is.
        """
        # This is a workaround, better to replace ahead of time
        return ' '.join(userIdMapping.get(token, token) for token in summary.split())

    @staticmethod
    def extract_jira_tickets(text: str):
        # Define the pattern for JIRA tickets. Assuming that project names do not contain numbers.
        jira_ticket_pattern = r'\b[A-Z]+-\d+\b'
        # Use findall to get all occurrences of the pattern
        tickets = re.findall(jira_ticket_pattern, text)
        return tickets

    def save_messages(self, db: shelve.Shelf, msg_csv_file: str):
        count = 0
        with open(msg_csv_file, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(
                ['thread_id', 'timestamp', 'user', 'content'])
            for thread_ts, thread_messages in db.items():
                writer.writerow([thread_ts, thread_messages[0].timestamp, thread_messages[0].user,
                                 thread_messages[0].content, thread_messages[0].jira_tickets])
                count += 1
        print(f"Saved {count} messages for channel {self.channel_name} to csv")

    def summarize_threads(self, thread_id_set: set):
        """Fetch (once), cache and summarize the given threads.

        Two files under ``csv/`` are involved:

        * ``csv/<channel_name>_messages.csv`` caches the raw messages of every
          thread as JSON. It is only created when missing; an existing file is
          reused as-is and ``thread_id_set`` is then ignored.
        * ``csv/<channel_name>.csv`` receives one row per summarized thread and
          is rewritten on every call.

        Args:
            thread_id_set: Thread timestamps to fetch when the cache file has
                to be built.

        Returns:
            Total number of messages read back from the cache file.
        """
        # Both files live under csv/; create it so open() works on a fresh checkout.
        os.makedirs('csv', exist_ok=True)
        thread_messages_file = f'csv/{self.channel_name}_messages.csv'
        if not os.path.exists(thread_messages_file):
            print(f"Thread messages file not exists, collecting thread messages for channel {self.channel_name}")
            # Written as UTF-8 because the file is read back as UTF-8 below.
            with open(thread_messages_file, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(
                    ['thread_ts', 'messages_json'])
                thread_count = 0
                for raw_thread_ts in thread_id_set:
                    # Ids that came from a line-based file may carry a trailing
                    # newline; never send that to the Slack API.
                    thread_ts = str(raw_thread_ts).strip()
                    if not thread_ts:
                        continue
                    thread_messages = self.read_thread_messages(self.channel_id, thread_ts)
                    message_dicts = [msg.__dict__ for msg in thread_messages]
                    writer.writerow([thread_ts, json.dumps(message_dicts)])
                    thread_count += 1
                    if thread_count % 100 == 0:
                        print(f"Collected messages for {thread_count} threads")
                print(f"Collecting thread messages completed for : {thread_count} threads")

        # From file read messages and generate threads to another CSV
        messages_count = 0
        max_thread_size = 0
        with open(f'csv/{self.channel_name}.csv', 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # One header per SlackThread attribute, in the order SlackThread
            # assigns them, because rows are written from __dict__.values().
            writer.writerow(
                ['Start Time', 'Last Updated', 'Originator', 'Unique Users', 'Unique Users Count', 'Summary',
                 'Message Count',
                 'Positive Topics',
                 'Negative Topics', 'Overall Sentiment', 'Jira Tickets'])
            with open(thread_messages_file, 'r', newline='', encoding='utf-8') as csvfile_msgs:
                for row in csv.DictReader(csvfile_msgs):
                    thread_ts = str(row['thread_ts']).strip()
                    thread_messages_dict = json.loads(str(row['messages_json']))
                    thread_messages: list[GenericMessage] = [
                        GenericMessage(map_user(d['user']), d['timestamp'], d['content'])
                        for d in thread_messages_dict
                    ]
                    messages_count += len(thread_messages)
                    max_thread_size = max(max_thread_size, len(thread_messages))
                    print(f"Summarizing thread {thread_ts} with {len(thread_messages)} messages")
                    slack_thread = self._get_thread_summary(thread_messages)
                    # _get_thread_summary returns None for threads it declines to summarize.
                    if slack_thread:
                        writer.writerow(slack_thread.__dict__.values())
        print(f"Max thread size: {max_thread_size}")
        return messages_count

    def read_messages(self, start_date):
        threads_file = f'csv/{self.channel_name}_threads.csv'
        if not os.path.exists(threads_file):
            print(f"Threads file not exists, collecting thread ids for channel {self.channel_name}")
            start_timestamp = str(start_date.timestamp())
            cursor = None
            has_more = True
            thread_id_set = set()
            while has_more:
                response_data = self.read_slack_conversations(cursor, start_timestamp).data
                for response in response_data["messages"]:
                    if "thread_ts" in response:
                        thread_id_set.add(response["thread_ts"])
                has_more = response_data["has_more"]
                if has_more:
                    cursor = response_data["response_metadata"]["next_cursor"]
            print(f"Collected {len(thread_id_set)} thread ids for channel {self.channel_name}")
            with open(threads_file, 'w') as f:
                for element in thread_id_set:
                    f.write(element + '\n')
        with open(threads_file) as f:
            thread_id_set = set(f.readlines())
            message_count = self.summarize_threads(thread_id_set)
        return message_count, len(thread_id_set)

    @sleep_and_retry
    @limits(calls=1, period=1)
    def read_slack_conversations(self, cursor, start_timestamp):
        """Fetch one page of this channel's history from Slack.

        Args:
            cursor: Pagination cursor from the previous page, or None for the first page.
            start_timestamp: Passed to Slack as ``oldest``.

        Returns:
            The response object returned by ``conversations_history``.
        """
        request = {
            "channel": self.channel_id,
            "limit": 1000,
            "oldest": start_timestamp,
            "cursor": cursor,
        }
        return self.client.conversations_history(**request)

    def read_thread_messages(self, channel_id, thread_ts) -> List[GenericMessage]:
        """Download every message of one thread, following pagination to the end.

        Messages that carry a ``user`` are attributed to the mapped user name;
        messages without a user but with a ``subtype`` are attributed to that
        subtype. Anything else, or a message missing ``ts``/``text``, is
        reported on stdout and skipped.

        Returns:
            The collected messages in the order Slack returned them.
        """
        collected: list[GenericMessage] = []
        cursor = None
        while True:
            # Call the conversations.replies method using the WebClient
            page = self.read_slack_thread_messages(channel_id, cursor, thread_ts)

            for raw in page['messages']:
                try:
                    if 'user' in raw:
                        sender = map_user(raw['user'])
                    elif 'subtype' in raw:
                        sender = raw['subtype']
                    else:
                        print(f"Unknown message type: {raw}")
                        continue
                    collected.append(GenericMessage(sender, raw['ts'], raw['text']))
                except KeyError:
                    print(f"KeyError for message: {raw}")

            if not page["has_more"]:
                break
            cursor = page["response_metadata"]["next_cursor"]
        return collected

    @sleep_and_retry
    @limits(calls=2, period=1)
    def read_slack_thread_messages(self, channel_id, next_cursor, thread_ts):
        """Fetch one page of replies for the thread rooted at ``thread_ts``.

        Args:
            channel_id: Channel that contains the thread.
            next_cursor: Pagination cursor from the previous page, or None for the first page.
            thread_ts: Timestamp identifying the thread.

        Returns:
            The response object returned by ``conversations_replies``.
        """
        request = {
            "channel": channel_id,
            "ts": thread_ts,
            "cursor": next_cursor,
        }
        return self.client.conversations_replies(**request)

    @staticmethod
    def _get_thread_summary(thread_messages: List[GenericMessage]) -> Optional[SlackThread]:

        if len(thread_messages) == 1:
            return None
        documents = []
        unique_users = set()
        jira_tickets = []
        min_timestamp = datetime.datetime.now()
        max_timestamp = datetime.datetime(1970, 1, 1)
        for slack_message in thread_messages:
            unique_users.add(slack_message.user)
            updated_msg = SlackChannelReader.replace_user_ids_with_names(str(slack_message.content))
            msg = str(slack_message.user) + " : " + str(updated_msg)
            documents.append(Document(text=msg))
            tickets = SlackChannelReader.extract_jira_tickets(updated_msg)
            if tickets:
                jira_tickets.extend(tickets)
            min_timestamp = min(min_timestamp, datetime.datetime.fromtimestamp(float(slack_message.timestamp)))
            max_timestamp = max(max_timestamp, datetime.datetime.fromtimestamp(float(slack_message.timestamp)))
        if SKIP_AI or (len(thread_messages) == 1 and len(thread_messages[0].content) < 50):
            summary = thread_messages[0].content
        else:
            qe = GPTListIndex.from_documents(documents).as_query_engine(service_context=service_context)
            resp = qe.query(
                "What is the key insight from this discussion thread, describe in less than 50 words"
            )
            summary = resp.response
        metadata: Optional[ThreadMetadata] = metadata_extractor.extract_metadata(summary) if not SKIP_AI else None

        return SlackThread(min_timestamp.isoformat(), max_timestamp.isoformat(), thread_messages[0].user, unique_users,
                           summary,
                           len(thread_messages), metadata, jira_tickets)


def main():
    """Index the Slack channels named on the command line.

    Reads ``SLACK_API_TOKEN`` and ``OPENAI_API_KEY`` from the environment and
    a comma-separated list of channel names from ``sys.argv[1]``. Each channel
    is processed from the fixed start date of 2023-01-01.

    Raises:
        SystemExit: With status 1 when a required environment variable or the
            channel argument is missing.
    """
    start_date = datetime.datetime(2023, 1, 1)
    token = os.environ.get('SLACK_API_TOKEN')
    openai_api_key = os.environ.get('OPENAI_API_KEY')
    # sys.exit is used instead of the builtin exit(), which is only injected by
    # the ``site`` module and is not guaranteed to exist.
    if not token:
        print('Please set the SLACK_API_TOKEN environment variable.')
        sys.exit(1)
    if not openai_api_key:
        print('Please set the OPENAI_API_KEY environment variable.')
        sys.exit(1)
    if len(sys.argv) < 2:
        print('Usage: python3 index_slack.py channel1,channel2,channel3')
        sys.exit(1)
    channel_id_mapping = ChannelIdMapper(token)
    # Tolerate "a, b" and trailing commas: trim names and drop empty entries.
    channels = [name.strip() for name in sys.argv[1].split(',') if name.strip()]
    for channel in channels:
        # Resolve the id once and reuse it for both the log line and the reader.
        channel_id = channel_id_mapping.get_channel_id(channel)
        print(f'Indexing channel {channel} with id {channel_id} from: {start_date}')
        reader = SlackChannelReader(token, channel_id, channel, openai_api_key)
        message_count, thread_count = reader.read_messages(start_date)
        print(f'Indexed {thread_count} threads created from {message_count} messages for channel {channel}')


# Run only when executed as a script, so importing this module (e.g. to reuse
# SlackChannelReader) does not start indexing or exit the interpreter.
if __name__ == "__main__":
    main()