rawapibot.py

This example uses only the pure, “bare-metal” API wrapper.

 1#!/usr/bin/env python
 2# pylint: disable=wrong-import-position
 3"""Simple Bot to reply to Telegram messages.
 4
 5This is built on the API wrapper, see echobot.py to see the same example built
 6on the telegram.ext bot framework.
 7This program is dedicated to the public domain under the CC0 license.
 8"""
 9import asyncio
10import logging
11from typing import NoReturn
12
13from telegram import __version__ as TG_VER
14
try:
    from telegram import __version_info__
except ImportError:
    # The installed telegram package does not expose __version_info__
    # (presumably an older release); use an all-zero tuple so the
    # comparison below fails and the explanatory error is raised.
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# This example needs at least version 20.0.0 alpha 1 of the library. Anything
# older gets an error that points at the example matching the installed version.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        "This example is not compatible with your current PTB version "
        f"{TG_VER}. To view the {TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
26from telegram import Bot
27from telegram.error import Forbidden, NetworkError
28
# Configure the root logger: INFO and above, each record prefixed with the
# timestamp, the logger name and the level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
33
34
async def main() -> NoReturn:
    """Run the bot.

    Polls for updates forever and hands them to ``echo`` one at a time.
    ``update_id`` is the offset passed to ``echo``; it stays ``None`` until
    the id of the oldest pending update is known, so the ``Forbidden``
    handler below always has a concrete id to skip past.

    Raises:
        Forbidden: only if it is raised while no update id is known yet, i.e.
            by the lookup of the oldest pending update rather than by a reply.
    """
    # Here we use the `async with` syntax to properly initialize and shutdown resources.
    async with Bot("TOKEN") as bot:
        # No offset is known yet; it is resolved inside the loop so that a
        # NetworkError during the lookup is retried like any other.
        update_id = None

        logger.info("listening for new messages...")
        while True:
            try:
                if update_id is None:
                    # Get the first pending update_id, this is so we can skip
                    # over it in case we get a "Forbidden" exception. No offset
                    # is passed here; `echo` requests the update again with
                    # the resolved offset.
                    pending = await bot.get_updates(timeout=10)
                    if not pending:
                        continue
                    update_id = pending[0].update_id
                update_id = await echo(bot, update_id)
            except NetworkError:
                await asyncio.sleep(1)
            except Forbidden:
                if update_id is None:
                    # Raised by the lookup above, not by a reply: there is no
                    # update to skip, so do not swallow the error.
                    raise
                # The user has removed or blocked the bot.
                update_id += 1
55
56
async def echo(bot: Bot, update_id: int) -> int:
    """Echo the oldest pending text message back to its sender.

    Requests updates starting at ``update_id`` and handles at most one of
    them per call: the first update in the result.

    Args:
        bot: The bot used to request updates.
        update_id: Offset passed to ``get_updates``.

    Returns:
        The handled update's id plus one, or ``update_id`` unchanged when no
        update was returned.
    """
    # Request updates after the last update_id
    pending = await bot.get_updates(offset=update_id, timeout=10)
    if not pending:
        return update_id

    # Only the first update is processed; the caller passes the returned
    # offset to the next call, which picks up the rest.
    oldest = pending[0]
    message = oldest.message

    # your bot can receive updates without messages
    # and not all messages contain text
    if message and message.text:
        # Reply to the message
        logger.info("Found message %s!", message.text)
        await message.reply_text(message.text)

    return oldest.update_id + 1
72
73
if __name__ == "__main__":
    # Start the bot only when this file is executed as a script. Pressing
    # Ctrl-C raises KeyboardInterrupt, which is deliberately ignored so the
    # program ends without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass