nestedconversationbot.py

  1#!/usr/bin/env python
  2# pylint: disable=unused-argument
  3# This program is dedicated to the public domain under the CC0 license.
  4
  5"""
  6First, a few callback functions are defined. Then, those functions are passed to
  7the Application and registered at their respective places.
  8Then, the bot is started and runs until we press Ctrl-C on the command line.
  9
 10Usage:
 11Example of a bot-user conversation using nested ConversationHandlers.
 12Send /start to initiate the conversation.
 13Press Ctrl-C on the command line or send a signal to the process to stop the
 14bot.
 15"""
 16
 17import logging
 18from typing import Any, Dict, Tuple
 19
 20from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
 21from telegram.ext import (
 22    Application,
 23    CallbackQueryHandler,
 24    CommandHandler,
 25    ContextTypes,
 26    ConversationHandler,
 27    MessageHandler,
 28    filters,
 29)
 30
 31# Enable logging
 32logging.basicConfig(
 33    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
 34)
 35# set higher logging level for httpx to avoid all GET and POST requests being logged
 36logging.getLogger("httpx").setLevel(logging.WARNING)
 37
 38logger = logging.getLogger(__name__)
 39
 40# State definitions for top level conversation
 41SELECTING_ACTION, ADDING_MEMBER, ADDING_SELF, DESCRIBING_SELF = map(chr, range(4))
 42# State definitions for second level conversation
 43SELECTING_LEVEL, SELECTING_GENDER = map(chr, range(4, 6))
 44# State definitions for descriptions conversation
 45SELECTING_FEATURE, TYPING = map(chr, range(6, 8))
 46# Meta states
 47STOPPING, SHOWING = map(chr, range(8, 10))
 48# Shortcut for ConversationHandler.END
 49END = ConversationHandler.END
 50
 51# Different constants for this example
 52(
 53    PARENTS,
 54    CHILDREN,
 55    SELF,
 56    GENDER,
 57    MALE,
 58    FEMALE,
 59    AGE,
 60    NAME,
 61    START_OVER,
 62    FEATURES,
 63    CURRENT_FEATURE,
 64    CURRENT_LEVEL,
 65) = map(chr, range(10, 22))
 66
 67
 68# Helper
 69def _name_switcher(level: str) -> Tuple[str, str]:
 70    if level == PARENTS:
 71        return "Father", "Mother"
 72    return "Brother", "Sister"
 73
 74
 75# Top level conversation callbacks
 76async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
 77    """Select an action: Adding parent/child or show data."""
 78    text = (
 79        "You may choose to add a family member, yourself, show the gathered data, or end the "
 80        "conversation. To abort, simply type /stop."
 81    )
 82
 83    buttons = [
 84        [
 85            InlineKeyboardButton(text="Add family member", callback_data=str(ADDING_MEMBER)),
 86            InlineKeyboardButton(text="Add yourself", callback_data=str(ADDING_SELF)),
 87        ],
 88        [
 89            InlineKeyboardButton(text="Show data", callback_data=str(SHOWING)),
 90            InlineKeyboardButton(text="Done", callback_data=str(END)),
 91        ],
 92    ]
 93    keyboard = InlineKeyboardMarkup(buttons)
 94
 95    # If we're starting over we don't need to send a new message
 96    if context.user_data.get(START_OVER):
 97        await update.callback_query.answer()
 98        await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
 99    else:
100        await update.message.reply_text(
101            "Hi, I'm Family Bot and I'm here to help you gather information about your family."
102        )
103        await update.message.reply_text(text=text, reply_markup=keyboard)
104
105    context.user_data[START_OVER] = False
106    return SELECTING_ACTION
107
108
109async def adding_self(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
110    """Add information about yourself."""
111    context.user_data[CURRENT_LEVEL] = SELF
112    text = "Okay, please tell me about yourself."
113    button = InlineKeyboardButton(text="Add info", callback_data=str(MALE))
114    keyboard = InlineKeyboardMarkup.from_button(button)
115
116    await update.callback_query.answer()
117    await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
118
119    return DESCRIBING_SELF
120
121
122async def show_data(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
123    """Pretty print gathered data."""
124
125    def pretty_print(data: Dict[str, Any], level: str) -> str:
126        people = data.get(level)
127        if not people:
128            return "\nNo information yet."
129
130        return_str = ""
131        if level == SELF:
132            for person in data[level]:
133                return_str += f"\nName: {person.get(NAME, '-')}, Age: {person.get(AGE, '-')}"
134        else:
135            male, female = _name_switcher(level)
136
137            for person in data[level]:
138                gender = female if person[GENDER] == FEMALE else male
139                return_str += (
140                    f"\n{gender}: Name: {person.get(NAME, '-')}, Age: {person.get(AGE, '-')}"
141                )
142        return return_str
143
144    user_data = context.user_data
145    text = f"Yourself:{pretty_print(user_data, SELF)}"
146    text += f"\n\nParents:{pretty_print(user_data, PARENTS)}"
147    text += f"\n\nChildren:{pretty_print(user_data, CHILDREN)}"
148
149    buttons = [[InlineKeyboardButton(text="Back", callback_data=str(END))]]
150    keyboard = InlineKeyboardMarkup(buttons)
151
152    await update.callback_query.answer()
153    await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
154    user_data[START_OVER] = True
155
156    return SHOWING
157
158
159async def stop(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
160    """End Conversation by command."""
161    await update.message.reply_text("Okay, bye.")
162
163    return END
164
165
166async def end(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
167    """End conversation from InlineKeyboardButton."""
168    await update.callback_query.answer()
169
170    text = "See you around!"
171    await update.callback_query.edit_message_text(text=text)
172
173    return END
174
175
176# Second level conversation callbacks
177async def select_level(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
178    """Choose to add a parent or a child."""
179    text = "You may add a parent or a child. Also you can show the gathered data or go back."
180    buttons = [
181        [
182            InlineKeyboardButton(text="Add parent", callback_data=str(PARENTS)),
183            InlineKeyboardButton(text="Add child", callback_data=str(CHILDREN)),
184        ],
185        [
186            InlineKeyboardButton(text="Show data", callback_data=str(SHOWING)),
187            InlineKeyboardButton(text="Back", callback_data=str(END)),
188        ],
189    ]
190    keyboard = InlineKeyboardMarkup(buttons)
191
192    await update.callback_query.answer()
193    await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
194
195    return SELECTING_LEVEL
196
197
198async def select_gender(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
199    """Choose to add mother or father."""
200    level = update.callback_query.data
201    context.user_data[CURRENT_LEVEL] = level
202
203    text = "Please choose, whom to add."
204
205    male, female = _name_switcher(level)
206
207    buttons = [
208        [
209            InlineKeyboardButton(text=f"Add {male}", callback_data=str(MALE)),
210            InlineKeyboardButton(text=f"Add {female}", callback_data=str(FEMALE)),
211        ],
212        [
213            InlineKeyboardButton(text="Show data", callback_data=str(SHOWING)),
214            InlineKeyboardButton(text="Back", callback_data=str(END)),
215        ],
216    ]
217    keyboard = InlineKeyboardMarkup(buttons)
218
219    await update.callback_query.answer()
220    await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
221
222    return SELECTING_GENDER
223
224
225async def end_second_level(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
226    """Return to top level conversation."""
227    context.user_data[START_OVER] = True
228    await start(update, context)
229
230    return END
231
232
233# Third level callbacks
234async def select_feature(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
235    """Select a feature to update for the person."""
236    buttons = [
237        [
238            InlineKeyboardButton(text="Name", callback_data=str(NAME)),
239            InlineKeyboardButton(text="Age", callback_data=str(AGE)),
240            InlineKeyboardButton(text="Done", callback_data=str(END)),
241        ]
242    ]
243    keyboard = InlineKeyboardMarkup(buttons)
244
245    # If we collect features for a new person, clear the cache and save the gender
246    if not context.user_data.get(START_OVER):
247        context.user_data[FEATURES] = {GENDER: update.callback_query.data}
248        text = "Please select a feature to update."
249
250        await update.callback_query.answer()
251        await update.callback_query.edit_message_text(text=text, reply_markup=keyboard)
252    # But after we do that, we need to send a new message
253    else:
254        text = "Got it! Please select a feature to update."
255        await update.message.reply_text(text=text, reply_markup=keyboard)
256
257    context.user_data[START_OVER] = False
258    return SELECTING_FEATURE
259
260
261async def ask_for_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
262    """Prompt user to input data for selected feature."""
263    context.user_data[CURRENT_FEATURE] = update.callback_query.data
264    text = "Okay, tell me."
265
266    await update.callback_query.answer()
267    await update.callback_query.edit_message_text(text=text)
268
269    return TYPING
270
271
272async def save_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
273    """Save input for feature and return to feature selection."""
274    user_data = context.user_data
275    user_data[FEATURES][user_data[CURRENT_FEATURE]] = update.message.text
276
277    user_data[START_OVER] = True
278
279    return await select_feature(update, context)
280
281
282async def end_describing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
283    """End gathering of features and return to parent conversation."""
284    user_data = context.user_data
285    level = user_data[CURRENT_LEVEL]
286    if not user_data.get(level):
287        user_data[level] = []
288    user_data[level].append(user_data[FEATURES])
289
290    # Print upper level menu
291    if level == SELF:
292        user_data[START_OVER] = True
293        await start(update, context)
294    else:
295        await select_level(update, context)
296
297    return END
298
299
300async def stop_nested(update: Update, context: ContextTypes.DEFAULT_TYPE) -> str:
301    """Completely end conversation from within nested conversation."""
302    await update.message.reply_text("Okay, bye.")
303
304    return STOPPING
305
306
307def main() -> None:
308    """Run the bot."""
309    # Create the Application and pass it your bot's token.
310    application = Application.builder().token("TOKEN").build()
311
312    # Set up third level ConversationHandler (collecting features)
313    description_conv = ConversationHandler(
314        entry_points=[
315            CallbackQueryHandler(
316                select_feature, pattern="^" + str(MALE) + "$|^" + str(FEMALE) + "$"
317            )
318        ],
319        states={
320            SELECTING_FEATURE: [
321                CallbackQueryHandler(ask_for_input, pattern="^(?!" + str(END) + ").*$")
322            ],
323            TYPING: [MessageHandler(filters.TEXT & ~filters.COMMAND, save_input)],
324        },
325        fallbacks=[
326            CallbackQueryHandler(end_describing, pattern="^" + str(END) + "$"),
327            CommandHandler("stop", stop_nested),
328        ],
329        map_to_parent={
330            # Return to second level menu
331            END: SELECTING_LEVEL,
332            # End conversation altogether
333            STOPPING: STOPPING,
334        },
335    )
336
337    # Set up second level ConversationHandler (adding a person)
338    add_member_conv = ConversationHandler(
339        entry_points=[CallbackQueryHandler(select_level, pattern="^" + str(ADDING_MEMBER) + "$")],
340        states={
341            SELECTING_LEVEL: [
342                CallbackQueryHandler(select_gender, pattern=f"^{PARENTS}$|^{CHILDREN}$")
343            ],
344            SELECTING_GENDER: [description_conv],
345        },
346        fallbacks=[
347            CallbackQueryHandler(show_data, pattern="^" + str(SHOWING) + "$"),
348            CallbackQueryHandler(end_second_level, pattern="^" + str(END) + "$"),
349            CommandHandler("stop", stop_nested),
350        ],
351        map_to_parent={
352            # After showing data return to top level menu
353            SHOWING: SHOWING,
354            # Return to top level menu
355            END: SELECTING_ACTION,
356            # End conversation altogether
357            STOPPING: END,
358        },
359    )
360
361    # Set up top level ConversationHandler (selecting action)
362    # Because the states of the third level conversation map to the ones of the second level
363    # conversation, we need to make sure the top level conversation can also handle them
364    selection_handlers = [
365        add_member_conv,
366        CallbackQueryHandler(show_data, pattern="^" + str(SHOWING) + "$"),
367        CallbackQueryHandler(adding_self, pattern="^" + str(ADDING_SELF) + "$"),
368        CallbackQueryHandler(end, pattern="^" + str(END) + "$"),
369    ]
370    conv_handler = ConversationHandler(
371        entry_points=[CommandHandler("start", start)],
372        states={
373            SHOWING: [CallbackQueryHandler(start, pattern="^" + str(END) + "$")],
374            SELECTING_ACTION: selection_handlers,  # type: ignore[dict-item]
375            SELECTING_LEVEL: selection_handlers,  # type: ignore[dict-item]
376            DESCRIBING_SELF: [description_conv],
377            STOPPING: [CommandHandler("start", start)],
378        },
379        fallbacks=[CommandHandler("stop", stop)],
380    )
381
382    application.add_handler(conv_handler)
383
384    # Run the bot until the user presses Ctrl-C
385    application.run_polling(allowed_updates=Update.ALL_TYPES)
386
387
388if __name__ == "__main__":
389    main()

State Diagram

flowchart TB %% Documentation: https://mermaid-js.github.io/mermaid/#/flowchart A(("/start")):::entryPoint -->|Hi! I'm FamilyBot...| B((SELECTING_ACTION)):::state B --> C("Show Data"):::userInput C --> |"(List of gathered data)"| D((SHOWING)):::state D --> E("Back"):::userInput E --> B B --> F("Add Yourself"):::userInput F --> G(("DESCRIBING_SELF")):::state G --> H("Add info"):::userInput H --> I((SELECT_FEATURE)):::state I --> |"Please select a feature to update. <br /> - Name <br /> - Age <br /> - Done"|J("(choice)"):::userInput J --> |"Okay, tell me."| K((TYPING)):::state K --> L("(text)"):::userInput L --> |"[saving]"|I I --> M("Done"):::userInput M --> B B --> N("Add family member"):::userInput R --> I W --> |"See you around!"|End(("END")):::termination Y(("ANY STATE")):::state --> Z("/stop"):::userInput Z -->|"Okay, bye."| End B --> W("Done"):::userInput subgraph nestedConversation[Nested Conversation: Add Family Member] direction BT N --> O(("SELECT_LEVEL")):::state O --> |"Add... <br /> - Add Parent <br /> - Add Child <br />"|P("(choice)"):::userInput P --> Q(("SELECT_GENDER")):::state Q --> |"- Mother <br /> - Father <br /> / <br /> - Sister <br /> - Brother"| R("(choice)"):::userInput Q --> V("Show Data"):::userInput Q --> T(("SELECTING_ACTION")):::state Q --> U("Back"):::userInput U --> T O --> U O --> V V --> S(("SHOWING")):::state V --> T end classDef userInput fill:#2a5279, color:#ffffff, stroke:#ffffff classDef state fill:#222222, color:#ffffff, stroke:#ffffff classDef entryPoint fill:#009c11, stroke:#42FF57, color:#ffffff classDef termination fill:#bb0007, stroke:#E60109, color:#ffffff style nestedConversation fill:#999999, stroke-width:2px, stroke:#333333