# LiKenun's picture
# Refactor #5
# a1a6d79
from datetime import datetime
from json import dumps
from pydantic import BaseModel, ConfigDict, PositiveInt
from types import MappingProxyType
from typing import Any, Literal, Mapping, Optional, Self
from .base import Chunk, Content
class SlackEventPayload(BaseModel):
    """Generic payload carried inside a Slack event.

    Only ``type`` and ``event_ts`` are declared explicitly. Because the model
    is configured with ``extra="allow"``, any additional keys supplied at
    construction time are accepted rather than rejected. Instances are frozen.
    """

    model_config = ConfigDict(
        frozen=True,
        extra="allow",
    )

    type: str
    event_ts: str
class SlackEvent(BaseModel):
    """Represents a general event from Slack.

    This is the outer wrapper delivered for an event; the event-specific data
    lives in ``event``. Instances are frozen.
    """

    model_config = ConfigDict(frozen=True)

    token: str
    team_id: str
    api_app_id: str
    event: SlackEventPayload
    type: str
    event_id: str
    event_time: int
    # Slack documents ``authed_users`` as deprecated in favour of
    # ``authorizations``, so a payload may arrive without it. Defaulting to an
    # empty tuple keeps such payloads valid instead of failing validation on a
    # field nothing here depends on. Payloads that do include it are unaffected.
    authed_users: tuple[str, ...] = ()
class SlackUserTimestampPair(BaseModel):
    """Immutable pairing of a Slack user with a timestamp.

    Both values are kept as plain strings exactly as received.
    """

    model_config = ConfigDict(frozen=True)

    user: str
    ts: str
class SlackReaction(BaseModel):
    """Information about one reaction on a Slack message.

    Holds the reaction's name, a strictly positive count, and a tuple of user
    strings. Instances are frozen.
    """

    model_config = ConfigDict(frozen=True)

    name: str
    count: PositiveInt  # validated to be greater than zero
    users: tuple[str, ...]
class SlackMessage(Content):
    """A Slack message (or app mention) adapted into ``Content`` form.

    ``type``, ``channel``, ``text``, ``ts`` and ``event_ts`` are required;
    every other field falls back to a default when absent.
    """

    type: Literal["app_mention", "message"]
    subtype: str | None = None
    channel: str
    channel_type: str | None = None
    user: str | None = None
    bot_id: str | None = None
    thread_ts: str | None = None
    text: str
    ts: str  # parsed with float() in get_metadata; also part of the ID
    edited: SlackUserTimestampPair | None = None
    event_ts: str
    deleted_ts: str | None = None
    hidden: bool = False
    is_starred: bool | None = None
    pinned_to: tuple[str, ...] | None = None
    reactions: tuple[SlackReaction, ...] | None = None

    def get_id(self: Self) -> str:
        """Return the identifier built from the channel and ``ts``."""
        channel, ts = self.channel, self.ts
        return f"slack-message:{channel}:{ts}"

    def get_chunks(self: Self) -> tuple[Chunk]:
        """Return the full message text as a one-element tuple of chunks."""
        sole_chunk = Chunk(
            text=self.text,
            parent_id=self.get_id(),
            chunk_id="",
            metadata=self.get_metadata(),
        )
        return (sole_chunk,)

    def get_metadata(self: Self) -> Mapping[str, Any]:
        """Return a read-only mapping with the message's modification time."""
        # NOTE(review): fromtimestamp() without a tz argument produces a naive
        # datetime in the host's local time zone, so the value depends on
        # where this runs. Confirm that is intended before relying on it.
        modified_at = datetime.fromtimestamp(float(self.ts))
        return MappingProxyType({"modificationTime": modified_at})
class SlackResponse(BaseModel):  # TODO: This should also be based on Content as it is a SlackMessage, just not one for which we know the identity yet.
    """An outgoing message to be sent to Slack.

    Instances are frozen.
    """

    model_config = ConfigDict(frozen=True)

    text: str
    # No default here: callers must pass ``channel`` explicitly, although
    # ``None`` is an accepted value.
    channel: str | None
    thread_ts: str | None = None