Сохраняем все удаленные и отредактированные сообщения в Telegram

4 мин

|

Ахой кибер странники! Данный материал пригодится многим админам чатов и просто тем, у кого каждое слово в диалоге на вес золота. Не лишним будет и тем, кто просто хочет быть в курсе всего, что происходит в Телеграм во время отсутствия онлайн.

Для работы чудо-машины не потребуется скачивать какой либо сторонний клиент Телеграм, реализация способа будет произведена полностью нами.

Как смотреть удаленные и отредактированные сообщения в Телеграм

Следуйте указанной ниже инструкции и у вас получится архивировать все сообщени я, которые когда либо были удалены или поменяны:

  • Активируем Telegram Premium, если его у вас еще нет.
  • Создаем бота в https://t.me/BotFather и включаем в нём Business mode:

  • Подключаем своего бота к профилю Telegram и выбираем чаты, где он будет работать:

Прописываем код ниже на Python

  • Устанавливаем Python. https://www.python.org/downloads/ (не забываем поставить галку “ADD TO PATH” при установке);
  • Открываем консоль и устанавливаем необходимые модули;
pip install aiogram
pip install pydantic
  • Создаем файл ⁡"main.py“⁡ и вставляем код:​
import configparser
import json
from typing import Union

import asyncio

from aiogram import (Router, Bot, Dispatcher,
                     F, types)
import logging

from database import Messagesx

router = Router(name=__name__)

logger = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO)

config = configparser.ConfigParser()
config.read("config.ini")

TOKEN = config["main"]["bot_token"]
USER_ID = config["main"]["user"]


async def send_msg(message_old: str, message_new: Union[str, None], user_fullname: str, user_id: int, bot: Bot = None):
    if message_new is None:
        msg = (f' <b>Пользователь {user_fullname} ({user_id})</b>\n'
               f' <b>Сообщение удалено:</b>\n'
               f' Сообщение:\n<code>{message_old}</code>\n')
    else:
        msg = (f' <b>Пользователь {user_fullname} ({user_id})</b>\n'
               f' <b>Сообщение изменено:</b>\n'
               f' Старое сообщение:\n<code>{message_old}</code>\n'
               f' Новое сообщение:\n<code>{message_new}</code>')
    await bot.send_message(USER_ID, msg, parse_mode='html')


@router.edited_business_message()
async def edited_business_message(message: types.Message):
    if message.from_user.id == message.chat.id:
        user_msg = Messagesx.get(user_id=message.from_user.id)
        data = {message.message_id: message.text}
        if user_msg is None:
            Messagesx.add(user_id=message.from_user.id, message_history=json.dumps(data))
        else:
            msg_history = json.loads(user_msg.message_history)
            if str(message.message_id) in msg_history:
                await send_msg(message_old=msg_history[str(message.message_id)], message_new=message.text,
                               user_fullname=message.from_user.full_name, user_id=message.chat.id, bot=message.bot)
                data = {**msg_history, **data}
                Messagesx.update(user_id=message.from_user.id, message_history=json.dumps(data))


@router.deleted_business_messages()
async def deleted_business_messages(message: types.Message):
    user_msg = Messagesx.get(user_id=message.chat.id)
    if user_msg is not None:
        msg_history = json.loads(user_msg.message_history)
        for msg_id in message.message_ids:
            if str(msg_id) in msg_history:
                await send_msg(message_old=msg_history[str(msg_id)], message_new=None,
                               user_fullname=message.chat.full_name, user_id=message.chat.id, bot=message.bot)
                msg_history.pop(str(msg_id))
                Messagesx.update(user_id=message.chat.id, message_history=json.dumps(msg_history))


@router.business_message(F.text)
async def business_message(message: types.Message):
    if message.from_user.id == message.chat.id:
        user_msg = Messagesx.get(user_id=message.from_user.id)
        data = {message.message_id: message.text}
        if user_msg is None:
            Messagesx.add(user_id=message.from_user.id, message_history=json.dumps(data))
        else:
            msg_history = json.loads(user_msg.message_history)
            data = {**msg_history, **data}
            Messagesx.update(user_id=message.from_user.id, message_history=json.dumps(data))


async def main() -> None:
    Messagesx.create_db()

    bot = Bot(token=TOKEN)
    dp = Dispatcher()

    dp.include_router(router)

    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)


asyncio.run(main())
  • Создаем файл ⁡"database.py” ⁡и вставляем мой код с работой базы данных:
from pydantic import BaseModel

import sqlite3


def dict_factory(cursor, row) -> dict:
    save_dict = {}

    for idx, col in enumerate(cursor.description):
        save_dict[col[0]] = row[idx]

    return save_dict


def update_format(sql, parameters: dict) -> tuple[str, list]:
    values = ", ".join([
        f"{item} = ?" for item in parameters
    ])
    sql += f" {values}"

    return sql, list(parameters.values())


def update_format_where(sql, parameters: dict) -> tuple[str, list]:
    sql += " WHERE "

    sql += " AND ".join([
        f"{item} = ?" for item in parameters
    ])

    return sql, list(parameters.values())


class MessageRecord(BaseModel):
    user_id: int
    message_history: str



# Работа с юзером
class Messagesx:
    storage_name = "messages"
    PATH_DATABASE = "messages.db"

    @staticmethod
    def create_db():
        with sqlite3.connect('messages.db') as conn:
            cursor = conn.cursor()
            cursor.execute('''CREATE TABLE IF NOT EXISTS messages
                              (id INTEGER PRIMARY KEY, user_id INTEGER, message_history TEXT)''')

    # Добавление записи
    @staticmethod
    def add(
            user_id: int,
            message_history: str,

    ):

        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory

            con.execute(
                f"""
                    INSERT INTO {Messagesx.storage_name} (
                        user_id,
                        message_history
                    ) VALUES (?, ?)
                """,
                [
                    user_id,
                    message_history,
                ],
            )

    # Получение записи
    @staticmethod
    def get(**kwargs) -> MessageRecord:
        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory
            sql = f"SELECT * FROM {Messagesx.storage_name}"
            sql, parameters = update_format_where(sql, kwargs)

            response = con.execute(sql, parameters).fetchone()

            if response is not None:
                response = MessageRecord(**response)

            return response

    # Получение записей
    @staticmethod
    def gets(**kwargs) -> list[MessageRecord]:
        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory
            sql = f"SELECT * FROM {Messagesx.storage_name}"
            sql, parameters = update_format_where(sql, kwargs)

            response = con.execute(sql, parameters).fetchall()

            if len(response) >= 1:
                response = [MessageRecord(**cache_object) for cache_object in response]

            return response

    # Получение всех записей
    @staticmethod
    def get_all() -> list[MessageRecord]:
        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory
            sql = f"SELECT * FROM {Messagesx.storage_name}"

            response = con.execute(sql).fetchall()

            if len(response) >= 1:
                response = [MessageRecord(**cache_object) for cache_object in response]

            return response

    # Редактирование записи
    @staticmethod
    def update(user_id, **kwargs):
        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory
            sql = f"UPDATE {Messagesx.storage_name} SET"
            sql, parameters = update_format(sql, kwargs)
            parameters.append(user_id)

            con.execute(sql + "WHERE user_id = ?", parameters)

    # Удаление записи
    @staticmethod
    def delete(**kwargs):
        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory
            sql = f"DELETE FROM {Messagesx.storage_name}"
            sql, parameters = update_format_where(sql, kwargs)

            con.execute(sql, parameters)

    # Очистка всех записей
    @staticmethod
    def clear():
        with sqlite3.connect(Messagesx.PATH_DATABASE) as con:
            con.row_factory = dict_factory
            sql = f"DELETE FROM {Messagesx.storage_name}"

            con.execute(sql)
  • Создаем файл “config.ini” и вставляем текст, но изменяем токен бота и ставим свой user_id (получаем его в боте: t.me/userinfobot ):
[main]
bot_token=6095.....
user=80....
  • Вместо ⁡6095..... ⁡ и ⁡80.... ⁡, там где bot_token – вставляем токен вашего бота, который вы получили у BotFather. И в user нужно вставить user_id из пункта 5! (У других бот будет отправлять уведомления вам, если они подключат вашего бота. У telegram не выводит кому было отправлено сообщение, так что узнать ваш ID никак нельзя);
  • Запускаем скрипт и проверяем. Удаляем или изменяем сообщение для проверки.

Примеры сохраненных сообщений

Вот как выглядит работа скрипта и база даных, в которой будет все храниться.

Отображение в Телеграм

База данных

В базе хранятся id пользователей и история сообщений после включения бота (message_id: text).


Подписаться
Уведомить о
guest
0 комментариев
Межтекстовые Отзывы
Посмотреть все комментарии