chatmemberbot.py¶

  1#!/usr/bin/env python
  2# pylint: disable=unused-argument
  3# This program is dedicated to the public domain under the CC0 license.
  4
  5"""
  6Simple Bot to handle '(my_)chat_member' updates.
  7Greets new users & keeps track of which chats the bot is in.
  8
  9Usage:
 10Press Ctrl-C on the command line or send a signal to the process to stop the
 11bot.
 12"""
 13
 14import logging
 15from typing import Optional, Tuple
 16
 17from telegram import Chat, ChatMember, ChatMemberUpdated, Update
 18from telegram.constants import ParseMode
 19from telegram.ext import (
 20    Application,
 21    ChatMemberHandler,
 22    CommandHandler,
 23    ContextTypes,
 24    MessageHandler,
 25    filters,
 26)
 27
 28# Enable logging
 29
 30logging.basicConfig(
 31    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 32)
 33
 34# set higher logging level for httpx to avoid all GET and POST requests being logged
 35logging.getLogger("httpx").setLevel(logging.WARNING)
 36
 37logger = logging.getLogger(__name__)
 38
 39
 40def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
 41    """Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
 42    of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if
 43    the status didn't change.
 44    """
 45    status_change = chat_member_update.difference().get("status")
 46    old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None))
 47
 48    if status_change is None:
 49        return None
 50
 51    old_status, new_status = status_change
 52    was_member = old_status in [
 53        ChatMember.MEMBER,
 54        ChatMember.OWNER,
 55        ChatMember.ADMINISTRATOR,
 56    ] or (old_status == ChatMember.RESTRICTED and old_is_member is True)
 57    is_member = new_status in [
 58        ChatMember.MEMBER,
 59        ChatMember.OWNER,
 60        ChatMember.ADMINISTRATOR,
 61    ] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
 62
 63    return was_member, is_member
 64
 65
 66async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
 67    """Tracks the chats the bot is in."""
 68    result = extract_status_change(update.my_chat_member)
 69    if result is None:
 70        return
 71    was_member, is_member = result
 72
 73    # Let's check who is responsible for the change
 74    cause_name = update.effective_user.full_name
 75
 76    # Handle chat types differently:
 77    chat = update.effective_chat
 78    if chat.type == Chat.PRIVATE:
 79        if not was_member and is_member:
 80            # This may not be really needed in practice because most clients will automatically
 81            # send a /start command after the user unblocks the bot, and start_private_chat()
 82            # will add the user to "user_ids".
 83            # We're including this here for the sake of the example.
 84            logger.info("%s unblocked the bot", cause_name)
 85            context.bot_data.setdefault("user_ids", set()).add(chat.id)
 86        elif was_member and not is_member:
 87            logger.info("%s blocked the bot", cause_name)
 88            context.bot_data.setdefault("user_ids", set()).discard(chat.id)
 89    elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
 90        if not was_member and is_member:
 91            logger.info("%s added the bot to the group %s", cause_name, chat.title)
 92            context.bot_data.setdefault("group_ids", set()).add(chat.id)
 93        elif was_member and not is_member:
 94            logger.info("%s removed the bot from the group %s", cause_name, chat.title)
 95            context.bot_data.setdefault("group_ids", set()).discard(chat.id)
 96    elif not was_member and is_member:
 97        logger.info("%s added the bot to the channel %s", cause_name, chat.title)
 98        context.bot_data.setdefault("channel_ids", set()).add(chat.id)
 99    elif was_member and not is_member:
100        logger.info("%s removed the bot from the channel %s", cause_name, chat.title)
101        context.bot_data.setdefault("channel_ids", set()).discard(chat.id)
102
103
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the user, group and channel IDs recorded in ``context.bot_data``."""
    # Build one comma-separated listing per category; setdefault() creates an empty
    # set for any category that has not been recorded yet.
    users, groups, channels = (
        ", ".join(map(str, context.bot_data.setdefault(key, set())))
        for key in ("user_ids", "group_ids", "channel_ids")
    )

    summary = "".join(
        (
            f"@{context.bot.username} is currently in a conversation with the user IDs {users}.",
            f" Moreover it is a member of the groups with IDs {groups} ",
            f"and administrator in the channels with IDs {channels}.",
        )
    )
    await update.effective_message.reply_text(summary)
115
116
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Welcome users who join the chat and announce users who leave it."""
    change = extract_status_change(update.chat_member)
    if change is None:
        return
    was_member, is_member = change

    # HTML mentions of the user who caused the change and of the affected user.
    actor = update.chat_member.from_user.mention_html()
    subject = update.chat_member.new_chat_member.user.mention_html()

    if is_member and not was_member:
        announcement = f"{subject} was added by {actor}. Welcome!"
    elif was_member and not is_member:
        announcement = f"{subject} is no longer with us. Thanks a lot, {actor} ..."
    else:
        # Membership did not flip (e.g. still a member before and after): stay silent.
        return

    await update.effective_chat.send_message(announcement, parse_mode=ParseMode.HTML)
137
138
async def start_private_chat(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greets the user and records that they started a chat with the bot if it's a private chat.

    Since no `my_chat_member` update is issued when a user starts a private chat with the bot
    for the first time, we have to track it explicitly here.

    Does nothing when the chat is not private or when the chat ID is already recorded
    under "user_ids" in ``context.bot_data``.
    """
    chat = update.effective_chat
    if chat.type != Chat.PRIVATE or chat.id in context.bot_data.get("user_ids", set()):
        return

    # Resolve the user only after the private-chat guard: this callback is registered
    # for all messages, and some of them (e.g. channel posts) have no effective user,
    # so reading `.full_name` up front would raise AttributeError on those updates.
    user_name = update.effective_user.full_name

    logger.info("%s started a private chat with the bot", user_name)
    context.bot_data.setdefault("user_ids", set()).add(chat.id)

    await update.effective_message.reply_text(
        f"Welcome {user_name}. Use /show_chats to see what chats I'm in."
    )
155
156
def main() -> None:
    """Build the application, register all handlers and start polling."""
    # Create the Application and pass it your bot's token.
    application = Application.builder().token("TOKEN").build()

    # Handlers, in registration order.
    handlers = (
        # Keep track of which chats the bot is in.
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # Handle members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
        # Interpret any other command or text message as the start of a private chat.
        # This will record the user as being in a private chat with the bot.
        MessageHandler(filters.ALL, start_private_chat),
    )
    for handler in handlers:
        application.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # We pass 'allowed_updates' to handle *all* updates, including `chat_member` updates.
    # To reset this, simply pass `allowed_updates=[]`.
    application.run_polling(allowed_updates=Update.ALL_TYPES)
177
178
179if __name__ == "__main__":
180    main()