class Telegram:
"""
Telegram bot for the application.
"""
def __init__(self, token: str = None):
    """
    Build the bot application and, when the plugin is enabled, start it.

    Handlers are registered and the polling task is created only when the
    ``telegram`` entry of the ``plugins`` configuration is truthy;
    otherwise the instance is left idle with ``run`` set to False.
    :param token: The token of the bot. If None, the token stored under
        ``telegram_default`` in the default configuration is used.
    """
    if token is None:
        token = DefaultConfig().get('telegram_default')['token']
    builder = Application.builder().token(token).concurrent_updates(True)
    self.application = builder.build()
    self.bot = self.application.bot
    self.run = False
    self.__background_tasks = set()

    if not DefaultConfig().get('plugins')['telegram']:
        return

    register = self.application.add_handler

    # Slash commands, registered in this exact order.
    command_table = (
        ('start', self.__handle_start),
        ('help', self.__handle_help),
        ('aiuto', self.__handle_help),
        ('preset', self.__handle_preset),
        ('screenshot', self.__handle_preview),
        ('avvia_ora', self.__handle_start_live),
        ('add_10m', self.__handle_10m),
        ('termina', self.__handle_end_live),
    )
    for name, callback in command_table:
        register(CommandHandler(name, callback))

    # Inline-keyboard callbacks: the key is the pattern given to
    # CallbackQueryHandler.
    callback_table = (
        ('h_preset', self.__handle_preset),
        ('u_preset', self.__handle_preset),
        ('h_screenshot', self.__handle_preview),
        ('u_screenshot', self.__handle_preview),
        ('h_avvia_ora', self.__handle_start_live),
        ('h_add_10m', self.__handle_10m),
        ('h_end_live', self.__handle_end_live),
        ('u_end_live', self.__handle_end_live),
    )
    for pattern, callback in callback_table:
        register(CallbackQueryHandler(callback, pattern))

    # Catch-all for everything the handlers above did not take.
    register(MessageHandler(filters.ALL, Telegram.__handle_message))
    self.application.add_error_handler(self.__handle_errors)

    self.run = True
    # NOTE(review): asyncio.create_task needs a running event loop, so the
    # instance presumably has to be created from async code - confirm.
    self.task = asyncio.create_task(self.__start())
async def __start(self) -> None:
    """
    Run the Telegram bot until ``self.run`` becomes false.

    Starts the application and its updater (polling), idles while
    ``self.run`` is true, then stops the updater and the application.
    Errors are logged, never raised, so awaiting the task created from this
    coroutine does not propagate them.
    """
    async with self.application:
        # Calls `initialize` and `shutdown`
        try:
            await self.application.start()
            await self.application.updater.start_polling()
            while self.run:
                await asyncio.sleep(0.1)
        except BaseException as ex:  # pylint: disable=broad-except
            # Kept deliberately broad (as in the original code): `stop()`
            # awaits this task and must not see the failure re-raised.
            msg = f"Errore durante l'avvio del bot: {ex}"
            logging.error(msg)
        finally:
            # Stop the updater first, then the application, one after the
            # other. Each component is stopped only if it is running and
            # each failure is handled on its own: stopping both inside one
            # TaskGroup let a failing `updater.stop()` (e.g. polling never
            # started) cancel `application.stop()`, leaving the application
            # running so that the `shutdown` performed on context exit
            # failed as well.
            for component in (self.application.updater, self.application):
                try:
                    if component.running:
                        await component.stop()
                except BaseException as ex:  # pylint: disable=broad-except
                    msg = f"Errore durante la chiusura del bot: {ex}"
                    logging.error(msg)
async def stop(self) -> None:
    """
    Stop the Telegram bot.

    Does nothing when the bot is not flagged as running. Otherwise the
    ``run`` flag is cleared and the background task stored in ``task`` is
    awaited until it finishes.
    """
    if not self.run:
        return
    self.run = False
    await self.task
def send(self, message: Any, chat_id: str = None,
         exception: Exception = None) -> None:
    """
    Queue a Telegram message for the given chat without waiting for it.

    The message is converted with ``str`` and delivered by a background
    task; the task is kept in the instance's task set until it completes.
    :param message: The message to send.
    :param chat_id: The chat id to send the message to. If None, the
        ``chat_id`` stored under ``telegram_default`` in the default
        configuration is used.
    :param exception: The exception to include in the message; when truthy
        it is appended after an ``ERROR:`` marker.
    """
    # https://api.telegram.org/botTOKEN/getUpdates
    target = chat_id
    if target is None:
        target = DefaultConfig().get('telegram_default')['chat_id']
    text = str(message)
    if exception:
        text = f"{text}\n\nERROR: {exception}"
    pending = asyncio.create_task(self.__telegram_msg(text, target))
    # Hold a reference until the task is done, then drop it.
    self.__background_tasks.add(pending)
    pending.add_done_callback(self.__background_tasks.discard)
async def __telegram_msg(self, message: str, chat_id: str = None,
                         retry: int = 3) -> None:
    """
    Async method to send a message using Telegram to the given chat.

    A failed send is retried after a one second pause. When the retries are
    exhausted the failure is logged and the method returns normally; it
    never raises for delivery errors. Cancellation, whether it arrives
    during the send or during the pause, ends the method silently.
    :param message: The message to send.
    :param chat_id: The chat id to send the message to.
    :param retry: The number of times to retry the message sending. Values
        below zero are treated as zero (a single attempt).
    """
    # Clamp so a negative value cannot retry forever (the previous
    # `retry == 0` check was never reached when counting down from < 0).
    attempts_left = max(retry, 0)
    while True:
        try:
            # NOTE(review): the positional `True` presumably maps to
            # `disable_notification` in the installed python-telegram-bot
            # version - confirm against its `Bot.send_message` signature.
            await self.bot.send_message(chat_id, message, None, None, True,
                                        disable_web_page_preview=False)
            return
        except asyncio.CancelledError:
            return
        except Exception as ex:  # pylint: disable=broad-except
            if attempts_left == 0:
                msg = ("Impossibile inviare il seguente messaggio su "
                       f"Telegram: {message}")
                logging.error(msg, exc_info=ex)
                return
        attempts_left -= 1
        try:
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            # Cancelled while backing off: give up instead of retrying.
            return
@staticmethod
async def __authorized(update: Update, context: Context) -> bool:
    """
    Check if the user is authorized to use the bot.

    The user is authorized when the username or the stringified id is
    found in ``auth_users`` of the ``telegram_default`` configuration.
    When the check fails and the update has a chat, a refusal message is
    sent to that chat.
    :param update: The update object.
    :param context: The context object, used to send the refusal message.
    :return: True if the user is authorized, False otherwise.
    """
    user = update.effective_user
    if user is not None:
        allowed = DefaultConfig().get('telegram_default')['auth_users']
        # presumably `allowed` is a collection of strings - verify in config
        if user.username in allowed or str(user.id) in allowed:
            return True
    chat = update.effective_chat
    if chat is not None:
        refusal = "🚫 Non sei autorizzato a usare il bot!"
        await context.bot.send_message(chat.id, refusal)
    return False
@staticmethod
async def __handle_start(update: Update, context: Context) -> None:
    """
    Handle the start command of the bot.

    Replies to authorized users with a greeting chosen from the local hour
    and a hint about the help command.
    :param update: The update object.
    :param context: The context object.
    """
    allowed = await Telegram.__authorized(update, context)
    if not allowed:
        return
    hour = datetime.now().hour
    if 4 <= hour <= 12:
        greeting = "Buongiorno"
    elif 12 < hour <= 17:
        greeting = "Buon pomeriggio"
    else:
        greeting = "Buonasera"
    first_name = update.effective_user.first_name
    text = (f"{greeting} {first_name} sono a tua "
            "disposizione!\nSe serve puoi chiedere /aiuto .")
    await update.effective_message.reply_text(text)
@staticmethod
async def __handle_help(update: Update, context: Context) -> None:
    """
    Handle the help command of the bot.

    Replies to authorized users with an inline keyboard listing the
    available actions.
    :param update: The update object.
    :param context: The context object.
    """
    allowed = await Telegram.__authorized(update, context)
    if not allowed:
        return
    # One inner tuple per keyboard row: (label, callback data).
    layout = (
        (("🧿 Richiedi screenshot", 'h_screenshot'),
         ("✅ Attiva preset", 'h_preset')),
        (("🚀 Avvia in anticipo", 'h_avvia_ora'),
         ("🕑 Aggiungi 10 minuti", 'h_add_10m')),
        (("⛔ Termina live", 'h_end_live'),),
    )
    keyboard = [[InlineKeyboardButton(label, callback_data=data)
                 for label, data in row]
                for row in layout]
    markup = InlineKeyboardMarkup(keyboard)
    await update.effective_message.reply_text(
        "Ecco cosa posso fare per te:", None, True, markup)
@staticmethod
async def __handle_preset(update: Update, context: Context) -> None:
    """
    Handle the preset command of the bot.

    Without a ``u_preset`` callback the list of configured presets is sent
    as an inline keyboard. With a ``u_preset#<name>`` callback the named
    preset is activated, a confirmation is shown for about three seconds
    and the preset list is then restored in the same message.

    Failures are logged and reported to the user, except for Telegram's
    "message is not modified" rejection, which is ignored.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return
    # Compared against the lower-cased error text: python-telegram-bot
    # capitalizes API error descriptions, so a case-sensitive test never
    # matched.
    not_modified = 'message is not modified'
    try:
        msg = update.effective_message
        query = update.callback_query
        presets = [InlineKeyboardButton(v, None, f'u_preset#{v}')
                   for v in DefaultConfig().get('presets')]
        markup = InlineKeyboardMarkup(list(chunked(presets, 3)))
        if query is not None:
            await query.answer()
        if query is not None and 'u_preset' in query.data:
            preset = query.data.split('#', 1)[1]

            async def confirm() -> None:
                # Swallow the harmless rejection here: raised inside the
                # TaskGroup it would cancel `call_preset` and come back
                # wrapped in an ExceptionGroup.
                try:
                    await query.edit_message_text(
                        "✅ Il preset è stato attivato!", None, markup)
                except error.BadRequest as ex:
                    if not_modified not in str(ex).lower():
                        raise

            async with asyncio.TaskGroup() as tg:
                tg.create_task(call_preset(preset))
                tg.create_task(confirm())
                # Keeps the confirmation visible before restoring the list.
                tg.create_task(asyncio.sleep(3))
            await query.edit_message_text("Preset disponibili:",
                                          reply_markup=markup)
        else:
            await msg.reply_text("Preset disponibili:", None, True, markup)
    except Exception as ex:  # pylint: disable=broad-except
        # `Exception` rather than `BaseException`, so task cancellation is
        # not swallowed; ExceptionGroups from the TaskGroup land here too.
        if (isinstance(ex, error.BadRequest)
                and not_modified in str(ex).lower()):
            return
        logging.warning("TELEGRAM BOT (preset) ERROR", exc_info=ex)
        msg = (f"☢ Mi spiace {update.effective_user.first_name}, "
               "non riesco a soddisfare la tua richiesta.")
        await update.effective_message.reply_text(msg, None, True)
@staticmethod
async def __handle_preview(update: Update, context: Context) -> None:
    """
    Handle the screenshot command of the bot.

    Connects to OBS using ``obs_url``/``obs_pwd`` from the configuration,
    requests a PNG screenshot of the current program scene and sends it
    with an "update" button. When triggered by that button
    (``u_screenshot``) the existing photo is replaced in place instead of
    sending a new message.

    Failures are logged and reported to the user, except for Telegram's
    "message is not modified" rejection, which is ignored.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return
    try:
        msg = update.effective_message
        query = update.callback_query
        button = InlineKeyboardButton("🧿 Aggiornami", None, 'u_screenshot')
        markup = InlineKeyboardMarkup([[button]])
        if query is not None:
            await query.answer()
        ws = WebSocketClient(DefaultConfig().get('obs_url'),
                             DefaultConfig().get('obs_pwd'))
        await ws.connect()
        try:
            await ws.wait_until_identified()
            r = await LiveEvent.obs_exec(ws, 'GetCurrentProgramScene')
            p = {'imageFormat': 'png',
                 'sourceUuid': r['sceneUuid'],
                 'imageWidth': 800,
                 'imageCompressionQuality': 80}
            r = await LiveEvent.obs_exec(ws, 'GetSourceScreenshot', p)
        finally:
            # Always release the websocket, also when a request fails.
            await ws.disconnect()
        # The payload is decoded from whatever follows the last comma
        # (presumably a data-URI prefix precedes it - confirm).
        frame = base64.b64decode(r['imageData'].split(',')[-1])
        # Equality instead of `query.data in 'u_screenshot'`, which tested
        # whether the data was a substring of the literal.
        if query is not None and query.data == 'u_screenshot':
            await query.edit_message_media(InputMediaPhoto(frame),
                                           reply_markup=markup)
        else:
            await msg.reply_photo(frame, None, True, markup)
    except Exception as ex:  # pylint: disable=broad-except
        # `Exception` rather than `BaseException`, so task cancellation is
        # not swallowed. The error text is lower-cased before the test:
        # python-telegram-bot capitalizes API error descriptions, so the
        # case-sensitive comparison never matched.
        if (isinstance(ex, error.BadRequest)
                and 'message is not modified' in str(ex).lower()):
            return
        logging.warning("TELEGRAM BOT (screenshot) ERROR", exc_info=ex)
        msg = (f"☢ Mi spiace {update.effective_user.first_name}, non "
               "riesco a soddisfare la tua richiesta.")
        await update.effective_message.reply_text(msg, None, True)
async def __handle_start_live(self, update: Update,
                              context: Context) -> None:
    """
    Handle the start-live command of the bot.

    Depending on what ``StreamingStatus`` reports, the user is told that
    the stream is already live, that there is no stream, or the owner's
    ``go_live`` coroutine is started in a background task.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return
    query = update.callback_query
    if query is not None:
        await query.answer()
    if StreamingStatus().is_live():
        msg = "⚠ La trasmissione è già in corso!"
    elif StreamingStatus().is_ready():
        msg = "❗ Non c'è alcuna trasmissione in corso."
    else:
        task = asyncio.create_task(StreamingStatus().get_owner().go_live(),
                                   name='TG_Anticipate_live')
        # Keep a reference while the task runs and drop it when it is done
        # (same pattern as `send`); without the callback finished tasks
        # accumulated in the set forever.
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
        msg = "✅ La trasmissione è iniziata!"
    await update.effective_message.reply_text(msg)
@staticmethod
async def __handle_10m(update: Update, context: Context) -> None:
    """
    Handle the add_10m command of the bot.

    When ``StreamingStatus`` reports a live stream its bonus time is
    increased; in both cases the outcome is sent back as a reply.
    :param update: The update object.
    :param context: The context object.
    """
    allowed = await Telegram.__authorized(update, context)
    if not allowed:
        return
    callback = update.callback_query
    if callback is not None:
        # Acknowledge the button press before doing the work.
        await callback.answer()
    if not StreamingStatus().is_live():
        reply = "❗ Non c'è alcuna trasmissione in corso."
    else:
        StreamingStatus().increase_bonus_time()
        reply = "✅ Ho aggiunto 10 minuti alla trasmissione!"
    await update.effective_message.reply_text(reply)
@staticmethod
async def __handle_end_live(update: Update, context: Context) -> None:
    """
    Handle the end-live command of the bot.

    Without a ``u_end_live`` callback a yes/no confirmation keyboard is
    sent. With a ``u_end_live#<answer>`` callback the confirmation message
    is replaced by the outcome: ``no`` leaves everything untouched, ``yes``
    stops the stream when ``StreamingStatus`` reports it live, and any
    other case reports that there is no stream.

    Failures are logged and reported to the user, except for Telegram's
    "message is not modified" rejection, which is ignored.
    :param update: The update object.
    :param context: The context object.
    """
    if not await Telegram.__authorized(update, context):
        return
    try:
        message = update.effective_message
        query = update.callback_query
        if query is not None:
            await query.answer()
        if query is not None and 'u_end_live' in query.data:
            answer = query.data.split('#', 1)[1].strip()
            if answer == 'no':
                outcome = "🆗 Ok, come desideri!"
            elif answer == 'yes' and StreamingStatus().is_live():
                StreamingStatus().stop_now()
                outcome = (f"✅ Perfetto {update.effective_user.first_name}, "
                           "ho fatto come hai richiesto!")
            else:
                outcome = "❗ Non c'è alcuna trasmissione in corso."
            await query.edit_message_text(outcome)
        else:
            markup = InlineKeyboardMarkup([[
                InlineKeyboardButton("✔ Si", None, 'u_end_live#yes'),
                InlineKeyboardButton("❌ No", None, 'u_end_live#no')]])
            text = "⚠ Vuoi terminare la live?"
            await message.reply_text(text, None, True, markup)
    except Exception as ex:  # pylint: disable=broad-except
        # `Exception` rather than `BaseException`, so task cancellation is
        # not swallowed. The error text is lower-cased before the test:
        # python-telegram-bot capitalizes API error descriptions, so the
        # case-sensitive comparison never matched.
        if (isinstance(ex, error.BadRequest)
                and 'message is not modified' in str(ex).lower()):
            return
        logging.warning("TELEGRAM BOT (end-live) ERROR", exc_info=ex)
        apology = (f"☢ Mi spiace {update.effective_user.first_name}, non "
                   "riesco a soddisfare la tua richiesta.")
        await update.effective_message.reply_text(apology, None, True)
@staticmethod
async def __handle_message(update: Update, context: Context) -> None:
    """
    Handle the generic message sent to the bot. If the message contains
    'jarvis' call the start command, otherwise if the message contains
    'do your magic' write on the log the update and context objects for
    inspection of their content (e.g. the id and username of the user).
    Messages without text (photos, stickers, ...) are ignored.
    :param update: The update object.
    :param context: The context object.
    """
    text = update.effective_message.text
    if text is None:
        # The handler is registered with `filters.ALL`, so non-text
        # messages reach it too; calling `.lower()` on None used to raise.
        return
    text = text.lower()
    if 'jarvis' in text:
        await Telegram.__handle_start(update, context)
    elif 'do your magic' in text:
        logging.info(update)
        logging.info(context)
@staticmethod
async def __handle_errors(update: Optional[object], ctxt: Context) -> None:
    """
    Handle the errors raised by the bot.

    Logs a warning holding the update and the error taken from the
    context, with the error attached as exception info.
    :param update: The update object.
    :param ctxt: The context object; its ``error`` attribute is logged.
    """
    failure = ctxt.error
    report = f"TELEGRAM BOT H-ERROR\nUpdate: {update}\nERROR: {failure}"
    logging.warning(report, exc_info=failure)