File size: 3,442 Bytes
4304c6d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from typing import Optional, Union

from core.app.entities.app_invoke_entities import InvokeFrom
from extensions.ext_database import db
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.account import Account
from models.model import App, EndUser
from models.web import PinnedConversation
from services.conversation_service import ConversationService


class WebConversationService:
    @classmethod
    def pagination_by_last_id(cls, app_model: App, user: Optional[Union[Account, EndUser]],

                              last_id: Optional[str], limit: int, invoke_from: InvokeFrom,

                              pinned: Optional[bool] = None) -> InfiniteScrollPagination:
        include_ids = None
        exclude_ids = None
        if pinned is not None:
            pinned_conversations = db.session.query(PinnedConversation).filter(
                PinnedConversation.app_id == app_model.id,
                PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
                PinnedConversation.created_by == user.id
            ).order_by(PinnedConversation.created_at.desc()).all()
            pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations]
            if pinned:
                include_ids = pinned_conversation_ids
            else:
                exclude_ids = pinned_conversation_ids

        return ConversationService.pagination_by_last_id(
            app_model=app_model,
            user=user,
            last_id=last_id,
            limit=limit,
            invoke_from=invoke_from,
            include_ids=include_ids,
            exclude_ids=exclude_ids,
        )

    @classmethod
    def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        pinned_conversation = db.session.query(PinnedConversation).filter(
            PinnedConversation.app_id == app_model.id,
            PinnedConversation.conversation_id == conversation_id,
            PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
            PinnedConversation.created_by == user.id
        ).first()

        if pinned_conversation:
            return

        conversation = ConversationService.get_conversation(
            app_model=app_model,
            conversation_id=conversation_id,
            user=user
        )

        pinned_conversation = PinnedConversation(
            app_id=app_model.id,
            conversation_id=conversation.id,
            created_by_role='account' if isinstance(user, Account) else 'end_user',
            created_by=user.id
        )

        db.session.add(pinned_conversation)
        db.session.commit()

    @classmethod
    def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
        pinned_conversation = db.session.query(PinnedConversation).filter(
            PinnedConversation.app_id == app_model.id,
            PinnedConversation.conversation_id == conversation_id,
            PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
            PinnedConversation.created_by == user.id
        ).first()

        if not pinned_conversation:
            return

        db.session.delete(pinned_conversation)
        db.session.commit()