customwebhookbot.py

  1#!/usr/bin/env python
  2# This program is dedicated to the public domain under the CC0 license.
  3# pylint: disable=import-error,wrong-import-position
  4"""
  5Simple example of a bot that uses a custom webhook setup and handles custom updates.
  6For the custom webhook setup, the libraries `starlette` and `uvicorn` are used. Please install
  7them as `pip install starlette~=0.20.0 uvicorn~=0.17.0`.
  8Note that any other `asyncio` based web server framework can be used for a custom webhook setup
  9just as well.
 10
 11Usage:
 12Set bot token, url, admin chat_id and port at the start of the `main` function.
 13You may also need to change the `listen` value in the uvicorn configuration to match your setup.
 14Press Ctrl-C on the command line or send a signal to the process to stop the bot.
 15"""
 16import asyncio
 17import html
 18import logging
 19from dataclasses import dataclass
 20from http import HTTPStatus
 21
 22import uvicorn
 23from starlette.applications import Starlette
 24from starlette.requests import Request
 25from starlette.responses import PlainTextResponse, Response
 26from starlette.routing import Route
 27
 28from telegram import __version__ as TG_VER
 29
 30try:
 31    from telegram import __version_info__
 32except ImportError:
 33    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]
 34
 35if __version_info__ < (20, 0, 0, "alpha", 1):
 36    raise RuntimeError(
 37        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
 38        f"{TG_VER} version of this example, "
 39        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
 40    )
 41
 42from telegram import Update
 43from telegram.constants import ParseMode
 44from telegram.ext import (
 45    Application,
 46    CallbackContext,
 47    CommandHandler,
 48    ContextTypes,
 49    ExtBot,
 50    TypeHandler,
 51)
 52
 53# Enable logging
 54logging.basicConfig(
 55    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 56)
 57logger = logging.getLogger(__name__)
 58
 59
 60@dataclass
 61class WebhookUpdate:
 62    """Simple dataclass to wrap a custom update type"""
 63
 64    user_id: int
 65    payload: str
 66
 67
 68class CustomContext(CallbackContext[ExtBot, dict, dict, dict]):
 69    """
 70    Custom CallbackContext class that makes `user_data` available for updates of type
 71    `WebhookUpdate`.
 72    """
 73
 74    @classmethod
 75    def from_update(
 76        cls,
 77        update: object,
 78        application: "Application",
 79    ) -> "CustomContext":
 80        if isinstance(update, WebhookUpdate):
 81            return cls(application=application, user_id=update.user_id)
 82        return super().from_update(update, application)
 83
 84
 85async def start(update: Update, context: CustomContext) -> None:
 86    """Display a message with instructions on how to use this bot."""
 87    url = context.bot_data["url"]
 88    payload_url = html.escape(f"{url}/submitpayload?user_id=<your user id>&payload=<payload>")
 89    text = (
 90        f"To check if the bot is still running, call <code>{url}/healthcheck</code>.\n\n"
 91        f"To post a custom update, call <code>{payload_url}</code>."
 92    )
 93    await update.message.reply_html(text=text)
 94
 95
 96async def webhook_update(update: WebhookUpdate, context: CustomContext) -> None:
 97    """Callback that handles the custom updates."""
 98    chat_member = await context.bot.get_chat_member(chat_id=update.user_id, user_id=update.user_id)
 99    payloads = context.user_data.setdefault("payloads", [])
100    payloads.append(update.payload)
101    combined_payloads = "</code>\n• <code>".join(payloads)
102    text = (
103        f"The user {chat_member.user.mention_html()} has sent a new payload. "
104        f"So far they have sent the following payloads: \n\n• <code>{combined_payloads}</code>"
105    )
106    await context.bot.send_message(
107        chat_id=context.bot_data["admin_chat_id"], text=text, parse_mode=ParseMode.HTML
108    )
109
110
111async def main() -> None:
112    """Set up the application and a custom webserver."""
113    url = "https://domain.tld"
114    admin_chat_id = 123456
115    port = 8000
116
117    context_types = ContextTypes(context=CustomContext)
118    # Here we set updater to None because we want our custom webhook server to handle the updates
119    # and hence we don't need an Updater instance
120    application = (
121        Application.builder().token("TOKEN").updater(None).context_types(context_types).build()
122    )
123    # save the values in `bot_data` such that we may easily access them in the callbacks
124    application.bot_data["url"] = url
125    application.bot_data["admin_chat_id"] = admin_chat_id
126
127    # register handlers
128    application.add_handler(CommandHandler("start", start))
129    application.add_handler(TypeHandler(type=WebhookUpdate, callback=webhook_update))
130
131    # Pass webhook settings to telegram
132    await application.bot.set_webhook(url=f"{url}/telegram")
133
134    # Set up webserver
135    async def telegram(request: Request) -> Response:
136        """Handle incoming Telegram updates by putting them into the `update_queue`"""
137        await application.update_queue.put(
138            Update.de_json(data=await request.json(), bot=application.bot)
139        )
140        return Response()
141
142    async def custom_updates(request: Request) -> PlainTextResponse:
143        """
144        Handle incoming webhook updates by also putting them into the `update_queue` if
145        the required parameters were passed correctly.
146        """
147        try:
148            user_id = int(request.query_params["user_id"])
149            payload = request.query_params["payload"]
150        except KeyError:
151            return PlainTextResponse(
152                status_code=HTTPStatus.BAD_REQUEST,
153                content="Please pass both `user_id` and `payload` as query parameters.",
154            )
155        except ValueError:
156            return PlainTextResponse(
157                status_code=HTTPStatus.BAD_REQUEST,
158                content="The `user_id` must be a string!",
159            )
160
161        await application.update_queue.put(WebhookUpdate(user_id=user_id, payload=payload))
162        return PlainTextResponse("Thank you for the submission! It's being forwarded.")
163
164    async def health(_: Request) -> PlainTextResponse:
165        """For the health endpoint, reply with a simple plain text message."""
166        return PlainTextResponse(content="The bot is still running fine :)")
167
168    starlette_app = Starlette(
169        routes=[
170            Route("/telegram", telegram, methods=["POST"]),
171            Route("/healthcheck", health, methods=["GET"]),
172            Route("/submitpayload", custom_updates, methods=["POST", "GET"]),
173        ]
174    )
175    webserver = uvicorn.Server(
176        config=uvicorn.Config(
177            app=starlette_app,
178            port=port,
179            use_colors=False,
180            host="127.0.0.1",
181        )
182    )
183
184    # Run application and webserver together
185    async with application:
186        await application.start()
187        await webserver.serve()
188        await application.stop()
189
190
191if __name__ == "__main__":
192    asyncio.run(main())