| | import base64 |
| | import json |
| | import re |
| | from collections.abc import Iterator |
| | from json.decoder import JSONDecodeError |
| | from typing import Any |
| |
|
| | from google.auth.exceptions import RefreshError |
| | from google.oauth2.credentials import Credentials |
| | from googleapiclient.discovery import build |
| | from langchain_core.chat_sessions import ChatSession |
| | from langchain_core.messages import HumanMessage |
| | from langchain_google_community.gmail.loader import GMailLoader |
| | from loguru import logger |
| |
|
| | from langflow.custom import Component |
| | from langflow.inputs import MessageTextInput |
| | from langflow.io import SecretStrInput |
| | from langflow.schema import Data |
| | from langflow.template import Output |
| |
|
| |
|
class GmailLoaderComponent(Component):
    """Langflow component that loads Gmail messages as chat sessions.

    The user supplies OAuth 2.0 authorized-user credentials as a JSON string,
    a comma-separated list of Gmail label IDs and a maximum number of messages.
    Each matching message (preceded, for replies, by the message it replies to
    when that message is in the same thread) becomes a ``ChatSession``; the
    list of sessions is returned under the ``"text"`` key of a ``Data`` object.
    """

    display_name = "Gmail Loader"
    description = "Loads emails from Gmail using provided credentials."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
            # Template the user fills in. It must itself be valid JSON (no trailing
            # commas) because load_emails parses it with json.loads.
            value="""{
    "account": "",
    "client_id": "",
    "client_secret": "",
    "expiry": "",
    "refresh_token": "",
    "scopes": [
        "https://www.googleapis.com/auth/gmail.readonly"
    ],
    "token": "",
    "token_uri": "https://oauth2.googleapis.com/token",
    "universe_domain": "googleapis.com"
}""",
        ),
        MessageTextInput(
            name="label_ids",
            display_name="Label IDs",
            info="Comma-separated list of label IDs to filter emails.",
            required=True,
            value="INBOX,SENT,UNREAD,IMPORTANT",
        ),
        MessageTextInput(
            name="max_results",
            display_name="Max Results",
            info="Maximum number of emails to load.",
            required=True,
            value="10",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_emails"),
    ]

    def load_emails(self) -> Data:
        """Load emails from Gmail and return them as chat sessions.

        Returns:
            A ``Data`` object whose ``"text"`` entry is the list of
            ``ChatSession`` objects produced by the loader.

        Raises:
            ValueError: If the credentials are not a JSON object, if Max Results
                is not a positive integer, if the OAuth token cannot be
                refreshed, or if loading fails for any other reason.
        """

        class CustomGMailLoader(GMailLoader):
            """GMailLoader that filters by label IDs and extracts cleaned plain text."""

            # Compiled once per class rather than on every message.
            _URL_PATTERN = re.compile(r"http\S+|www\S+|https\S+", flags=re.MULTILINE)
            _EMAIL_ADDRESS_PATTERN = re.compile(r"\S+@\S+")
            _NON_ALPHANUMERIC_PATTERN = re.compile(r"[^A-Za-z0-9\s]+")
            _REPEATED_WHITESPACE_PATTERN = re.compile(r"\s{2,}")
            # Start of the quoted earlier conversation ("\r\nOn <date>, <who> wrote:\r\n").
            _QUOTED_REPLY_PATTERN = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")

            def __init__(
                self, creds: Any, *, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
            ) -> None:
                super().__init__(creds, n, raise_error)
                self.label_ids = label_ids if label_ids is not None else ["SENT"]

            def clean_message_content(self, message):
                """Strip URLs, email addresses and punctuation, then collapse whitespace.

                Note that every character outside ASCII letters, digits and
                whitespace is replaced by a space.
                """
                message = self._URL_PATTERN.sub("", message)
                message = self._EMAIL_ADDRESS_PATTERN.sub("", message)
                message = self._NON_ALPHANUMERIC_PATTERN.sub(" ", message)
                message = self._REPEATED_WHITESPACE_PATTERN.sub(" ", message)
                return message.strip()

            @staticmethod
            def _get_header(message: dict[str, Any], header_name: str) -> str | None:
                """Return the value of the first header called ``header_name``, or None.

                Header field names are case-insensitive (RFC 5322), so
                ``Message-Id`` and ``Message-ID`` both match.
                """
                wanted = header_name.lower()
                for header in message.get("payload", {}).get("headers", []):
                    if header.get("name", "").lower() == wanted:
                        return header.get("value")
                return None

            @staticmethod
            def _decode_body(data: str) -> str:
                """Decode a base64url body, tolerating missing padding and invalid UTF-8."""
                padded = data + "=" * (-len(data) % 4)
                return base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")

            @classmethod
            def _find_plain_text(cls, part: dict[str, Any]) -> str | None:
                """Depth-first search for the first text/plain part carrying inline body data."""
                if part.get("mimeType") == "text/plain":
                    data = part.get("body", {}).get("data")
                    if data:
                        return cls._decode_body(data)
                # Recurse into nested multipart containers (e.g. multipart/alternative).
                for sub_part in part.get("parts", []):
                    text = cls._find_plain_text(sub_part)
                    if text is not None:
                        return text
                return None

            def _extract_email_content(self, msg: Any) -> HumanMessage:
                """Turn a Gmail message resource into a HumanMessage tagged with its sender.

                Raises:
                    ValueError: If there is no ``From`` header or no plain-text body.
                """
                from_email = self._get_header(msg, "From")
                if from_email is None:
                    error_msg = "From email not found."
                    raise ValueError(error_msg)

                text = self._find_plain_text(msg["payload"])
                if text is None:
                    error_msg = "No plain text part found in the email."
                    raise ValueError(error_msg)

                # Keep only the newest reply and drop the quoted earlier conversation.
                newest_response = self._QUOTED_REPLY_PATTERN.split(text)[0]
                return HumanMessage(
                    content=self.clean_message_content(newest_response),
                    additional_kwargs={"sender": from_email},
                )

            def _get_message_data(self, service: Any, message: Any) -> ChatSession:
                """Build a ChatSession for one message, prefixed by the email it replies to.

                Raises:
                    ValueError: If the message is a reply but the replied-to email
                        is not in the same thread, or if content extraction fails.
                """
                msg = service.users().messages().get(userId="me", id=message["id"]).execute()
                message_content = self._extract_email_content(msg)

                in_reply_to = self._get_header(msg, "In-Reply-To")
                if not in_reply_to:
                    return ChatSession(messages=[message_content])

                thread = service.users().threads().get(userId="me", id=msg["threadId"]).execute()
                wanted_id = in_reply_to.strip()
                # Compare each thread message's own Message-ID; messages lacking the
                # header simply don't match (no stale id carried over between messages).
                response_email = next(
                    (
                        thread_message
                        for thread_message in thread.get("messages", [])
                        if (self._get_header(thread_message, "Message-ID") or "").strip() == wanted_id
                    ),
                    None,
                )
                if response_email is None:
                    error_msg = "Response email not found in the thread."
                    raise ValueError(error_msg)
                starter_content = self._extract_email_content(response_email)
                return ChatSession(messages=[starter_content, message_content])

            def lazy_load(self) -> Iterator[ChatSession]:
                """Yield one ChatSession per message matching ``label_ids`` (at most ``n``).

                Per-message failures are logged and skipped unless ``raise_error`` is set.
                """
                service = build("gmail", "v1", credentials=self.creds)
                results = (
                    service.users().messages().list(userId="me", labelIds=self.label_ids, maxResults=self.n).execute()
                )
                messages = results.get("messages", [])
                if not messages:
                    logger.warning("No messages found with the specified labels.")
                for message in messages:
                    try:
                        yield self._get_message_data(service, message)
                    except Exception:
                        if self.raise_error:
                            raise
                        logger.exception(f"Error processing message {message['id']}")

        json_string = self.json_string
        # Accept "INBOX, SENT" as well as "INBOX,SENT"; ignore empty entries.
        label_ids = [label.strip() for label in (self.label_ids or "").split(",") if label.strip()] or ["INBOX"]

        if self.max_results:
            invalid_max_msg = f"Max Results must be a positive integer, got {self.max_results!r}."
            try:
                max_results = int(self.max_results)
            except (TypeError, ValueError) as e:
                raise ValueError(invalid_max_msg) from e
            if max_results < 1:
                raise ValueError(invalid_max_msg)
        else:
            max_results = 100

        # Parse the authorized-user credentials supplied by the user.
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e
        if not isinstance(token_info, dict):
            msg = "Invalid JSON string: expected a JSON object with the OAuth token fields"
            raise ValueError(msg)

        creds = Credentials.from_authorized_user_info(token_info)

        loader = CustomGMailLoader(creds=creds, n=max_results, label_ids=label_ids)

        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        self.status = docs
        return Data(data={"text": docs})
| |
|