Vai al contenuto

telegram_app

telegram_app

Telegram bot for the application.

Telegram

Telegram bot for the application.

Source code in telegram_app.py
class Telegram:
    """
    Telegram bot for the application.
    """

    def __init__(self, token: str = None):
        """
        Initialize the Telegram bot.
        :param token: The token of the bot. If None, the default token is used.
        """
        if token is None:
            token = DefaultConfig().get('telegram_default')['token']
        self.application = (Application.builder().token(token).
                            concurrent_updates(True).build())
        self.bot = self.application.bot
        self.run = False
        self.__background_tasks = set()

        if DefaultConfig().get('plugins')['telegram']:
            commands = {
                'start': self.__handle_start,
                'help': self.__handle_help,
                'aiuto': self.__handle_help,
                'preset': self.__handle_preset,
                'screenshot': self.__handle_preview,
                'avvia_ora': self.__handle_start_live,
                'add_10m': self.__handle_10m,
                'termina': self.__handle_end_live
            }
            for cmd, handler in commands.items():
                self.application.add_handler(CommandHandler(cmd, handler))
            callbacks = {
                'h_preset': self.__handle_preset,
                'u_preset': self.__handle_preset,
                'h_screenshot': self.__handle_preview,
                'u_screenshot': self.__handle_preview,
                'h_avvia_ora': self.__handle_start_live,
                'h_add_10m': self.__handle_10m,
                'h_end_live': self.__handle_end_live,
                'u_end_live': self.__handle_end_live
            }
            for cb, handler in callbacks.items():
                self.application.add_handler(CallbackQueryHandler(handler, cb))
            handler = Telegram.__handle_message
            self.application.add_handler(MessageHandler(filters.ALL, handler))
            self.application.add_error_handler(self.__handle_errors)
            self.run = True
            self.task = asyncio.create_task(self.__start())

    async def __start(self) -> None:
        """
        Start the Telegram bot.
        """
        async with self.application:
            # Calls `initialize` and `shutdown`
            try:
                await self.application.start()
                await self.application.updater.start_polling()
                while self.run:
                    await asyncio.sleep(0.1)
            except BaseException as ex:  # pylint: disable=broad-except
                msg = f"Errore durante l'avvio del bot: {ex}"
                logging.error(msg)
            finally:
                try:
                    async with TaskGroup() as tg:
                        tg.create_task(self.application.updater.stop())
                        tg.create_task(self.application.stop())
                except BaseException as ex:  # pylint: disable=broad-except
                    msg = f"Errore durante la chiusura del bot: {ex}"
                    logging.error(msg)

    async def stop(self) -> None:
        """
        Stop the Telegram bot.
        """
        if self.run:
            self.run = False
            await self.task

    def send(self, message: Any, chat_id: str = None,
             exception: Exception = None) -> None:
        """
        Send a message using Telegram to the given chat.
        :param message: The message to send.
        :param chat_id: The chat id to send the message to.
        :param exception: The exception to include in the message.
        """
        # https://api.telegram.org/botTOKEN/getUpdates
        if chat_id is None:
            chat_id = DefaultConfig().get('telegram_default')['chat_id']
        message = str(message)
        if exception:
            message += f"\n\nERROR: {exception}"
        task = asyncio.create_task(self.__telegram_msg(message, chat_id))
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)

    async def __telegram_msg(self, message: str, chat_id: str = None,
                             retry: int = 3) -> None:
        """
        Async method to send a message using Telegram to the given chat.
        :param message: The message to send.
        :param chat_id: The chat id to send the message to.
        :param retry: The number of times to retry the message sending.
        """
        try:
            await self.bot.send_message(chat_id, message, None, None, True,
                                        disable_web_page_preview=False)
        except asyncio.CancelledError:
            pass
        except BaseException as ex:  # pylint: disable=broad-except
            if retry == 0:
                msg = ("Impossibile inviare il seguente messaggio su "
                       f"Telegram: {message}")
                logging.error(msg, exc_info=ex)
            else:
                with suppress(BaseException):
                    await asyncio.sleep(1)
                    await self.__telegram_msg(message, chat_id, retry - 1)

    @staticmethod
    async def __authorized(update: Update, context: Context) -> bool:
        """
        Check if the user is authorized to use the bot.
        :param update: The update object.

        :return: True if the user is authorized, False otherwise.
        """
        auth = False
        if update.effective_user is not None:
            users = DefaultConfig().get('telegram_default')['auth_users']
            auth = (update.effective_user.username in users or
                    str(update.effective_user.id) in users)
        if not auth and update.effective_chat is not None:
            msg = "🚫 Non sei autorizzato a usare il bot!"
            await context.bot.send_message(update.effective_chat.id, msg)
        return auth

    @staticmethod
    async def __handle_start(update: Update, context: Context) -> None:
        """
        Handle the start command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        ora = datetime.now().hour
        saluto = "Buongiorno" if 4 <= ora <= 12 else (
            "Buon pomeriggio" if 12 < ora <= 17 else "Buonasera")
        msg = (f"{saluto} {update.effective_user.first_name} sono a tua "
               "disposizione!\nSe serve puoi chiedere /aiuto .")
        await update.effective_message.reply_text(msg)

    @staticmethod
    async def __handle_help(update: Update, context: Context) -> None:
        """
        Handle the help command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        screenshot = InlineKeyboardButton(text="🧿 Richiedi screenshot",
                                          callback_data='h_screenshot')
        preset = InlineKeyboardButton("✅ Attiva preset", None, 'h_preset')
        add_10m = InlineKeyboardButton("🕑 Aggiungi 10 minuti",
                                       callback_data='h_add_10m')
        avvia_ora = InlineKeyboardButton("🚀 Avvia in anticipo",
                                         callback_data='h_avvia_ora')
        end_live = InlineKeyboardButton("⛔ Termina live", None, 'h_end_live')
        markup = InlineKeyboardMarkup([[screenshot, preset],
                                       [avvia_ora, add_10m],
                                       [end_live]])
        msg = update.effective_message
        await msg.reply_text("Ecco cosa posso fare per te:", None, True, markup)

    @staticmethod
    async def __handle_preset(update: Update, context: Context) -> None:
        """
        Handle the preset command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        try:
            msg = update.effective_message
            query = update.callback_query
            presets = [InlineKeyboardButton(v, None, f'u_preset#{v}')
                       for v in DefaultConfig().get('presets')]
            markup = InlineKeyboardMarkup(list(chunked(presets, 3)))
            if query is not None:
                await query.answer()
            if query is not None and 'u_preset' in query.data:
                preset = query.data.split('#', 1)[1]
                async with asyncio.TaskGroup() as tg:
                    tg.create_task(call_preset(preset))
                    tg.create_task(query.edit_message_text(
                        "✅ Il preset è stato attivato!", None, markup))
                    tg.create_task(asyncio.sleep(3))
                await query.edit_message_text("Preset disponibili:",
                                              reply_markup=markup)
            else:
                await msg.reply_text("Preset disponibili:", None, True, markup)
        except error.BadRequest as ex:
            if 'message is not modified' not in str(ex):
                logging.warning("TELEGRAM BOT (preset) ERROR", exc_info=ex)
                msg = (f"☢ Mi spiace {update.effective_user.first_name}, "
                       "non riesco a soddisfare la tua richiesta.")
                await update.effective_message.reply_text(msg, None, True)
        except BaseException as ex:  # pylint: disable=broad-except
            logging.warning("TELEGRAM BOT (preset) ERROR", exc_info=ex)
            msg = (f"☢ Mi spiace {update.effective_user.first_name}, "
                   "non riesco a soddisfare la tua richiesta.")
            await update.effective_message.reply_text(msg, None, True)

    @staticmethod
    async def __handle_preview(update: Update, context: Context) -> None:
        """
        Handle the screenshot command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        try:
            msg = update.effective_message
            query = update.callback_query
            button = InlineKeyboardButton("🧿 Aggiornami", None, 'u_screenshot')
            markup = InlineKeyboardMarkup([[button]])
            if query is not None:
                await query.answer()
            ws = WebSocketClient(DefaultConfig().get('obs_url'),
                                 DefaultConfig().get('obs_pwd'))
            await ws.connect()
            await ws.wait_until_identified()
            r = await LiveEvent.obs_exec(ws, 'GetCurrentProgramScene')
            p = {'imageFormat': 'png',
                 'sourceUuid': r['sceneUuid'],
                 'imageWidth': 800,
                 'imageCompressionQuality': 80}
            r = await LiveEvent.obs_exec(ws, 'GetSourceScreenshot', p)
            await ws.disconnect()
            frame = base64.b64decode(r['imageData'].split(',')[-1])
            if query is not None and query.data in 'u_screenshot':
                await query.edit_message_media(InputMediaPhoto(frame),
                                               reply_markup=markup)
            else:
                await msg.reply_photo(frame, None, True, markup)
        except error.BadRequest as ex:
            if 'message is not modified' not in str(ex):
                logging.warning("TELEGRAM BOT (screenshot) ERROR", exc_info=ex)
                msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                       f"riesco a soddisfare la tua richiesta.")
                await update.effective_message.reply_text(msg, None, True)
        except BaseException as ex:  # pylint: disable=broad-except
            logging.warning("TELEGRAM BOT (screenshot) ERROR", exc_info=ex)
            msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                   f"riesco a soddisfare la tua richiesta.")
            await update.effective_message.reply_text(msg, None, True)

    async def __handle_start_live(self, update: Update,
                                  context: Context) -> None:
        """
        Handle the start-live command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        query = update.callback_query
        if query is not None:
            await query.answer()

        if StreamingStatus().is_live():
            msg = "⚠ La trasmissione è già in corso!"
        elif StreamingStatus().is_ready():
            msg = "❗ Non c'è alcuna trasmissione in corso."
        else:
            task = asyncio.create_task(StreamingStatus().get_owner().go_live(),
                                       name='TG_Anticipate_live')
            self.__background_tasks.add(task)
            msg = "✅ La trasmissione è iniziata!"
        await update.effective_message.reply_text(msg)

    @staticmethod
    async def __handle_10m(update: Update, context: Context) -> None:
        """
        Handle the add_10m command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        query = update.callback_query
        if query is not None:
            await query.answer()

        if StreamingStatus().is_live():
            StreamingStatus().increase_bonus_time()
            msg = "✅ Ho aggiunto 10 minuti alla trasmissione!"
        else:
            msg = "❗ Non c'è alcuna trasmissione in corso."
        await update.effective_message.reply_text(msg)

    @staticmethod
    async def __handle_end_live(update: Update, context: Context) -> None:
        """
        Handle the end-live command of the bot.
        :param update: The update object.
        :param context: The context object.
        """
        if not await Telegram.__authorized(update, context):
            return

        try:
            msg = update.effective_message
            query = update.callback_query
            if query is not None:
                await query.answer()

            if query is not None and 'u_end_live' in query.data:
                answer = query.data.split('#', 1)[1].strip()
                if answer == 'no':
                    msg = "🆗 Ok, come desideri!"
                elif answer == 'yes' and StreamingStatus().is_live():
                    StreamingStatus().stop_now()
                    msg = (f"✅ Perfetto {update.effective_user.first_name}, "
                           "ho fatto come hai richiesto!")
                else:
                    msg = "❗ Non c'è alcuna trasmissione in corso."
                await query.edit_message_text(msg)
            else:
                markup = InlineKeyboardMarkup([[
                    InlineKeyboardButton("✔ Si", None, 'u_end_live#yes'),
                    InlineKeyboardButton("❌ No", None, 'u_end_live#no')]])
                text = "⚠ Vuoi terminare la live?"
                await msg.reply_text(text, None, True, markup)
        except error.BadRequest as ex:
            if 'message is not modified' not in str(ex):
                logging.warning("TELEGRAM BOT (end-live) ERROR", exc_info=ex)
                msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                       f"riesco a soddisfare la tua richiesta.")
                await update.effective_message.reply_text(msg, None, True)
        except BaseException as ex:  # pylint: disable=broad-except
            logging.warning("TELEGRAM BOT (end-live) ERROR", exc_info=ex)
            msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                   f"riesco a soddisfare la tua richiesta.")
            await update.effective_message.reply_text(msg, None, True)

    @staticmethod
    async def __handle_message(update: Update, context: Context) -> None:
        """
        Handle the generic message sent to the bot. If the message contains
        'jarvis' call the start command, otherwise if the message contains
        'do your magic' write on the log the context object for inspection of
        the context content (e.g. the id and username of the user).
        :param update: The update object.
        :param context: The context object.
        """
        if 'jarvis' in update.effective_message.text.lower():
            await Telegram.__handle_start(update, context)
        elif 'do your magic' in update.effective_message.text.lower():
            logging.info(update)
            logging.info(context)

    @staticmethod
    async def __handle_errors(update: Optional[object], ctxt: Context) -> None:
        """
        Handle the errors raised by the bot.
        :param update: The update object.
        :param ctxt: The context object.
        """
        msg = f"TELEGRAM BOT H-ERROR\nUpdate: {update}\nERROR: {ctxt.error}"
        logging.warning(msg, exc_info=ctxt.error)

__authorized(update, context) async staticmethod

Check if the user is authorized to use the bot. :param update: The update object.

:return: True if the user is authorized, False otherwise.

Source code in telegram_app.py
@staticmethod
async def __authorized(update: Update, context: Context) -> bool:
    """
    Check if the user is authorized to use the bot.
    :param update: The update object.

    :return: True if the user is authorized, False otherwise.
    """
    auth = False
    if update.effective_user is not None:
        users = DefaultConfig().get('telegram_default')['auth_users']
        auth = (update.effective_user.username in users or
                str(update.effective_user.id) in users)
    if not auth and update.effective_chat is not None:
        msg = "🚫 Non sei autorizzato a usare il bot!"
        await context.bot.send_message(update.effective_chat.id, msg)
    return auth

__handle_10m(update, context) async staticmethod

Handle the add_10m command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_10m(update: Update, context: Context) -> None:
    """
    Handle the add_10m command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    query = update.callback_query
    if query is not None:
        await query.answer()

    if StreamingStatus().is_live():
        StreamingStatus().increase_bonus_time()
        msg = "✅ Ho aggiunto 10 minuti alla trasmissione!"
    else:
        msg = "❗ Non c'è alcuna trasmissione in corso."
    await update.effective_message.reply_text(msg)

__handle_end_live(update, context) async staticmethod

Handle the end-live command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_end_live(update: Update, context: Context) -> None:
    """
    Handle the end-live command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    try:
        msg = update.effective_message
        query = update.callback_query
        if query is not None:
            await query.answer()

        if query is not None and 'u_end_live' in query.data:
            answer = query.data.split('#', 1)[1].strip()
            if answer == 'no':
                msg = "🆗 Ok, come desideri!"
            elif answer == 'yes' and StreamingStatus().is_live():
                StreamingStatus().stop_now()
                msg = (f"✅ Perfetto {update.effective_user.first_name}, "
                       "ho fatto come hai richiesto!")
            else:
                msg = "❗ Non c'è alcuna trasmissione in corso."
            await query.edit_message_text(msg)
        else:
            markup = InlineKeyboardMarkup([[
                InlineKeyboardButton("✔ Si", None, 'u_end_live#yes'),
                InlineKeyboardButton("❌ No", None, 'u_end_live#no')]])
            text = "⚠ Vuoi terminare la live?"
            await msg.reply_text(text, None, True, markup)
    except error.BadRequest as ex:
        if 'message is not modified' not in str(ex):
            logging.warning("TELEGRAM BOT (end-live) ERROR", exc_info=ex)
            msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                   f"riesco a soddisfare la tua richiesta.")
            await update.effective_message.reply_text(msg, None, True)
    except BaseException as ex:  # pylint: disable=broad-except
        logging.warning("TELEGRAM BOT (end-live) ERROR", exc_info=ex)
        msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
               f"riesco a soddisfare la tua richiesta.")
        await update.effective_message.reply_text(msg, None, True)

__handle_errors(update, ctxt) async staticmethod

Handle the errors raised by the bot. :param update: The update object. :param ctxt: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_errors(update: Optional[object], ctxt: Context) -> None:
    """
    Handle the errors raised by the bot.
    :param update: The update object.
    :param ctxt: The context object.
    """
    msg = f"TELEGRAM BOT H-ERROR\nUpdate: {update}\nERROR: {ctxt.error}"
    logging.warning(msg, exc_info=ctxt.error)

__handle_help(update, context) async staticmethod

Handle the help command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_help(update: Update, context: Context) -> None:
    """
    Handle the help command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    screenshot = InlineKeyboardButton(text="🧿 Richiedi screenshot",
                                      callback_data='h_screenshot')
    preset = InlineKeyboardButton("✅ Attiva preset", None, 'h_preset')
    add_10m = InlineKeyboardButton("🕑 Aggiungi 10 minuti",
                                   callback_data='h_add_10m')
    avvia_ora = InlineKeyboardButton("🚀 Avvia in anticipo",
                                     callback_data='h_avvia_ora')
    end_live = InlineKeyboardButton("⛔ Termina live", None, 'h_end_live')
    markup = InlineKeyboardMarkup([[screenshot, preset],
                                   [avvia_ora, add_10m],
                                   [end_live]])
    msg = update.effective_message
    await msg.reply_text("Ecco cosa posso fare per te:", None, True, markup)

__handle_message(update, context) async staticmethod

Handle the generic message sent to the bot. If the message contains 'jarvis' call the start command, otherwise if the message contains 'do your magic' write on the log the context object for inspection of the context content (e.g. the id and username of the user). :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_message(update: Update, context: Context) -> None:
    """
    Handle the generic message sent to the bot. If the message contains
    'jarvis' call the start command, otherwise if the message contains
    'do your magic' write on the log the context object for inspection of
    the context content (e.g. the id and username of the user).
    :param update: The update object.
    :param context: The context object.
    """
    if 'jarvis' in update.effective_message.text.lower():
        await Telegram.__handle_start(update, context)
    elif 'do your magic' in update.effective_message.text.lower():
        logging.info(update)
        logging.info(context)

__handle_preset(update, context) async staticmethod

Handle the preset command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_preset(update: Update, context: Context) -> None:
    """
    Handle the preset command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    try:
        msg = update.effective_message
        query = update.callback_query
        presets = [InlineKeyboardButton(v, None, f'u_preset#{v}')
                   for v in DefaultConfig().get('presets')]
        markup = InlineKeyboardMarkup(list(chunked(presets, 3)))
        if query is not None:
            await query.answer()
        if query is not None and 'u_preset' in query.data:
            preset = query.data.split('#', 1)[1]
            async with asyncio.TaskGroup() as tg:
                tg.create_task(call_preset(preset))
                tg.create_task(query.edit_message_text(
                    "✅ Il preset è stato attivato!", None, markup))
                tg.create_task(asyncio.sleep(3))
            await query.edit_message_text("Preset disponibili:",
                                          reply_markup=markup)
        else:
            await msg.reply_text("Preset disponibili:", None, True, markup)
    except error.BadRequest as ex:
        if 'message is not modified' not in str(ex):
            logging.warning("TELEGRAM BOT (preset) ERROR", exc_info=ex)
            msg = (f"☢ Mi spiace {update.effective_user.first_name}, "
                   "non riesco a soddisfare la tua richiesta.")
            await update.effective_message.reply_text(msg, None, True)
    except BaseException as ex:  # pylint: disable=broad-except
        logging.warning("TELEGRAM BOT (preset) ERROR", exc_info=ex)
        msg = (f"☢ Mi spiace {update.effective_user.first_name}, "
               "non riesco a soddisfare la tua richiesta.")
        await update.effective_message.reply_text(msg, None, True)

__handle_preview(update, context) async staticmethod

Handle the screenshot command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_preview(update: Update, context: Context) -> None:
    """
    Handle the screenshot command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    try:
        msg = update.effective_message
        query = update.callback_query
        button = InlineKeyboardButton("🧿 Aggiornami", None, 'u_screenshot')
        markup = InlineKeyboardMarkup([[button]])
        if query is not None:
            await query.answer()
        ws = WebSocketClient(DefaultConfig().get('obs_url'),
                             DefaultConfig().get('obs_pwd'))
        await ws.connect()
        await ws.wait_until_identified()
        r = await LiveEvent.obs_exec(ws, 'GetCurrentProgramScene')
        p = {'imageFormat': 'png',
             'sourceUuid': r['sceneUuid'],
             'imageWidth': 800,
             'imageCompressionQuality': 80}
        r = await LiveEvent.obs_exec(ws, 'GetSourceScreenshot', p)
        await ws.disconnect()
        frame = base64.b64decode(r['imageData'].split(',')[-1])
        if query is not None and query.data in 'u_screenshot':
            await query.edit_message_media(InputMediaPhoto(frame),
                                           reply_markup=markup)
        else:
            await msg.reply_photo(frame, None, True, markup)
    except error.BadRequest as ex:
        if 'message is not modified' not in str(ex):
            logging.warning("TELEGRAM BOT (screenshot) ERROR", exc_info=ex)
            msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                   f"riesco a soddisfare la tua richiesta.")
            await update.effective_message.reply_text(msg, None, True)
    except BaseException as ex:  # pylint: disable=broad-except
        logging.warning("TELEGRAM BOT (screenshot) ERROR", exc_info=ex)
        msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
               f"riesco a soddisfare la tua richiesta.")
        await update.effective_message.reply_text(msg, None, True)

__handle_start(update, context) async staticmethod

Handle the start command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
@staticmethod
async def __handle_start(update: Update, context: Context) -> None:
    """
    Handle the start command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    ora = datetime.now().hour
    saluto = "Buongiorno" if 4 <= ora <= 12 else (
        "Buon pomeriggio" if 12 < ora <= 17 else "Buonasera")
    msg = (f"{saluto} {update.effective_user.first_name} sono a tua "
           "disposizione!\nSe serve puoi chiedere /aiuto .")
    await update.effective_message.reply_text(msg)

__handle_start_live(update, context) async

Handle the start-live command of the bot. :param update: The update object. :param context: The context object.

Source code in telegram_app.py
async def __handle_start_live(self, update: Update,
                              context: Context) -> None:
    """
    Handle the start-live command of the bot.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return

    query = update.callback_query
    if query is not None:
        await query.answer()

    if StreamingStatus().is_live():
        msg = "⚠ La trasmissione è già in corso!"
    elif StreamingStatus().is_ready():
        msg = "❗ Non c'è alcuna trasmissione in corso."
    else:
        task = asyncio.create_task(StreamingStatus().get_owner().go_live(),
                                   name='TG_Anticipate_live')
        self.__background_tasks.add(task)
        msg = "✅ La trasmissione è iniziata!"
    await update.effective_message.reply_text(msg)

__init__(token=None)

Initialize the Telegram bot. :param token: The token of the bot. If None, the default token is used.

Source code in telegram_app.py
def __init__(self, token: str = None):
    """
    Initialize the Telegram bot.
    :param token: The token of the bot. If None, the default token is used.
    """
    if token is None:
        token = DefaultConfig().get('telegram_default')['token']
    self.application = (Application.builder().token(token).
                        concurrent_updates(True).build())
    self.bot = self.application.bot
    self.run = False
    self.__background_tasks = set()

    if DefaultConfig().get('plugins')['telegram']:
        commands = {
            'start': self.__handle_start,
            'help': self.__handle_help,
            'aiuto': self.__handle_help,
            'preset': self.__handle_preset,
            'screenshot': self.__handle_preview,
            'avvia_ora': self.__handle_start_live,
            'add_10m': self.__handle_10m,
            'termina': self.__handle_end_live
        }
        for cmd, handler in commands.items():
            self.application.add_handler(CommandHandler(cmd, handler))
        callbacks = {
            'h_preset': self.__handle_preset,
            'u_preset': self.__handle_preset,
            'h_screenshot': self.__handle_preview,
            'u_screenshot': self.__handle_preview,
            'h_avvia_ora': self.__handle_start_live,
            'h_add_10m': self.__handle_10m,
            'h_end_live': self.__handle_end_live,
            'u_end_live': self.__handle_end_live
        }
        for cb, handler in callbacks.items():
            self.application.add_handler(CallbackQueryHandler(handler, cb))
        handler = Telegram.__handle_message
        self.application.add_handler(MessageHandler(filters.ALL, handler))
        self.application.add_error_handler(self.__handle_errors)
        self.run = True
        self.task = asyncio.create_task(self.__start())

__start() async

Start the Telegram bot.

Source code in telegram_app.py
async def __start(self) -> None:
    """
    Start the Telegram bot.
    """
    async with self.application:
        # Calls `initialize` and `shutdown`
        try:
            await self.application.start()
            await self.application.updater.start_polling()
            while self.run:
                await asyncio.sleep(0.1)
        except BaseException as ex:  # pylint: disable=broad-except
            msg = f"Errore durante l'avvio del bot: {ex}"
            logging.error(msg)
        finally:
            try:
                async with TaskGroup() as tg:
                    tg.create_task(self.application.updater.stop())
                    tg.create_task(self.application.stop())
            except BaseException as ex:  # pylint: disable=broad-except
                msg = f"Errore durante la chiusura del bot: {ex}"
                logging.error(msg)

__telegram_msg(message, chat_id=None, retry=3) async

Async method to send a message using Telegram to the given chat. :param message: The message to send. :param chat_id: The chat id to send the message to. :param retry: The number of times to retry the message sending.

Source code in telegram_app.py
async def __telegram_msg(self, message: str, chat_id: str = None,
                         retry: int = 3) -> None:
    """
    Async method to send a message using Telegram to the given chat.
    :param message: The message to send.
    :param chat_id: The chat id to send the message to.
    :param retry: The number of times to retry the message sending.
    """
    try:
        await self.bot.send_message(chat_id, message, None, None, True,
                                    disable_web_page_preview=False)
    except asyncio.CancelledError:
        pass
    except BaseException as ex:  # pylint: disable=broad-except
        if retry == 0:
            msg = ("Impossibile inviare il seguente messaggio su "
                   f"Telegram: {message}")
            logging.error(msg, exc_info=ex)
        else:
            with suppress(BaseException):
                await asyncio.sleep(1)
                await self.__telegram_msg(message, chat_id, retry - 1)

send(message, chat_id=None, exception=None)

Send a message using Telegram to the given chat. :param message: The message to send. :param chat_id: The chat id to send the message to. :param exception: The exception to include in the message.

Source code in telegram_app.py
def send(self, message: Any, chat_id: str = None,
         exception: Exception = None) -> None:
    """
    Send a message using Telegram to the given chat.
    :param message: The message to send.
    :param chat_id: The chat id to send the message to.
    :param exception: The exception to include in the message.
    """
    # https://api.telegram.org/botTOKEN/getUpdates
    if chat_id is None:
        chat_id = DefaultConfig().get('telegram_default')['chat_id']
    message = str(message)
    if exception:
        message += f"\n\nERROR: {exception}"
    task = asyncio.create_task(self.__telegram_msg(message, chat_id))
    self.__background_tasks.add(task)
    task.add_done_callback(self.__background_tasks.discard)

stop() async

Stop the Telegram bot.

Source code in telegram_app.py
async def stop(self) -> None:
    """
    Stop the Telegram bot.
    """
    if self.run:
        self.run = False
        await self.task