File size: 3,231 Bytes
e9dd2b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
from pyrogram import types
from typing import Union, Optional, AsyncGenerator
from utils import temp
from info import SESSION, API_ID, API_HASH, BOT_TOKEN, LOG_STR
from database.users_chats_db import db
from database.ia_filterdb import Media
from pyrogram.raw.all import layer
from pyrogram import Client, __version__
import logging
import logging.config
# Get logging configurations
logging.config.fileConfig('logging.conf')
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("pyrogram").setLevel(logging.ERROR)
logging.getLogger("imdbpy").setLevel(logging.ERROR)
class Bot(Client):
def __init__(self):
super().__init__(
name=SESSION,
api_id=API_ID,
api_hash=API_HASH,
bot_token=BOT_TOKEN,
workers=50,
plugins={"root": "plugins"},
sleep_threshold=5,
)
async def start(self):
b_users, b_chats = await db.get_banned()
temp.BANNED_USERS = b_users
temp.BANNED_CHATS = b_chats
await super().start()
await Media.ensure_indexes()
me = await self.get_me()
temp.ME = me.id
temp.U_NAME = me.username
temp.B_NAME = me.first_name
self.username = f'@{me.username}'
logging.info(
f"{me.first_name} with for Pyrogram v{__version__} (Layer {layer}) started on {me.username}.")
logging.info(LOG_STR)
async def iter_messages(
self,
chat_id: Union[int, str],
limit: int,
offset: int = 0,
) -> Optional[AsyncGenerator["types.Message", None]]:
"""Iterate through a chat sequentially.
This convenience method does the same as repeatedly calling :meth:`~pyrogram.Client.get_messages` in a loop, thus saving
you from the hassle of setting up boilerplate code. It is useful for getting the whole chat messages with a
single call.
Parameters:
chat_id (``int`` | ``str``):
Unique identifier (int) or username (str) of the target chat.
For your personal cloud (Saved Messages) you can simply use "me" or "self".
For a contact that exists in your Telegram address book you can use his phone number (str).
limit (``int``):
Identifier of the last message to be returned.
offset (``int``, *optional*):
Identifier of the first message to be returned.
Defaults to 0.
Returns:
``Generator``: A generator yielding :obj:`~pyrogram.types.Message` objects.
Example:
.. code-block:: python
for message in app.iter_messages("pyrogram", 1, 15000):
print(message.text)
"""
current = offset
while True:
new_diff = min(200, limit - current)
if new_diff <= 0:
return
messages = await self.get_messages(chat_id, list(range(current, current+new_diff+1)))
for message in messages:
yield message
current += 1
async def stop(self, *args):
await super().stop()
logging.info("Bot stopped. Bye.")
app = Bot()
app.run()
|