contexttypesbot.pyΒΆ

  1#!/usr/bin/env python
  2# pylint: disable=unused-argument
  3# This program is dedicated to the public domain under the CC0 license.
  4
  5"""
  6Simple Bot to showcase `telegram.ext.ContextTypes`.
  7
  8Usage:
  9Press Ctrl-C on the command line or send a signal to the process to stop the
 10bot.
 11"""
 12
 13import logging
 14from collections import defaultdict
 15from typing import DefaultDict, Optional, Set
 16
 17from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
 18from telegram.constants import ParseMode
 19from telegram.ext import (
 20    Application,
 21    CallbackContext,
 22    CallbackQueryHandler,
 23    CommandHandler,
 24    ContextTypes,
 25    ExtBot,
 26    TypeHandler,
 27)
 28
 29# Enable logging
 30logging.basicConfig(
 31    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 32)
 33# set higher logging level for httpx to avoid all GET and POST requests being logged
 34logging.getLogger("httpx").setLevel(logging.WARNING)
 35
 36logger = logging.getLogger(__name__)
 37
 38
 39class ChatData:
 40    """Custom class for chat_data. Here we store data per message."""
 41
 42    def __init__(self) -> None:
 43        self.clicks_per_message: DefaultDict[int, int] = defaultdict(int)
 44
 45
 46# The [ExtBot, dict, ChatData, dict] is for type checkers like mypy
 47class CustomContext(CallbackContext[ExtBot, dict, ChatData, dict]):
 48    """Custom class for context."""
 49
 50    def __init__(
 51        self,
 52        application: Application,
 53        chat_id: Optional[int] = None,
 54        user_id: Optional[int] = None,
 55    ):
 56        super().__init__(application=application, chat_id=chat_id, user_id=user_id)
 57        self._message_id: Optional[int] = None
 58
 59    @property
 60    def bot_user_ids(self) -> Set[int]:
 61        """Custom shortcut to access a value stored in the bot_data dict"""
 62        return self.bot_data.setdefault("user_ids", set())
 63
 64    @property
 65    def message_clicks(self) -> Optional[int]:
 66        """Access the number of clicks for the message this context object was built for."""
 67        if self._message_id:
 68            return self.chat_data.clicks_per_message[self._message_id]
 69        return None
 70
 71    @message_clicks.setter
 72    def message_clicks(self, value: int) -> None:
 73        """Allow to change the count"""
 74        if not self._message_id:
 75            raise RuntimeError("There is no message associated with this context object.")
 76        self.chat_data.clicks_per_message[self._message_id] = value
 77
 78    @classmethod
 79    def from_update(cls, update: object, application: "Application") -> "CustomContext":
 80        """Override from_update to set _message_id."""
 81        # Make sure to call super()
 82        context = super().from_update(update, application)
 83
 84        if context.chat_data and isinstance(update, Update) and update.effective_message:
 85            # pylint: disable=protected-access
 86            context._message_id = update.effective_message.message_id
 87
 88        # Remember to return the object
 89        return context
 90
 91
 92async def start(update: Update, context: CustomContext) -> None:
 93    """Display a message with a button."""
 94    await update.message.reply_html(
 95        "This button was clicked <i>0</i> times.",
 96        reply_markup=InlineKeyboardMarkup.from_button(
 97            InlineKeyboardButton(text="Click me!", callback_data="button")
 98        ),
 99    )
100
101
102async def count_click(update: Update, context: CustomContext) -> None:
103    """Update the click count for the message."""
104    context.message_clicks += 1
105    await update.callback_query.answer()
106    await update.effective_message.edit_text(
107        f"This button was clicked <i>{context.message_clicks}</i> times.",
108        reply_markup=InlineKeyboardMarkup.from_button(
109            InlineKeyboardButton(text="Click me!", callback_data="button")
110        ),
111        parse_mode=ParseMode.HTML,
112    )
113
114
115async def print_users(update: Update, context: CustomContext) -> None:
116    """Show which users have been using this bot."""
117    await update.message.reply_text(
118        f"The following user IDs have used this bot: {', '.join(map(str, context.bot_user_ids))}"
119    )
120
121
122async def track_users(update: Update, context: CustomContext) -> None:
123    """Store the user id of the incoming update, if any."""
124    if update.effective_user:
125        context.bot_user_ids.add(update.effective_user.id)
126
127
128def main() -> None:
129    """Run the bot."""
130    context_types = ContextTypes(context=CustomContext, chat_data=ChatData)
131    application = Application.builder().token("TOKEN").context_types(context_types).build()
132
133    # run track_users in its own group to not interfere with the user handlers
134    application.add_handler(TypeHandler(Update, track_users), group=-1)
135    application.add_handler(CommandHandler("start", start))
136    application.add_handler(CallbackQueryHandler(count_click))
137    application.add_handler(CommandHandler("print_users", print_users))
138
139    application.run_polling(allowed_updates=Update.ALL_TYPES)
140
141
142if __name__ == "__main__":
143    main()