Ахой кибер странники! Данный материал пригодится многим админам чатов и просто тем, у кого каждое слово в диалоге на вес золота. Не лишним будет и тем, кто просто хочет быть в курсе всего, что происходит в Телеграм во время отсутствия онлайн.
Для работы чудо-машины не потребуется скачивать какой либо сторонний клиент Телеграм, реализация способа будет произведена полностью нами.
Как смотреть удаленные и отредактированные сообщения в Телеграм
Следуйте указанной ниже инструкции и у вас получится архивировать все сообщени я, которые когда либо были удалены или поменяны:
- Активируем Telegram Premium, если его у вас еще нет.
- Создаем бота в https://t.me/BotFather и включаем в нём Business mode:
- Подключаем своего бота к профилю Telegram и выбираем чаты, где он будет работать:
Прописываем код ниже на Python
- Устанавливаем Python. https://www.python.org/downloads/ (не забываем поставить галку “ADD TO PATH” при установке);
- Открываем консоль и устанавливаем необходимые модули;
pip install aiogram pip install pydantic
- Создаем файл
"main.py
“ и вставляем код:
import configparser import json from typing import Union import asyncio from aiogram import (Router, Bot, Dispatcher, F, types) import logging from database import Messagesx router = Router(name=__name__) logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) config = configparser.ConfigParser() config.read("config.ini") TOKEN = config["main"]["bot_token"] USER_ID = config["main"]["user"] async def send_msg(message_old: str, message_new: Union[str, None], user_fullname: str, user_id: int, bot: Bot = None): if message_new is None: msg = (f' <b>Пользователь {user_fullname} ({user_id})</b>\n' f' <b>Сообщение удалено:</b>\n' f' Сообщение:\n<code>{message_old}</code>\n') else: msg = (f' <b>Пользователь {user_fullname} ({user_id})</b>\n' f' <b>Сообщение изменено:</b>\n' f' Старое сообщение:\n<code>{message_old}</code>\n' f' Новое сообщение:\n<code>{message_new}</code>') await bot.send_message(USER_ID, msg, parse_mode='html') @router.edited_business_message() async def edited_business_message(message: types.Message): if message.from_user.id == message.chat.id: user_msg = Messagesx.get(user_id=message.from_user.id) data = {message.message_id: message.text} if user_msg is None: Messagesx.add(user_id=message.from_user.id, message_history=json.dumps(data)) else: msg_history = json.loads(user_msg.message_history) if str(message.message_id) in msg_history: await send_msg(message_old=msg_history[str(message.message_id)], message_new=message.text, user_fullname=message.from_user.full_name, user_id=message.chat.id, bot=message.bot) data = {**msg_history, **data} Messagesx.update(user_id=message.from_user.id, message_history=json.dumps(data)) @router.deleted_business_messages() async def deleted_business_messages(message: types.Message): user_msg = Messagesx.get(user_id=message.chat.id) if user_msg is not None: msg_history = json.loads(user_msg.message_history) for msg_id in message.message_ids: if str(msg_id) in msg_history: await send_msg(message_old=msg_history[str(msg_id)], message_new=None, user_fullname=message.chat.full_name, user_id=message.chat.id, bot=message.bot) msg_history.pop(str(msg_id)) Messagesx.update(user_id=message.chat.id, message_history=json.dumps(msg_history)) @router.business_message(F.text) async def business_message(message: types.Message): if message.from_user.id == message.chat.id: user_msg = Messagesx.get(user_id=message.from_user.id) data = {message.message_id: message.text} if user_msg is None: Messagesx.add(user_id=message.from_user.id, message_history=json.dumps(data)) else: msg_history = json.loads(user_msg.message_history) data = {**msg_history, **data} Messagesx.update(user_id=message.from_user.id, message_history=json.dumps(data)) async def main() -> None: Messagesx.create_db() bot = Bot(token=TOKEN) dp = Dispatcher() dp.include_router(router) await bot.delete_webhook(drop_pending_updates=True) await dp.start_polling(bot) asyncio.run(main())
- Создаем файл
"database.py
” и вставляем мой код с работой базы данных:
from pydantic import BaseModel import sqlite3 def dict_factory(cursor, row) -> dict: save_dict = {} for idx, col in enumerate(cursor.description): save_dict[col[0]] = row[idx] return save_dict def update_format(sql, parameters: dict) -> tuple[str, list]: values = ", ".join([ f"{item} = ?" for item in parameters ]) sql += f" {values}" return sql, list(parameters.values()) def update_format_where(sql, parameters: dict) -> tuple[str, list]: sql += " WHERE " sql += " AND ".join([ f"{item} = ?" for item in parameters ]) return sql, list(parameters.values()) class MessageRecord(BaseModel): user_id: int message_history: str # Работа с юзером class Messagesx: storage_name = "messages" PATH_DATABASE = "messages.db" @staticmethod def create_db(): with sqlite3.connect('messages.db') as conn: cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, user_id INTEGER, message_history TEXT)''') # Добавление записи @staticmethod def add( user_id: int, message_history: str, ): with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory con.execute( f""" INSERT INTO {Messagesx.storage_name} ( user_id, message_history ) VALUES (?, ?) """, [ user_id, message_history, ], ) # Получение записи @staticmethod def get(**kwargs) -> MessageRecord: with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory sql = f"SELECT * FROM {Messagesx.storage_name}" sql, parameters = update_format_where(sql, kwargs) response = con.execute(sql, parameters).fetchone() if response is not None: response = MessageRecord(**response) return response # Получение записей @staticmethod def gets(**kwargs) -> list[MessageRecord]: with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory sql = f"SELECT * FROM {Messagesx.storage_name}" sql, parameters = update_format_where(sql, kwargs) response = con.execute(sql, parameters).fetchall() if len(response) >= 1: response = [MessageRecord(**cache_object) for cache_object in response] return response # Получение всех записей @staticmethod def get_all() -> list[MessageRecord]: with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory sql = f"SELECT * FROM {Messagesx.storage_name}" response = con.execute(sql).fetchall() if len(response) >= 1: response = [MessageRecord(**cache_object) for cache_object in response] return response # Редактирование записи @staticmethod def update(user_id, **kwargs): with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory sql = f"UPDATE {Messagesx.storage_name} SET" sql, parameters = update_format(sql, kwargs) parameters.append(user_id) con.execute(sql + "WHERE user_id = ?", parameters) # Удаление записи @staticmethod def delete(**kwargs): with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory sql = f"DELETE FROM {Messagesx.storage_name}" sql, parameters = update_format_where(sql, kwargs) con.execute(sql, parameters) # Очистка всех записей @staticmethod def clear(): with sqlite3.connect(Messagesx.PATH_DATABASE) as con: con.row_factory = dict_factory sql = f"DELETE FROM {Messagesx.storage_name}" con.execute(sql)
- Создаем файл “config.ini” и вставляем текст, но изменяем токен бота и ставим свой user_id (получаем его в боте: t.me/userinfobot ):
[main] bot_token=6095..... user=80....
- Вместо
6095.....
и80....
, там где bot_token – вставляем токен вашего бота, который вы получили у BotFather. И в user нужно вставить user_id из пункта 5! (У других бот будет отправлять уведомления вам, если они подключат вашего бота. У telegram не выводит кому было отправлено сообщение, так что узнать ваш ID никак нельзя);
- Запускаем скрипт и проверяем. Удаляем или изменяем сообщение для проверки.
Примеры сохраненных сообщений
Вот как выглядит работа скрипта и база даных, в которой будет все храниться.
В базе хранятся id пользователей и история сообщений после включения бота (message_id: text).