1#!/usr/bin/env python
2# pylint: disable=unused-argument
3# This program is dedicated to the public domain under the CC0 license.
4
5"""
6Simple Bot to handle '(my_)chat_member' updates.
7Greets new users & keeps track of which chats the bot is in.
8
9Usage:
10Press Ctrl-C on the command line or send a signal to the process to stop the
11bot.
12"""
13
14import logging
15from typing import Optional, Tuple
16
17from telegram import Chat, ChatMember, ChatMemberUpdated, Update
18from telegram.constants import ParseMode
19from telegram.ext import (
20 Application,
21 ChatMemberHandler,
22 CommandHandler,
23 ContextTypes,
24 MessageHandler,
25 filters,
26)
27
# Configure root logging: timestamp, logger name, level and message at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Raise the httpx logger to WARNING so that its GET and POST request logs are suppressed.
logging.getLogger("httpx").setLevel(logging.WARNING)

# Module-level logger used by the handlers below.
logger = logging.getLogger(__name__)
38
39
def extract_status_change(chat_member_update: ChatMemberUpdated) -> Optional[Tuple[bool, bool]]:
    """Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
    of the chat and whether the 'new_chat_member' is a member of the chat.

    Args:
        chat_member_update: The update whose ``difference()`` is inspected.

    Returns:
        A ``(was_member, is_member)`` tuple, or ``None`` if the status didn't change.
    """
    # Compute the diff once; both lookups below read from the same mapping.
    changes = chat_member_update.difference()

    status_change = changes.get("status")
    if status_change is None:
        return None

    old_status, new_status = status_change
    # Fall back to (None, None) when the diff has no "is_member" entry.
    old_is_member, new_is_member = changes.get("is_member", (None, None))

    # Statuses that always count as being in the chat. A restricted user only counts
    # when the accompanying 'is_member' flag is explicitly True.
    member_statuses = [
        ChatMember.MEMBER,
        ChatMember.OWNER,
        ChatMember.ADMINISTRATOR,
    ]
    was_member = old_status in member_statuses or (
        old_status == ChatMember.RESTRICTED and old_is_member is True
    )
    is_member = new_status in member_statuses or (
        new_status == ChatMember.RESTRICTED and new_is_member is True
    )

    return was_member, is_member
64
65
async def track_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Keeps the sets of chat IDs in ``bot_data`` in sync with the bot's own membership.

    Private chats are stored under "user_ids", groups and supergroups under "group_ids",
    and every other chat type is treated as a channel and stored under "channel_ids".
    Nothing is recorded unless the bot's membership actually flipped.
    """
    transition = extract_status_change(update.my_chat_member)
    if transition is None:
        return
    was_member, is_member = transition

    # Name of the user who caused the change
    actor = update.effective_user.full_name
    chat = update.effective_chat

    # Pick the storage key and the log calls that belong to this kind of chat.
    if chat.type == Chat.PRIVATE:
        # Tracking the "unblocked" case may not be really needed in practice because most
        # clients will automatically send a /start command after the user unblocks the bot,
        # and start_private_chat() will add the user to "user_ids".
        # It is included here for the sake of the example.
        key = "user_ids"
        join_log = ("%s unblocked the bot", actor)
        leave_log = ("%s blocked the bot", actor)
    elif chat.type in (Chat.GROUP, Chat.SUPERGROUP):
        key = "group_ids"
        join_log = ("%s added the bot to the group %s", actor, chat.title)
        leave_log = ("%s removed the bot from the group %s", actor, chat.title)
    else:
        key = "channel_ids"
        join_log = ("%s added the bot to the channel %s", actor, chat.title)
        leave_log = ("%s removed the bot from the channel %s", actor, chat.title)

    joined = is_member and not was_member
    left = was_member and not is_member

    if joined:
        logger.info(*join_log)
        context.bot_data.setdefault(key, set()).add(chat.id)
    elif left:
        logger.info(*leave_log)
        context.bot_data.setdefault(key, set()).discard(chat.id)
102
103
async def show_chats(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Replies with the user, group and channel IDs currently stored in ``bot_data``."""

    def joined_ids(key: str) -> str:
        # setdefault() creates the empty set on first use, so the key exists afterwards.
        return ", ".join(map(str, context.bot_data.setdefault(key, set())))

    users = joined_ids("user_ids")
    groups = joined_ids("group_ids")
    channels = joined_ids("channel_ids")

    reply = (
        f"@{context.bot.username} is currently in a conversation with the user IDs {users}."
        f" Moreover it is a member of the groups with IDs {groups} "
        f"and administrator in the channels with IDs {channels}."
    )
    await update.effective_message.reply_text(reply)
115
116
async def greet_chat_members(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Sends a welcome message when a user joins the chat and a farewell when one leaves.

    Updates that do not flip the user's membership are ignored.
    """
    transition = extract_status_change(update.chat_member)
    if transition is None:
        return
    was_member, is_member = transition

    # HTML mentions of the user who triggered the change and of the affected user
    actor = update.chat_member.from_user.mention_html()
    member = update.chat_member.new_chat_member.user.mention_html()

    if is_member and not was_member:
        text = f"{member} was added by {actor}. Welcome!"
    elif was_member and not is_member:
        text = f"{member} is no longer with us. Thanks a lot, {actor} ..."
    else:
        # Membership did not flip, so there is nothing to announce.
        return

    await update.effective_chat.send_message(text, parse_mode=ParseMode.HTML)
137
138
async def start_private_chat(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Greets the user and records that they started a chat with the bot if it's a private chat.
    Since no `my_chat_member` update is issued when a user starts a private chat with the bot
    for the first time, we have to track it explicitly here.

    Does nothing for non-private chats and for private chats that are already recorded
    in "user_ids".
    """
    chat = update.effective_chat
    if chat.type != Chat.PRIVATE or chat.id in context.bot_data.get("user_ids", set()):
        return

    # Read the user only after the guard above: this handler is registered for all
    # messages, and updates such as channel posts have no effective user, so accessing
    # `.full_name` before the chat-type check would raise AttributeError on None.
    user_name = update.effective_user.full_name

    logger.info("%s started a private chat with the bot", user_name)
    context.bot_data.setdefault("user_ids", set()).add(chat.id)

    await update.effective_message.reply_text(
        f"Welcome {user_name}. Use /show_chats to see what chats I'm in."
    )
155
156
def main() -> None:
    """Build the application, register the handlers and poll for updates."""
    # Build the Application; replace "TOKEN" with your bot's token.
    app = Application.builder().token("TOKEN").build()

    # Handlers are added in exactly the order listed here.
    handlers = (
        # Keep track of which chats the bot is in
        ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER),
        CommandHandler("show_chats", show_chats),
        # Handle members joining/leaving chats.
        ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER),
        # Interpret any other command or text message as a start of a private chat.
        # This will record the user as being in a private chat with bot.
        MessageHandler(filters.ALL, start_private_chat),
    )
    for handler in handlers:
        app.add_handler(handler)

    # Run the bot until the user presses Ctrl-C.
    # We pass 'allowed_updates' to handle *all* updates, including `chat_member` updates.
    # To reset this, simply pass `allowed_updates=[]`.
    app.run_polling(allowed_updates=Update.ALL_TYPES)
177
178
# Start the bot only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()