| import json |
| import time |
| from typing import AsyncGenerator, Generator, List, Optional, cast, Any |
| import requests |
| import aiohttp |
| from sseclient import SSEClient |
| from pydantic import ValidationError |
| from .core.auth_manager import AuthManager |
| from .logger import setup_logger |
| from .core.types.chat import ChatResponse, ChatResponseStream, ChatMessage, MessageRole |
| from .resources.completions import Completion |
| from .resources.new_chat import NewChat |
| from .utils.promp_system import WEB_DEVELOPMENT_PROMPT |
| from .core.exceptions import QwenAPIError |
| from .core.types.response.function_tool import ToolCall, Function |
|
|
|
|
class Qwen:
    """Client object for the Qwen chat web API.

    Stores authentication, logging and connection settings, exposes the
    ``new_chat`` (``NewChat``) and ``chat`` (``Completion``) resources, and
    provides private helpers that build request headers/payloads and convert
    server responses (SSE streams or plain JSON) into ``ChatResponse`` /
    ``ChatResponseStream`` objects.

    The client can be used as a synchronous context manager; leaving the
    ``with`` block calls :meth:`close`.
    """

    def __init__(
        self,
        email: str,
        password: str,
        api_key: Optional[str] = None,
        cookie: Optional[str] = None,
        base_url: str = "https://chat.qwen.ai",
        timeout: int = 600,
        log_level: str = "INFO",
        save_logs: bool = False,
    ):
        """Create a client.

        Args:
            email: Forwarded to ``AuthManager``.
            password: Forwarded to ``AuthManager``.
            api_key: Forwarded to ``AuthManager`` as ``token``.
            cookie: Forwarded to ``AuthManager``.
            base_url: Stored as ``self.base_url``.
            timeout: Stored as ``self.timeout`` (presumably seconds -- not
                consumed inside this class; verify against the resources).
            log_level: Forwarded to ``setup_logger``.
            save_logs: Forwarded to ``setup_logger``.
        """
        # Initialise all plain state first so that the resource objects
        # below receive a fully configured client.
        self.timeout = timeout
        self.base_url = base_url
        self.logger = setup_logger(log_level=log_level, save_logs=save_logs)
        self.auth = AuthManager(email, password, token=api_key, cookie=cookie)
        self._active_sessions: List[Any] = []
        self._is_cancelled = False
        # Strong references to in-flight ``session.close()`` tasks created by
        # ``cancel()``; the event loop itself only keeps weak references.
        self._close_tasks: set = set()

        self.new_chat = NewChat(self)
        self.chat = Completion(self)

    # ------------------------------------------------------------------
    # Request construction
    # ------------------------------------------------------------------
    def _build_headers(self) -> dict:
        """Return the HTTP headers sent with every request.

        The ``Cookie`` value is read from ``self.auth.get_cookie()`` on each
        call; the remaining headers are fixed literals.
        """
        return {
            "Content-Type": "application/json",
            "Cookie": self.auth.get_cookie(),
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
            "Host": "chat.qwen.ai",
            "Origin": "https://chat.qwen.ai",
        }

    @staticmethod
    def _serialize_blocks(blocks: Any) -> Any:
        """Convert message blocks into the ``content`` value of the payload.

        A single text block collapses to its bare string; otherwise a list
        of ``{"type": ...}`` dictionaries is produced (text and image blocks
        carry their data, any other block type only carries its type).
        """
        if len(blocks) == 1 and blocks[0].block_type == "text":
            return blocks[0].text

        serialized = []
        for block in blocks:
            if block.block_type == "text":
                serialized.append({"type": "text", "text": block.text})
            elif block.block_type == "image":
                serialized.append({"type": "image", "image": str(block.url)})
            else:
                serialized.append({"type": block.block_type})
        return serialized

    @staticmethod
    def _chat_type(message: Any) -> str:
        """Pick the ``chat_type`` for a message from its feature flags."""
        if getattr(message, "web_development", False):
            return "artifacts"
        if getattr(message, "web_search", False):
            return "search"
        return "t2t"

    def _build_payload(
        self,
        chat_id: str,
        messages: List[ChatMessage],
        temperature: float,
        model: str,
        stream: bool,
        max_tokens: Optional[int],
    ) -> dict:
        """Build the JSON body for a chat completion request.

        Args:
            chat_id: Copied into the payload as ``chat_id``.
            messages: ``ChatMessage`` instances, or plain dicts that are
                validated by constructing ``ChatMessage(**msg)``.
            temperature: Copied into the payload.
            model: Copied into the payload.
            stream: Copied into the payload.
            max_tokens: Copied into the payload.

        Returns:
            The request payload, including a millisecond ``timestamp``.

        Raises:
            QwenAPIError: If a dict message fails ``ChatMessage`` validation.
        """
        validated_messages = []

        for msg in messages:
            if isinstance(msg, dict):
                try:
                    validated_msg = ChatMessage(**msg)
                except ValidationError as e:
                    # Chain the cause so the pydantic details stay visible.
                    raise QwenAPIError(f"Error validating message: {e}") from e
            else:
                validated_msg = msg

            # For web-development system messages, append the web
            # development prompt once (skipped when already present).
            if (
                validated_msg.role == "system"
                and validated_msg.web_development
                and validated_msg.content
                and WEB_DEVELOPMENT_PROMPT not in validated_msg.content
            ):
                updated_content = (
                    f"{validated_msg.content}\n\n{WEB_DEVELOPMENT_PROMPT}"
                )
                validated_msg = ChatMessage(
                    **{**validated_msg.model_dump(), "content": updated_content}
                )

            # Role mapping: TOOL -> FUNCTION, SYSTEM stays SYSTEM, every
            # other role is sent as USER.
            # NOTE(review): this also maps assistant messages to USER --
            # confirm that is what the endpoint expects.
            if validated_msg.role == MessageRole.TOOL:
                role = MessageRole.FUNCTION
            elif validated_msg.role == MessageRole.SYSTEM:
                role = validated_msg.role
            else:
                role = MessageRole.USER

            validated_messages.append(
                {
                    "role": role,
                    "content": self._serialize_blocks(validated_msg.blocks),
                    "chat_type": self._chat_type(validated_msg),
                    "feature_config": {
                        "thinking_enabled": getattr(validated_msg, "thinking", False),
                        "thinking_budget": getattr(validated_msg, "thinking_budget", 0),
                        "output_schema": getattr(validated_msg, "output_schema", None),
                    },
                    "extra": {},
                }
            )

        return {
            "stream": stream,
            "incremental_output": True,
            "chat_id": chat_id,
            "chat_mode": "normal",
            "model": model,
            "parent_id": None,
            "messages": validated_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "timestamp": int(time.time() * 1000),
        }

    # ------------------------------------------------------------------
    # Shared response-parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_choice_payload(data: Any, key: str) -> Optional[dict]:
        """Return ``data["choices"][0][key]`` or ``None`` if it is absent.

        ``None`` is returned for any decoded JSON value that does not have
        that shape (not an object, no/empty ``choices``, missing ``key``),
        so callers can simply skip such events.
        """
        if not isinstance(data, dict):
            return None
        choices = data.get("choices")
        if not isinstance(choices, list) or not choices:
            return None
        first = choices[0]
        if not isinstance(first, dict):
            return None
        payload = first.get(key)
        return payload if isinstance(payload, dict) else None

    @staticmethod
    def _apply_delta(
        payload: Optional[dict], text: str, extra: Any, extra_cls: Any
    ) -> tuple:
        """Fold one ``delta``/``message`` payload into ``(text, extra)``.

        Args:
            payload: The delta/message object, or ``None`` to do nothing.
            text: Text accumulated so far.
            extra: Extra object accumulated so far.
            extra_cls: Callable used to build the extra object from the
                payload's ``extra`` mapping (only for ``role == "function"``).

        Returns:
            The updated ``(text, extra)`` pair.
        """
        if payload is None:
            return text, extra
        if payload.get("role") == "function":
            extra_data = payload.get("extra")
            if extra_data:
                extra = extra_cls(**extra_data)
        content = payload.get("content")
        # Content may be missing or null; only append real strings.
        if isinstance(content, str):
            text += content
        return text, extra

    @staticmethod
    def _decode_tool_payload(text: str) -> tuple:
        """Parse accumulated response text as a tool call.

        Returns:
            ``(parsed, arguments)`` where ``parsed`` is the decoded JSON
            object and ``arguments`` is its ``arguments`` value, decoded a
            second time when it was itself a JSON string.

        Raises:
            json.JSONDecodeError: If ``text`` (or a string ``arguments``
                value) is not valid JSON.
            ValueError: If the decoded value is not an object containing
                both ``name`` and ``arguments``.
        """
        parsed = json.loads(text)
        if (
            not isinstance(parsed, dict)
            or "name" not in parsed
            or "arguments" not in parsed
        ):
            raise ValueError(
                "tool call payload must be a JSON object with 'name' and 'arguments'"
            )
        arguments = parsed["arguments"]
        if isinstance(arguments, str):
            arguments = json.loads(arguments)
        return parsed, arguments

    def _collect_sse(self, response: requests.Response, extra_cls: Any) -> tuple:
        """Drain a synchronous SSE response.

        Returns:
            ``(text, extra, event_count)`` where ``event_count`` is the
            number of events yielded by the SSE client.
        """
        client = SSEClient(cast(Any, response))
        text = ""
        extra = None
        event_count = 0
        for event in client.events():
            event_count += 1
            if not event.data:
                continue
            try:
                data = json.loads(event.data)
            except json.JSONDecodeError:
                # Non-JSON data lines are ignored.
                continue
            text, extra = self._apply_delta(
                self._first_choice_payload(data, "delta"), text, extra, extra_cls
            )
        return text, extra, event_count

    async def _collect_async_sse(
        self, response: aiohttp.ClientResponse, extra_cls: Any, cancel_message: str
    ) -> tuple:
        """Drain an aiohttp SSE response line by line.

        Stops early (logging ``cancel_message``) once ``self._is_cancelled``
        is set. Returns the accumulated ``(text, extra)`` pair.
        """
        text = ""
        extra = None
        async for line in response.content:
            if self._is_cancelled:
                self.logger.info(cancel_message)
                break
            if not line.startswith(b"data:"):
                continue
            try:
                data = json.loads(line[5:].decode())
            except json.JSONDecodeError:
                continue
            text, extra = self._apply_delta(
                self._first_choice_payload(data, "delta"), text, extra, extra_cls
            )
        return text, extra

    def _tool_call_response(
        self, text: str, extra: Any, choice_cls: Any, message_cls: Any
    ) -> ChatResponse | QwenAPIError:
        """Turn accumulated text into a tool-call ``ChatResponse``.

        Returns a ``QwenAPIError`` instance (not raised) when the text
        cannot be interpreted as a tool call.
        """
        self.logger.debug(f"text: {text}")
        try:
            parse_json, parse_arguments = self._decode_tool_payload(text)
        except json.JSONDecodeError as e:
            self.logger.error(f"Error decoding JSON response: {e}")
            return QwenAPIError(f"Error decoding JSON response: {e}")
        except ValueError as e:
            self.logger.error(f"Invalid tool call response: {e}")
            return QwenAPIError(f"Invalid tool call response: {e}")

        self.logger.debug(f"parse_json: {parse_json}")
        self.logger.debug(f"arguments: {parse_arguments}")
        function = Function(name=parse_json["name"], arguments=parse_arguments)
        message = message_cls(
            role="assistant", content="", tool_calls=[ToolCall(function=function)]
        )
        choice = choice_cls(message=message, extra=extra)
        return ChatResponse(choices=choice)

    async def _release_session(self, session: aiohttp.ClientSession) -> None:
        """Stop tracking ``session`` and close it if it is still open."""
        if session in self._active_sessions:
            self._active_sessions.remove(session)
        if not session.closed:
            await session.close()

    # ------------------------------------------------------------------
    # Synchronous response handling
    # ------------------------------------------------------------------
    def _process_response(self, response: requests.Response) -> ChatResponse:
        """Convert a non-streaming HTTP response into a ``ChatResponse``.

        The body is first read as an SSE stream. When it produced no events
        at all, the body is read as plain JSON and the message is taken from
        ``data.choices[0].message``.

        Raises:
            QwenAPIError: If the JSON fallback does not contain
                ``data.choices[0].message``.
        """
        from .core.types.chat import Choice, Message, Extra

        text, extra, event_count = self._collect_sse(response, Extra)

        if event_count == 0:
            body = response.json()
            payload = self._first_choice_payload(
                body.get("data") if isinstance(body, dict) else None, "message"
            )
            if payload is None:
                raise QwenAPIError(f"Unexpected response format: {body}")
            text, extra = self._apply_delta(payload, text, extra, Extra)

        message = Message(role="assistant", content=text)
        choice = Choice(message=message, extra=extra)
        return ChatResponse(choices=choice)

    def _process_response_tool(
        self, response: requests.Response
    ) -> ChatResponse | QwenAPIError:
        """Convert an SSE response into a tool-call ``ChatResponse``.

        The concatenated delta text is parsed as JSON with ``name`` and
        ``arguments``. A ``QwenAPIError`` is *returned* when parsing fails.
        """
        from .core.types.chat import Choice, Message, Extra

        text, extra, _ = self._collect_sse(response, Extra)
        return self._tool_call_response(text, extra, Choice, Message)

    # ------------------------------------------------------------------
    # Asynchronous response handling
    # ------------------------------------------------------------------
    async def _process_aresponse(
        self, response: aiohttp.ClientResponse, session: aiohttp.ClientSession
    ) -> ChatResponse:
        """Async counterpart of :meth:`_process_response` (SSE only).

        The session is tracked in ``self._active_sessions`` while the body
        is read and is always closed afterwards.

        Raises:
            aiohttp.ClientError: Logged and re-raised.
        """
        from .core.types.chat import Choice, Message, Extra

        self._active_sessions.append(session)
        try:
            text, extra = await self._collect_async_sse(
                response, Extra, "Async response processing cancelled"
            )
            message = Message(role="assistant", content=text)
            choice = Choice(message=message, extra=extra)
            return ChatResponse(choices=choice)
        except aiohttp.ClientError as e:
            self.logger.error(f"Client error: {e}")
            raise
        finally:
            await self._release_session(session)

    async def _process_aresponse_tool(
        self, response: aiohttp.ClientResponse, session: aiohttp.ClientSession
    ) -> ChatResponse | QwenAPIError:
        """Async counterpart of :meth:`_process_response_tool`.

        The session is tracked while the body is read and is always closed
        afterwards. A ``QwenAPIError`` is *returned* when the text is not a
        valid tool call.

        Raises:
            aiohttp.ClientError: Logged and re-raised.
        """
        from .core.types.chat import Choice, Message, Extra

        self._active_sessions.append(session)
        try:
            text, extra = await self._collect_async_sse(
                response, Extra, "Async tool response processing cancelled"
            )
            return self._tool_call_response(text, extra, Choice, Message)
        except aiohttp.ClientError as e:
            self.logger.error(f"Client error: {e}")
            raise
        finally:
            await self._release_session(session)

    # ------------------------------------------------------------------
    # Streaming
    # ------------------------------------------------------------------
    def _process_stream(
        self, response: requests.Response
    ) -> Generator[ChatResponseStream, None, None]:
        """Yield one ``ChatResponseStream`` per SSE event carrying a delta.

        Each yielded item contains the content accumulated so far. Events
        without ``choices[0].delta`` and non-JSON events are skipped. The
        loop stops once ``self._is_cancelled`` is set. The HTTP response is
        closed when the generator finishes or is discarded.
        """
        client = SSEClient(cast(Any, response))
        content = ""
        try:
            for event in client.events():
                if self._is_cancelled:
                    self.logger.info("Stream processing cancelled")
                    break
                if not event.data:
                    continue
                try:
                    data = json.loads(event.data)
                except json.JSONDecodeError:
                    continue

                delta = self._first_choice_payload(data, "delta")
                if delta is None:
                    continue

                piece = delta.get("content")
                if isinstance(piece, str):
                    content += piece
                yield ChatResponseStream(
                    **data,
                    message=ChatMessage(
                        role=delta.get("role"),
                        content=content,
                    ),
                )
        finally:
            # Release the connection even when the consumer stops early.
            response.close()

    async def _process_astream(
        self, response: aiohttp.ClientResponse, session: aiohttp.ClientSession
    ) -> AsyncGenerator[ChatResponseStream, None]:
        """Async counterpart of :meth:`_process_stream`.

        The session is tracked in ``self._active_sessions`` while streaming
        and closed when the generator ends. ``aiohttp.ClientError`` is
        logged and re-raised; ``asyncio.CancelledError`` is logged and
        suppressed, ending the stream quietly.
        """
        import asyncio

        self._active_sessions.append(session)
        try:
            content = ""
            async for line in response.content:
                if self._is_cancelled:
                    self.logger.info("Async stream processing cancelled")
                    break
                if not line.startswith(b"data:"):
                    continue
                try:
                    data = json.loads(line[5:].decode())
                except json.JSONDecodeError:
                    continue

                delta = self._first_choice_payload(data, "delta")
                if delta is None:
                    continue

                piece = delta.get("content")
                if isinstance(piece, str):
                    content += piece
                yield ChatResponseStream(
                    **data,
                    message=ChatMessage(
                        role=delta.get("role"),
                        content=content,
                    ),
                )

                # Give other tasks (e.g. one calling ``cancel()``) a turn.
                await asyncio.sleep(0)
        except asyncio.CancelledError:
            # NOTE(review): suppressing CancelledError is kept for backward
            # compatibility; asyncio normally expects it to propagate.
            self.logger.info("Stream was cancelled")
        except aiohttp.ClientError as e:
            self.logger.error(f"Client error: {e}")
            raise
        finally:
            self.logger.debug("Closing session")
            await self._release_session(session)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def _on_close_task_done(self, task: Any) -> None:
        """Drop a finished close task and report an unexpected failure."""
        self._close_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None and "SSL shutdown timed out" not in str(error):
            self.logger.warning(f"Error closing session: {error}")

    def cancel(self):
        """
        Cancel all active requests and close connections.

        Sets ``self._is_cancelled`` so the response/stream readers stop at
        their next check, closes the connector of every tracked session and,
        when an event loop is running, schedules ``session.close()`` on it.
        Nothing in this class clears the flag again.
        NOTE(review): confirm callers reset ``_is_cancelled`` before reuse.
        """
        import asyncio

        self._is_cancelled = True
        self.logger.info("Cancelling all active requests...")

        # Iterate over a copy: readers may remove sessions concurrently.
        for session in list(self._active_sessions):
            try:
                if hasattr(session, "close") and not session.closed:
                    connector = getattr(session, "_connector", None)
                    if connector:
                        # Shorten the SSL shutdown wait before closing.
                        connector._ssl_shutdown_timeout = 0.1
                        connector.close()

                    if asyncio.iscoroutinefunction(session.close):
                        try:
                            loop = asyncio.get_running_loop()
                        except RuntimeError:
                            # No running loop: nothing to schedule on; the
                            # connector above has already been closed.
                            pass
                        else:
                            # Schedule the close without waiting for it.
                            # The task must NOT be cancelled (that would
                            # prevent the close from ever running) and must
                            # stay referenced until it completes.
                            task = loop.create_task(session.close())
                            self._close_tasks.add(task)
                            task.add_done_callback(self._on_close_task_done)

                    self.logger.debug(f"Session {id(session)} marked for closure")
            except Exception as e:
                # Best-effort cleanup: expected shutdown noise is ignored,
                # anything else is reported but never raised.
                if "SSL shutdown timed out" not in str(
                    e
                ) and "CancelledError" not in str(e):
                    self.logger.warning(f"Error closing session {id(session)}: {e}")

        self._active_sessions.clear()
        self.logger.info("All active sessions cancelled")

    def close(self):
        """
        Close the client and clean up resources.

        Delegates to :meth:`cancel` and logs that the client was closed.
        """
        self.cancel()
        self.logger.info("Qwen client closed")

    def __enter__(self):
        """Context manager entry; returns the client itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit; closes the client, never suppresses errors."""
        self.close()
|
|