chatmemberbot.py¶

  1#!/usr/bin/env python
  2# pylint: disable=unused-argument, wrong-import-position
  3# This program is dedicated to the public domain under the CC0 license.
  4
  5"""
  6Simple Bot to handle '(my_)chat_member' updates.
  7Greets new users & keeps track of which chats the bot is in.
  8
  9Usage:
 10Press Ctrl-C on the command line or send a signal to the process to stop the
 11bot.
 12"""
 13
 14import logging
 15from typing import Optional, Tuple
 16
 17from telegram import __version__ as TG_VER
 18
# `__version_info__` may not be importable from the installed `telegram` package
# (presumably on older releases - TODO confirm). Fall back to an all-zero tuple so
# that the comparison below treats such an installation as too old.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Abort early with a pointer to the matching documentation when the installed
# version is older than 20.0.0 alpha 1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
 30from telegram import Chat, ChatMember, ChatMemberUpdated, Update
 31from telegram.constants import ParseMode
 32from telegram.ext import Application, ChatMemberHandler, CommandHandler, ContextTypes
 33
 34# Enable logging
 35
 36logging.basicConfig(
 37    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 38)
 39
 40logger = logging.getLogger(__name__)
 41
 42
 43def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
 44    """Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
 45    of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if
 46    the status didn't change.
 47    """
 48    status_change = chat_member_update.difference().get("status")
 49    old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None))
 50
 51    if status_change is None:
 52        return None
 53
 54    old_status, new_status = status_change
 55    was_member = old_status in [
 56        ChatMember.MEMBER,
 57        ChatMember.OWNER,
 58        ChatMember.ADMINISTRATOR,
 59    ] or (old_status == ChatMember.RESTRICTED and old_is_member is True)
 60    is_member = new_status in [
 61        ChatMember.MEMBER,
 62        ChatMember.OWNER,
 63        ChatMember.ADMINISTRATOR,
 64    ] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
 65
 66    return was_member, is_member
 67
 68
 69async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
 70    """Tracks the chats the bot is in."""
 71    result = extract_status_change(update.my_chat_member)
 72    if result is None:
 73        return
 74    was_member, is_member = result
 75
 76    # Let's check who is responsible for the change
 77    cause_name = update.effective_user.full_name
 78
 79    # Handle chat types differently:
 80    chat = update.effective_chat
 81    if chat.type == Chat.PRIVATE:
 82        if not was_member and is_member:
 83            logger.info("%s started the bot", cause_name)
 84            context.bot_data.setdefault("user_ids", set()).add(chat.id)
 85        elif was_member and not is_member:
 86            logger.info("%s blocked the bot", cause_name)
 87            context.bot_data.setdefault("user_ids", set()).discard(chat.id)
 88    elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
 89        if not was_member and is_member:
 90            logger.info("%s added the bot to the group %s", cause_name, chat.title)
 91            context.bot_data.setdefault("group_ids", set()).add(chat.id)
 92        elif was_member and not is_member:
 93            logger.info("%s removed the bot from the group %s", cause_name, chat.title)
 94            context.bot_data.setdefault("group_ids", set()).discard(chat.id)
 95    else:
 96        if not was_member and is_member:
 97            logger.info("%s added the bot to the channel %s", cause_name, chat.title)
 98            context.bot_data.setdefault("channel_ids", set()).add(chat.id)
 99        elif was_member and not is_member:
100            logger.info("%s removed the bot from the channel %s", cause_name, chat.title)
101            context.bot_data.setdefault("channel_ids", set()).discard(chat.id)
102
103
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the IDs of the users, groups and channels stored in ``bot_data``."""

    def _joined_ids(key: str) -> str:
        # Creates the (empty) set on first access, then renders its IDs comma-separated.
        return ", ".join(map(str, context.bot_data.setdefault(key, set())))

    user_ids = _joined_ids("user_ids")
    group_ids = _joined_ids("group_ids")
    channel_ids = _joined_ids("channel_ids")

    text = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
        f" Moreover it is a member of the groups with IDs {group_ids} "
        f"and administrator in the channels with IDs {channel_ids}."
    )
    await update.effective_message.reply_text(text)
115
116
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Welcome users who became members and announce users who stopped being members."""
    transition = extract_status_change(update.chat_member)
    if transition is None:
        return
    was_member, is_member = transition

    member_update = update.chat_member
    # HTML mentions of the user who caused the change and of the affected user.
    cause_name = member_update.from_user.mention_html()
    member_name = member_update.new_chat_member.user.mention_html()

    if was_member == is_member:
        # The status changed, but membership itself did not: stay silent.
        return

    if is_member:
        text = f"{member_name} was added by {cause_name}. Welcome!"
    else:
        text = f"{member_name} is no longer with us. Thanks a lot, {cause_name} ..."

    await update.effective_chat.send_message(text, parse_mode=ParseMode.HTML)
137
138
def main() -> None:
    """Build the application, register all handlers and poll for updates."""
    # Create the Application; replace "TOKEN" with your bot's token.
    builder = Application.builder().token("TOKEN")
    app = builder.build()

    handlers = (
        # Keep track of which chats the bot is in, and let users list them.
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # React to members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
    )
    for handler in handlers:
        app.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # 'allowed_updates' is passed in order to handle *all* update types, including
    # `chat_member` updates. To reset this, simply pass `allowed_updates=[]`.
    app.run_polling(allowed_updates=Update.ALL_TYPES)
155
156
# Start the bot only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()