import imaplib
import re
import time
from datetime import datetime, timedelta, timezone
from email import message_from_bytes
from email.utils import parsedate_to_datetime
from typing import Optional

import requests

from core.mail_utils import extract_verification_code


class MicrosoftMailClient:
    """Read verification codes from a Microsoft mailbox over IMAP with XOAUTH2.

    The client exchanges an OAuth2 refresh token for an access token at
    ``login.microsoftonline.com``, logs in to the IMAP server with that token
    and scans the newest messages of each mailbox in ``MAILBOXES`` for a
    six-character code.

    Every public method reports failure by returning ``None`` (after calling
    the optional log callback) instead of raising.

    Note: ``proxy`` is applied to the HTTP token request only; the IMAP
    connection is opened directly by ``imaplib``.
    """

    IMAP_HOST = "outlook.office365.com"
    IMAP_PORT = 993
    # Mailboxes scanned, in this order, for a message carrying a code.
    MAILBOXES = ("INBOX", "Junk")
    # Only this many of the newest messages per mailbox are downloaded.
    MAX_RECENT_MESSAGES = 5
    # Look-back window used when the caller supplies no ``since_time``.
    DEFAULT_LOOKBACK = timedelta(minutes=5)
    # Six upper-case letters/digits that stand alone. The lookarounds keep
    # the pattern from matching inside longer tokens such as "<!DOCTYPE".
    _CODE_PATTERN = re.compile(r"(?<![A-Za-z0-9])[A-Z0-9]{6}(?![A-Za-z0-9])")

    def __init__(
        self,
        client_id: str,
        refresh_token: str,
        tenant: str = "consumers",
        proxy: str = "",
        log_callback=None,
    ) -> None:
        """Store the OAuth2 settings.

        Args:
            client_id: OAuth2 application (client) id.
            refresh_token: Refresh token exchanged for access tokens.
            tenant: Tenant segment of the token URL; an empty value falls
                back to ``"consumers"``.
            proxy: Proxy URL used for both http and https token requests;
                empty means no proxy.
            log_callback: Optional callable invoked as
                ``log_callback(level, message)``.
        """
        self.client_id = client_id
        self.refresh_token = refresh_token
        self.tenant = tenant or "consumers"
        self.proxies = {"http": proxy, "https": proxy} if proxy else None
        self.log_callback = log_callback
        self.email: Optional[str] = None

    def set_credentials(self, email: str, password: Optional[str] = None) -> None:
        """Set the mailbox address to log in as.

        ``password`` is accepted but ignored: authentication uses the OAuth2
        token, never a password.
        """
        self.email = email

    def _get_access_token(self) -> Optional[str]:
        """Exchange the refresh token for an access token.

        Returns:
            The access token, or ``None`` when the request fails, the server
            answers with a non-200 status, or the response has no token.
        """
        url = f"https://login.microsoftonline.com/{self.tenant}/oauth2/v2.0/token"
        data = {
            "client_id": self.client_id,
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
        }
        try:
            res = requests.post(url, data=data, proxies=self.proxies, timeout=15)
            if res.status_code != 200:
                self._log("error", f"Microsoft token error: {res.status_code}")
                return None
            payload = res.json() if res.content else {}
            token = payload.get("access_token")
        except Exception as exc:
            # Boundary handler: covers transport errors as well as a body
            # that is not a JSON object.
            self._log("error", f"Microsoft token exception: {exc}")
            return None

        if not token:
            self._log("error", "Microsoft token missing")
            return None
        return token

    def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
        """Look once for a verification code in the configured mailboxes.

        Args:
            since_time: Messages dated before this moment are skipped. A
                naive value is taken as local time, an aware value is
                converted to local time. Defaults to
                ``DEFAULT_LOOKBACK`` before now.

        Returns:
            The first code found (newest messages first), or ``None`` when no
            address is set, authentication or the connection fails, or no
            recent message contains a code.
        """
        if not self.email:
            return None

        self._log("info", "fetching verification code")
        token = self._get_access_token()
        if not token:
            return None

        client = self._open_imap_session(token)
        if client is None:
            return None

        if since_time is None:
            cutoff = datetime.now() - self.DEFAULT_LOOKBACK
        else:
            # Message dates are naive local times; normalise the cutoff the
            # same way so an aware ``since_time`` cannot raise TypeError.
            cutoff = self._to_naive_local(since_time)

        try:
            for mailbox in self.MAILBOXES:
                code = self._scan_mailbox(client, mailbox, cutoff)
                if code:
                    self._log("info", f"code found in {mailbox}: {code}")
                    return code
        except (imaplib.IMAP4.error, OSError) as exc:
            # A dropped connection mid-scan is reported like every other
            # failure of this method: log it and return None.
            self._log("error", f"IMAP fetch failed: {exc}")
        finally:
            self._close_quietly(client)

        return None

    def _open_imap_session(self, token: str):
        """Connect and authenticate; return the client or ``None`` on failure."""
        auth_string = f"user={self.email}\x01auth=Bearer {token}\x01\x01".encode()
        try:
            client = imaplib.IMAP4_SSL(self.IMAP_HOST, self.IMAP_PORT)
        except (imaplib.IMAP4.error, OSError) as exc:
            self._log("error", f"IMAP connection failed: {exc}")
            return None

        try:
            client.authenticate("XOAUTH2", lambda _: auth_string)
        except Exception as exc:
            self._log("error", f"IMAP auth failed: {exc}")
            self._close_quietly(client)
            return None
        return client

    @staticmethod
    def _close_quietly(client) -> None:
        """Log out, ignoring errors: the session is being discarded anyway."""
        try:
            client.logout()
        except Exception:
            pass

    def _scan_mailbox(self, client, mailbox: str, cutoff: datetime) -> Optional[str]:
        """Return a code from the newest messages of ``mailbox``, if any."""
        try:
            status, _ = client.select(mailbox, readonly=True)
        except Exception:
            # A mailbox that cannot be selected is skipped.
            return None
        if status != "OK":
            return None

        status, data = client.search(None, "ALL")
        if status != "OK" or not data or not data[0]:
            return None

        recent_ids = data[0].split()[-self.MAX_RECENT_MESSAGES:]
        for msg_id in reversed(recent_ids):  # newest first
            status, msg_data = client.fetch(msg_id, "(RFC822)")
            if status != "OK" or not msg_data:
                continue

            # The message body is the second element of the first tuple item.
            raw_bytes = next(
                (
                    item[1]
                    for item in msg_data
                    if isinstance(item, tuple) and len(item) > 1
                ),
                None,
            )
            if not raw_bytes:
                continue

            msg = message_from_bytes(raw_bytes)
            msg_date = self._parse_message_date(msg.get("Date"))
            # A message whose date cannot be parsed is still inspected.
            if msg_date is not None and msg_date < cutoff:
                continue

            code = self._extract_code(self._message_to_text(msg))
            if code:
                return code
        return None

    def poll_for_code(
        self,
        timeout: int = 120,
        interval: int = 4,
        since_time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Call ``fetch_verification_code`` repeatedly until a code arrives.

        At most ``timeout // interval`` attempts are made (always at least
        one), sleeping ``interval`` seconds between them. Polling also stops
        as soon as the next sleep would pass ``timeout`` seconds, so slow
        fetches cannot stretch the total wait far beyond ``timeout``. A
        non-positive ``interval`` results in a single attempt.

        Returns:
            The code, or ``None`` if no address is set or polling timed out.
        """
        if not self.email:
            return None

        if interval > 0:
            max_attempts = max(1, int(timeout // interval))
        else:
            # Cannot divide by or sleep for a non-positive interval.
            max_attempts = 1
        deadline = time.monotonic() + timeout

        for attempt in range(1, max_attempts + 1):
            code = self.fetch_verification_code(since_time=since_time)
            if code:
                return code
            if attempt == max_attempts:
                break
            if time.monotonic() + interval > deadline:
                break
            time.sleep(interval)

        self._log("error", "verification code timeout")
        return None

    @classmethod
    def _extract_code(cls, text: str) -> Optional[str]:
        """Return the first standalone six-character code in ``text``."""
        match = cls._CODE_PATTERN.search(text or "")
        return match.group(0) if match else None

    @staticmethod
    def _decode_payload(payload: bytes, charset: Optional[str]) -> str:
        """Decode ``payload``, falling back to UTF-8 for unknown charsets."""
        try:
            return payload.decode(charset or "utf-8", errors="ignore")
        except LookupError:
            # The charset label comes from the (untrusted) message and may
            # name a codec Python does not know.
            return payload.decode("utf-8", errors="ignore")

    @staticmethod
    def _message_to_text(msg) -> str:
        """Return the text/plain and text/html content of ``msg`` as one string.

        Parts of a multipart message are joined with a newline so that text
        from two parts can never run together.
        """
        decode = MicrosoftMailClient._decode_payload

        if msg.is_multipart():
            parts = []
            for part in msg.walk():
                if part.get_content_type() not in ("text/plain", "text/html"):
                    continue
                payload = part.get_payload(decode=True)
                if not payload:
                    continue
                parts.append(decode(payload, part.get_content_charset()))
            return "\n".join(parts)

        payload = msg.get_payload(decode=True)
        if isinstance(payload, bytes):
            return decode(payload, msg.get_content_charset())
        return str(payload) if payload else ""

    @staticmethod
    def _to_naive_local(moment: datetime) -> datetime:
        """Convert an aware datetime to naive local time; pass naive through."""
        if moment.tzinfo is None:
            return moment
        return moment.astimezone(tz=None).replace(tzinfo=None)

    @staticmethod
    def _parse_message_date(value: Optional[str]) -> Optional[datetime]:
        """Parse a ``Date`` header into a naive local datetime.

        Returns ``None`` for a missing or unparseable header.
        """
        if not value:
            return None
        try:
            parsed = parsedate_to_datetime(value)
            if parsed is None:
                return None
            if parsed.tzinfo is None:
                # parsedate_to_datetime yields a naive value for a "-0000"
                # zone, which denotes UTC, not local time.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return MicrosoftMailClient._to_naive_local(parsed)
        except Exception:
            # The header comes from an untrusted message; a malformed value
            # must never abort the scan.
            return None

    def _log(self, level: str, message: str) -> None:
        """Forward a message to the log callback, if one was supplied.

        Errors raised by the callback are deliberately ignored so that
        logging can never break mail retrieval.
        """
        if self.log_callback:
            try:
                self.log_callback(level, message)
            except Exception:
                pass