chatmemberbot.py

  1#!/usr/bin/env python
  2# pylint: disable=unused-argument, wrong-import-position
  3# This program is dedicated to the public domain under the CC0 license.
  4
  5"""
  6Simple Bot to handle '(my_)chat_member' updates.
  7Greets new users & keeps track of which chats the bot is in.
  8
  9Usage:
 10Press Ctrl-C on the command line or send a signal to the process to stop the
 11bot.
 12"""
 13
 14import logging
 15from typing import Optional, Tuple
 16
 17from telegram import __version__ as TG_VER
 18
# Guard against running this example with an incompatible library version.
try:
    from telegram import __version_info__
except ImportError:
    # The installed telegram package does not expose `__version_info__`; fall back to an
    # all-zero tuple so that the comparison below fails and the RuntimeError is raised.
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Anything that compares lower than 20.0.0 alpha 1 is rejected with a pointer to the
# examples page matching the installed version.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
 30from telegram import Chat, ChatMember, ChatMemberUpdated, Update
 31from telegram.constants import ParseMode
 32from telegram.ext import (
 33    Application,
 34    ChatMemberHandler,
 35    CommandHandler,
 36    ContextTypes,
 37    MessageHandler,
 38    filters,
 39)
 40
# Enable logging: records are emitted at INFO level and above, formatted as
# "<timestamp> - <logger name> - <level> - <message>".

logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)

# Module-level logger used by the handlers below.
logger = logging.getLogger(__name__)
 48
 49
 50def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
 51    """Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
 52    of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if
 53    the status didn't change.
 54    """
 55    status_change = chat_member_update.difference().get("status")
 56    old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None))
 57
 58    if status_change is None:
 59        return None
 60
 61    old_status, new_status = status_change
 62    was_member = old_status in [
 63        ChatMember.MEMBER,
 64        ChatMember.OWNER,
 65        ChatMember.ADMINISTRATOR,
 66    ] or (old_status == ChatMember.RESTRICTED and old_is_member is True)
 67    is_member = new_status in [
 68        ChatMember.MEMBER,
 69        ChatMember.OWNER,
 70        ChatMember.ADMINISTRATOR,
 71    ] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
 72
 73    return was_member, is_member
 74
 75
 76async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
 77    """Tracks the chats the bot is in."""
 78    result = extract_status_change(update.my_chat_member)
 79    if result is None:
 80        return
 81    was_member, is_member = result
 82
 83    # Let's check who is responsible for the change
 84    cause_name = update.effective_user.full_name
 85
 86    # Handle chat types differently:
 87    chat = update.effective_chat
 88    if chat.type == Chat.PRIVATE:
 89        if not was_member and is_member:
 90            # This may not be really needed in practice because most clients will automatically
 91            # send a /start command after the user unblocks the bot, and start_private_chat()
 92            # will add the user to "user_ids".
 93            # We're including this here for the sake of the example.
 94            logger.info("%s unblocked the bot", cause_name)
 95            context.bot_data.setdefault("user_ids", set()).add(chat.id)
 96        elif was_member and not is_member:
 97            logger.info("%s blocked the bot", cause_name)
 98            context.bot_data.setdefault("user_ids", set()).discard(chat.id)
 99    elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
100        if not was_member and is_member:
101            logger.info("%s added the bot to the group %s", cause_name, chat.title)
102            context.bot_data.setdefault("group_ids", set()).add(chat.id)
103        elif was_member and not is_member:
104            logger.info("%s removed the bot from the group %s", cause_name, chat.title)
105            context.bot_data.setdefault("group_ids", set()).discard(chat.id)
106    else:
107        if not was_member and is_member:
108            logger.info("%s added the bot to the channel %s", cause_name, chat.title)
109            context.bot_data.setdefault("channel_ids", set()).add(chat.id)
110        elif was_member and not is_member:
111            logger.info("%s removed the bot from the channel %s", cause_name, chat.title)
112            context.bot_data.setdefault("channel_ids", set()).discard(chat.id)
113
114
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the IDs of the users, groups and channels stored in ``bot_data``."""

    def ids_as_text(key: str) -> str:
        # Creates the (empty) set on first access, then renders its IDs comma-separated.
        return ", ".join(map(str, context.bot_data.setdefault(key, set())))

    user_ids = ids_as_text("user_ids")
    group_ids = ids_as_text("group_ids")
    channel_ids = ids_as_text("channel_ids")

    text = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
        f" Moreover it is a member of the groups with IDs {group_ids} "
        f"and administrator in the channels with IDs {channel_ids}."
    )
    await update.effective_message.reply_text(text)
126
127
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a welcome message when a user joins a chat and a farewell when one leaves.

    Updates in which the membership did not actually change are ignored.
    """
    change = extract_status_change(update.chat_member)
    if change is None:
        return
    was_member, is_member = change

    member_update = update.chat_member
    # HTML mentions of the user who caused the change and of the user it affects.
    cause_name = member_update.from_user.mention_html()
    member_name = member_update.new_chat_member.user.mention_html()

    if is_member and not was_member:
        announcement = f"{member_name} was added by {cause_name}. Welcome!"
    elif was_member and not is_member:
        announcement = f"{member_name} is no longer with us. Thanks a lot, {cause_name} ..."
    else:
        # Membership is unchanged: nothing to announce.
        return

    await update.effective_chat.send_message(announcement, parse_mode=ParseMode.HTML)
148
149
async def start_private_chat(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greets the user and records that they started a chat with the bot if it's a private chat.
    Since no `my_chat_member` update is issued when a user starts a private chat with the bot
    for the first time, we have to track it explicitly here.

    Does nothing when the chat is not private or when the chat ID is already stored in
    ``bot_data["user_ids"]``.
    """
    chat = update.effective_chat
    if chat.type != Chat.PRIVATE or chat.id in context.bot_data.get("user_ids", set()):
        return

    # Look up the user only after the private-chat guard: updates without an associated
    # user (e.g. channel posts) have `effective_user` set to None, and reading `.full_name`
    # before the guard would raise AttributeError for them.
    user_name = update.effective_user.full_name

    logger.info("%s started a private chat with the bot", user_name)
    context.bot_data.setdefault("user_ids", set()).add(chat.id)

    await update.effective_message.reply_text(
        f"Welcome {user_name}. Use /show_chats to see what chats I'm in."
    )
166
167
def main() -> None:
    """Build the application, register the handlers and poll for updates."""
    # Create the Application around your bot's token.
    application = Application.builder().token("TOKEN").build()

    # Handlers are registered in the order listed here.
    handlers = (
        # Keep track of which chats the bot is in.
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # Handle members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
        # Interpret any other command or text message as the start of a private chat.
        # This will record the user as being in a private chat with the bot.
        MessageHandler(filters.ALL, start_private_chat),
    )
    for handler in handlers:
        application.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # We pass 'allowed_updates' to handle *all* updates, including `chat_member` updates.
    # To reset this, simply pass `allowed_updates=[]`.
    application.run_polling(allowed_updates=Update.ALL_TYPES)
188
189
# Start the bot only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()