1#!/usr/bin/env python
2# pylint: disable=unused-argument, wrong-import-position
3# This program is dedicated to the public domain under the CC0 license.
4
5"""
6Simple Bot to handle '(my_)chat_member' updates.
7Greets new users & keeps track of which chats the bot is in.
8
9Usage:
10Press Ctrl-C on the command line or send a signal to the process to stop the
11bot.
12"""
13
14import logging
15from typing import Optional, Tuple
16
17from telegram import __version__ as TG_VER
18
# If the installed ``telegram`` package does not expose ``__version_info__``, fall back to an
# all-zero tuple so that the compatibility check below fails.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Abort early when the installed PTB release is older than 20.0 alpha 1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    incompatible_version_message = (
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
    raise RuntimeError(incompatible_version_message)
30from telegram import Chat, ChatMember, ChatMemberUpdated, Update
31from telegram.constants import ParseMode
32from telegram.ext import (
33 Application,
34 ChatMemberHandler,
35 CommandHandler,
36 ContextTypes,
37 MessageHandler,
38 filters,
39)
40
# Configure logging at INFO level; each record shows timestamp, logger name, level and message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Raise the threshold of the "httpx" logger to WARNING so that the individual GET and POST
# requests are not logged.
logging.getLogger("httpx").setLevel(logging.WARNING)

# Module-level logger used by the handlers in this file.
logger = logging.getLogger(__name__)
51
52
def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
    """Extract whether a user was, and now is, a member of the chat.

    Args:
        chat_member_update: The ``ChatMemberUpdated`` instance to inspect.

    Returns:
        ``None`` if the difference reported by the update contains no ``"status"`` entry,
        i.e. the status didn't change. Otherwise a tuple ``(was_member, is_member)`` telling
        whether the 'old_chat_member' was a member of the chat and whether the
        'new_chat_member' is a member of the chat.
    """
    # Compute the difference only once and reuse it for both lookups.
    changes = chat_member_update.difference()
    status_change = changes.get("status")
    if status_change is None:
        return None

    old_status, new_status = status_change
    old_is_member, new_is_member = changes.get("is_member", (None, None))

    # These statuses always count as membership. A restricted user counts as a member only
    # when the corresponding ``is_member`` flag is explicitly ``True``.
    member_statuses = (ChatMember.MEMBER, ChatMember.OWNER, ChatMember.ADMINISTRATOR)
    was_member = old_status in member_statuses or (
        old_status == ChatMember.RESTRICTED and old_is_member is True
    )
    is_member = new_status in member_statuses or (
        new_status == ChatMember.RESTRICTED and new_is_member is True
    )

    return was_member, is_member
77
78
async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Keep track of the chats the bot is in.

    Handles ``my_chat_member`` updates: depending on the chat type, the chat ID is added to or
    discarded from the ``user_ids``, ``group_ids`` or ``channel_ids`` set in
    ``context.bot_data``, and the change is logged.
    """
    membership = extract_status_change(update.my_chat_member)
    if membership is None:
        return
    was_member, is_member = membership

    # The user who triggered the update is named in the log message.
    cause_name = update.effective_user.full_name
    chat = update.effective_chat
    chat_type = chat.type

    joined = is_member and not was_member
    left = was_member and not is_member
    if not (joined or left):
        # Membership is unchanged, so there is nothing to record.
        return

    # Pick the storage key and log the event according to the chat type.
    if chat_type == Chat.PRIVATE:
        storage_key = "user_ids"
        if joined:
            # This may not be really needed in practice because most clients will
            # automatically send a /start command after the user unblocks the bot, and
            # start_private_chat() will add the user to "user_ids".
            # It is included here for the sake of the example.
            logger.info("%s unblocked the bot", cause_name)
        else:
            logger.info("%s blocked the bot", cause_name)
    elif chat_type in (Chat.GROUP, Chat.SUPERGROUP):
        storage_key = "group_ids"
        if joined:
            logger.info("%s added the bot to the group %s", cause_name, chat.title)
        else:
            logger.info("%s removed the bot from the group %s", cause_name, chat.title)
    else:
        # Every remaining chat type is treated as a channel.
        storage_key = "channel_ids"
        if joined:
            logger.info("%s added the bot to the channel %s", cause_name, chat.title)
        else:
            logger.info("%s removed the bot from the channel %s", cause_name, chat.title)

    tracked_ids = context.bot_data.setdefault(storage_key, set())
    if joined:
        tracked_ids.add(chat.id)
    else:
        tracked_ids.discard(chat.id)
115
116
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply with the IDs of the users, groups and channels stored in ``context.bot_data``."""
    bot_data = context.bot_data

    def joined_ids(key: str) -> str:
        # ``setdefault`` creates an empty set for ``key`` if nothing was tracked yet.
        return ", ".join(map(str, bot_data.setdefault(key, set())))

    user_ids = joined_ids("user_ids")
    group_ids = joined_ids("group_ids")
    channel_ids = joined_ids("channel_ids")

    text = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
        f" Moreover it is a member of the groups with IDs {group_ids} "
        f"and administrator in the channels with IDs {channel_ids}."
    )
    await update.effective_message.reply_text(text)
128
129
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Announce in the chat when a user becomes a member or stops being one."""
    member_update = update.chat_member
    membership = extract_status_change(member_update)
    if membership is None:
        return
    was_member, is_member = membership

    # HTML mentions of the user who caused the change and of the affected user.
    cause_name = member_update.from_user.mention_html()
    member_name = member_update.new_chat_member.user.mention_html()

    if was_member == is_member:
        # Membership itself did not change, so nothing is announced.
        return

    if is_member:
        announcement = f"{member_name} was added by {cause_name}. Welcome!"
    else:
        announcement = f"{member_name} is no longer with us. Thanks a lot, {cause_name} ..."

    await update.effective_chat.send_message(announcement, parse_mode=ParseMode.HTML)
150
151
async def start_private_chat(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greet the user and record that they started a private chat with the bot.

    Since no `my_chat_member` update is issued when a user starts a private chat with the bot
    for the first time, we have to track it explicitly here.

    Nothing happens if the chat is not a private chat or if its ID is already stored in
    ``context.bot_data["user_ids"]``.
    """
    chat = update.effective_chat

    # Check the chat type *before* touching ``update.effective_user``. This handler is
    # registered for all messages, and updates from non-private chats (e.g. channel posts)
    # may carry no user, in which case reading ``full_name`` would raise an AttributeError.
    if chat.type != Chat.PRIVATE:
        return

    # From here on a new ID is always added, so creating the set up front is harmless.
    known_user_ids = context.bot_data.setdefault("user_ids", set())
    if chat.id in known_user_ids:
        return

    user_name = update.effective_user.full_name
    logger.info("%s started a private chat with the bot", user_name)
    known_user_ids.add(chat.id)

    await update.effective_message.reply_text(
        f"Welcome {user_name}. Use /show_chats to see what chats I'm in."
    )
168
169
def main() -> None:
    """Build the application, register the handlers and start polling for updates."""
    # Create the Application and pass it your bot's token.
    application = Application.builder().token("TOKEN").build()

    handlers = (
        # Keep track of which chats the bot is in.
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # Handle members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
        # Interpret any other command or text message as the start of a private chat.
        # This records the user as being in a private chat with the bot.
        MessageHandler(filters.ALL, start_private_chat),
    )
    # Register the handlers in the order listed above.
    for handler in handlers:
        application.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # 'allowed_updates' is passed to handle *all* update types, including `chat_member`
    # updates. To reset this, simply pass `allowed_updates=[]`.
    application.run_polling(allowed_updates=Update.ALL_TYPES)
190
191
# Start the bot only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()