class StreamingSantaCroce:
"""
Controller class for StreamingSantaCroce application.
"""
def __init__(self):
"""
Initialize the controller and start the application.
"""
self.log_obj = StreamingSantaCroce.__setup_log()
DefaultConfig()
self.__prefetch = asyncio.gather(
asyncio.to_thread(LiveEvent.get_youtube_playlists),
asyncio.to_thread(LiveEvent.get_youtube_categories))
self.telegram_bot = Telegram()
self.event_scheduler = EventScheduler(self.telegram_bot)
self.__close = False
self.window_event = asyncio.Event()
self.__child_windows = {}
self.__background_tasks = set()
self.main_window = MainWindow(DefaultConfig().get('presets'))
self.main_window.set_events(self.event_scheduler.get_future_events())
self.main_window.set_presets_callback(self.__presets_callback)
self.main_window.protocol("WM_DELETE_WINDOW", self.__set_close)
task = asyncio.create_task(self.__write_log_on_gui())
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
status: StreamingStatus = StreamingStatus()
task = asyncio.create_task(self.__fetch_preview())
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
self.main_window.set_time_button_callback(status.increase_bonus_time)
self.main_window.set_live_button_callback(self.__live_button_callback)
self.main_window.set_preview_button_callback(self.__preview_callback)
self.main_window.set_prog_view_callback(self.__prog_button_callback)
self.main_window.set_log_button_callback(self.__log_button_callback)
self.main_window.set_prog_button_callback(self.__prog_button_callback)
async def run_app(self) -> None:
"""
Start the application gui and run the main loop.
After exiting main loop, stops all active threads.
"""
status: StreamingStatus = StreamingStatus()
status.add_update_callback(self.window_event.set)
while not self.__close:
if status.dispatch_updates():
self.main_window.set_title_description(status.get_title(),
status.get_description())
enabled = not status.is_busy()
self.main_window.set_enabled_title_description(enabled)
self.main_window.remaining_time(status.get_time_left())
self.main_window.set_live_status(status.get_live_status())
if status.should_refresh_program():
events = self.event_scheduler.get_future_events()
self.main_window.set_events(events)
status.set_refresh_program(False)
self.main_window.update()
if self.__prefetch is not None and self.__prefetch.done():
await self.__prefetch
self.__prefetch = None
await self.window_event.wait()
self.window_event.clear()
while task := StreamingStatus().get_ended_task():
try:
await task
except BaseException as ex: # pylint: disable=broad-except
msg = 'Errore nel LiveEvent terminato'
logging.error(msg, exc_info=ex)
await self.shutdown()
def __set_close(self):
"""
Set the close flag to True.
"""
self.__close = True
self.__log_ok_button_callback()
self.__prog_exit_callback()
self.window_event.set()
async def shutdown(self) -> None:
"""
Shutdown the application and stop all active threads.
"""
logging.info("Chiusura del programma")
self.main_window.withdraw()
self.__set_close()
if self.event_scheduler:
self.event_scheduler.stop()
if StreamingStatus().is_live():
msg = "Trasmissione in corso durante la chiusura del programma" \
f"programma:\n{StreamingStatus().get_owner()}"
logging.error(msg)
await self.telegram_bot.stop()
for task in asyncio.all_tasks():
if task != asyncio.current_task():
try:
msg = (f"Chiusura del task: {task.get_name()}\n"
f"Coroutine: {task.get_coro().__qualname__}")
logging.debug(msg)
await asyncio.wait_for(task, timeout=1)
except BaseException as ex: # pylint: disable=broad-except
msg = f"Errore durante l'esecuzione di un task: {ex}"
logging.error(msg, exc_info=ex)
self.main_window.destroy()
@staticmethod
def __setup_log(path: str = r'./streaming.log') -> CustomLog:
"""
Setup the log file for the application.
:param path: The path of the log file.
:return: The custom log object for the application.
"""
handlers = [logging.StreamHandler(log_obj := CustomLog()),
RotatingFileHandler(abspath(path), maxBytes=1048576 * 5,
backupCount=7, encoding='utf-8')]
fmt = '%(levelname)s (%(asctime)s) [%(threadName)-10.15s]: %(message)s'
logging.basicConfig(format=fmt, datefmt='%d/%m/%Y %H:%M:%S', force=True,
level=logging.INFO, handlers=handlers)
return log_obj
async def __write_log_on_gui(self) -> None:
"""
Write the last log message on the GUI.
"""
while not self.__close:
for message in self.log_obj.get_messages_to_print():
level = message.split(' ', 1)[0]
msg = f' {message.partition(chr(10))[0]}'
self.main_window.set_status_message(msg, level)
self.window_event.set()
await asyncio.sleep(.1)
def __presets_callback(self, preset: str) -> None:
"""
Call the given preset on the camera.
:param preset: The preset name to call.
"""
task = asyncio.create_task(call_preset(preset), name=preset)
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
self.window_event.set()
def __live_button_callback(self) -> None:
"""
Manage the live button events.
"""
if StreamingStatus().is_live():
StreamingStatus().stop_now()
logging.info("Trasmissione interrotta dall'utente")
elif StreamingStatus().is_prepared():
logging.info("Trasmissione anticipata dall'utente")
task = asyncio.create_task(StreamingStatus().get_owner().go_live(),
name='Anticipate_live')
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
elif StreamingStatus().is_ready():
title, description = self.main_window.get_title_description()
title = title.strip()
description = description.strip()
if not title:
logging.error('Il titolo deve essere settato per avviare una '
'live immediata')
else:
logging.info("Trasmissione avviata dall'utente")
try:
now = datetime.now()
config = ChainMap({
'title': title,
'description': description,
'day': f'{now:%d/%m/%Y}',
'time': f'{now + timedelta(seconds=2):%H:%M:%S}',
'type': 'onetime'
}, DefaultConfig().get('youtube_default'))
LiveEvent(self.telegram_bot, config)
except Exception as ex: # pylint: disable=broad-except
logging.error('Impossibile avviare una live immediata',
exc_info=ex)
self.window_event.set()
def __preview_callback(self) -> None:
"""
Manage the preview button events.
"""
StreamingStatus().switch_obs_live_view()
self.window_event.set()
async def __fetch_preview(self, frame_per_sec: int = 30) -> None:
"""
Fetch the preview from the camera and show it on the GUI.
:param frame_per_sec: The number of frame per second to fetch.
"""
config = DefaultConfig()
ws = None
size = self.main_window.get_preview_size()
while not self.__close:
if StreamingStatus().get_obs_live_view():
try:
if ws is None:
ws = WebSocketClient(config.get('obs_url'),
config.get('obs_pwd'))
await ws.connect()
await ws.wait_until_identified()
r = await LiveEvent.obs_exec(ws, 'GetCurrentProgramScene')
p = {'imageFormat': 'png',
'sourceUuid': r['sceneUuid'],
'imageWidth': size[0],
'imageHeight': size[1],
'imageCompressionQuality': 90}
r = await LiveEvent.obs_exec(ws, 'GetSourceScreenshot', p)
image = base64.b64decode(r['imageData'].split(',')[-1])
image = PIL.Image.open(io.BytesIO(image))
self.main_window.set_preview_image(image)
self.window_event.set()
except BaseException as ex: # pylint: disable=broad-except
msg = 'Errore durante il fetch della preview'
logging.error(msg, exc_info=ex)
if ws:
try:
await ws.disconnect()
# pylint: disable=broad-except
except BaseException as exc:
msg = 'Errore durante la disconnessione da OBS'
logging.error(msg, exc_info=exc)
ws = None
StreamingStatus().switch_obs_live_view(False)
else:
self.main_window.set_preview_image()
self.window_event.set()
await asyncio.sleep(1. / frame_per_sec)
if ws:
try:
await ws.disconnect()
except BaseException as ex: # pylint: disable=broad-except
msg = 'Errore durante la disconnessione da OBS'
logging.error(msg, exc_info=ex)
def __log_button_callback(self) -> None:
"""
Manage the log button events.
"""
old_log = self.__child_windows.get('log')
if old_log and not old_log['close']:
old_log['window'].focus_force()
return
log_window = None
try:
messages = self.log_obj.get_all_messages()
log_window = LogViewer(self.main_window, messages)
log_window.set_ok_button_callback(self.__log_ok_button_callback)
log_window.protocol("WM_DELETE_WINDOW",
self.__log_ok_button_callback)
log_window.set_reload_button_callback(
self.__log_reload_button_callback)
new_event = asyncio.Event()
self.__child_windows['log'] = {'window': log_window,
'event': new_event,
'close': False}
task = asyncio.create_task(self.__log_event_loop())
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
task = asyncio.create_task(self.__log_update_loop())
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
except BaseException as ex: # pylint: disable=broad-except
if log_window is not None:
log_window.destroy()
new_log = self.__child_windows.get('log')
if new_log and not new_log['close']:
new_log['close'] = True
new_log['event'].set()
msg = 'Errore durante l\'apertura della finestra di log'
logging.error(msg, exc_info=ex)
def __log_ok_button_callback(self) -> None:
"""
Manage the ok button events in the log window.
"""
if self.__child_windows.get('log'):
self.__child_windows['log']['close'] = True
self.__child_windows['log']['event'].set()
def __log_reload_button_callback(self) -> None:
"""
Manage the reload button events in the log window.
"""
messages = self.log_obj.get_all_messages()
self.__child_windows['log']['window'].update_logs(messages)
self.__child_windows['log']['window'].set_logs_updated(False)
self.__child_windows['log']['event'].set()
async def __log_event_loop(self) -> None:
"""
Update the log window with new messages.
"""
while not self.__child_windows['log']['close']:
self.__child_windows['log']['window'].update()
await self.__child_windows['log']['event'].wait()
self.__child_windows['log']['event'].clear()
self.__child_windows['log']['window'].withdraw()
self.__child_windows['log']['window'].destroy()
self.main_window.focus_force()
async def __log_update_loop(self) -> None:
"""
Update the log window indicator when new messages are available.
"""
n_messages = self.log_obj.total_messages()
while not self.__child_windows['log']['close']:
if self.log_obj.total_messages() > n_messages:
self.__child_windows['log']['window'].set_logs_updated(True)
n_messages = self.log_obj.total_messages()
await asyncio.sleep(.1)
def __prog_button_callback(self, event_id = None) -> None:
"""
Manage the program button events.
"""
old_prog = self.__child_windows.get('prog')
if old_prog and not old_prog['close']:
old_prog['window'].focus_force()
return
prog_window = None
try:
events = self.event_scheduler.events
categories = LiveEvent.get_youtube_categories()
playlists = LiveEvent.get_youtube_playlists()
prog_window = ProgramWindow(self.main_window, categories, playlists)
prog_window.protocol("WM_DELETE_WINDOW", self.__prog_exit_callback)
prog_window.set_first_button_callback(self.__prog_first_callback)
prog_window.set_prev_button_callback(self.__prog_prev_callback)
prog_window.set_next_button_callback(self.__prog_next_callback)
prog_window.set_last_button_callback(self.__prog_last_callback)
prog_window.set_save_button_callback(self.__prog_save_callback)
prog_window.set_new_button_callback(self.__prog_new_callback)
prog_window.set_copy_button_callback(self.__prog_copy_callback)
prog_window.set_delete_button_callback(self.__prog_delete_callback)
prog_window.set_reload_button_callback(self.__prog_reload_callback)
new_event = asyncio.Event()
self.__child_windows['prog'] = {'window': prog_window,
'event': new_event,
'close': False,
'shown': -1}
if len(events) > 0:
if event_id is not None:
event_id = self.__identify_event(event_id)
self.__child_windows['prog']['shown'] = event_id
prog_window.set(events[event_id].get_config(),
event_id + 1, len(events))
else:
self.__child_windows['prog']['shown'] = 0
prog_window.set(events[0].get_config(), 1, len(events))
else:
self.__prog_new_callback()
task = asyncio.create_task(self.__prog_event_loop())
self.__background_tasks.add(task)
task.add_done_callback(self.__background_tasks.discard)
except BaseException as ex: # pylint: disable=broad-except
if prog_window is not None:
prog_window.destroy()
new_prog = self.__child_windows.get('prog')
if new_prog and not new_prog['close']:
new_prog['close'] = True
new_prog['event'].set()
msg = 'Errore durante l\'apertura della finestra di programmazione'
logging.error(msg, exc_info=ex)
def __identify_event(self, event_id: int) -> int:
"""
Identify the event to show in the program window.
:param event_id: The event id to show.
:return: The event id to show.
"""
event = self.event_scheduler.get_future_events()[event_id]
for i, e in enumerate(self.event_scheduler.events):
tipo = e.type.upper()[0] == event[0]
giorno = e.day == event[1] or e.day.lower() == event[1].lower()[:2]
ora = e.time == event[2]
if tipo and giorno and ora:
return i
return -1
def __prog_exit_callback(self) -> None:
"""
Manage the close window event.
"""
if self.__child_windows.get('prog'):
self.__child_windows['prog']['close'] = True
self.__child_windows['prog']['event'].set()
def __prog_first_callback(self) -> None:
"""
Manage the first button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown == 0:
return
shown = 0
self.__child_windows['prog']['shown'] = shown
prog_window.set(events[shown].get_config(), shown + 1, len(events))
def __prog_prev_callback(self) -> None:
"""
Manage the previous button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown == 0:
return
shown -= 1
self.__child_windows['prog']['shown'] = shown
prog_window.set(events[shown].get_config(), shown + 1, len(events))
def __prog_next_callback(self) -> None:
"""
Manage the next button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown == len(events) - 1:
return
shown += 1
self.__child_windows['prog']['shown'] = shown
prog_window.set(events[shown].get_config(), shown + 1, len(events))
def __prog_last_callback(self) -> None:
"""
Manage the last button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown == len(events) - 1:
return
shown = len(events) - 1
self.__child_windows['prog']['shown'] = shown
prog_window.set(events[shown].get_config(), shown + 1, len(events))
def __prog_save_callback(self) -> None:
"""
Manage the save button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown == -1:
self.event_scheduler.add_event(prog_window.get())
shown = len(events) - 1
prog_window.set(events[shown].get_config(), shown + 1, len(events))
else:
events[shown].reschedule(prog_window.get())
StreamingStatus().set_refresh_program(True)
if askquestion('Salvataggio', 'Vuoi salvare la programmazione su file?',
parent=prog_window) == 'yes':
if self.event_scheduler.save_to_file():
showinfo('Salvataggio', 'Evento salvato correttamente',
parent=prog_window)
def __prog_new_callback(self) -> None:
"""
Manage the new button event.
"""
self.__child_windows['prog']['shown'] = -1
default = DefaultConfig().get('youtube_default')
self.__child_windows['prog']['window'].set(default)
def __prog_copy_callback(self) -> None:
"""
Manage the copy button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown == -1:
return
self.__child_windows['prog']['shown'] = -1
prog_window.set(events[shown].get_config())
def __prog_delete_callback(self) -> None:
"""
Manage the delete button event.
"""
events = self.event_scheduler.events
shown = self.__child_windows['prog']['shown']
prog_window = self.__child_windows['prog']['window']
if shown >= 0 and events[shown].remove_schedule():
events.pop(shown)
StreamingStatus().set_refresh_program(True)
shown = min(shown, len(events) - 1)
if shown < 0:
self.__prog_new_callback()
else:
self.__child_windows['prog']['shown'] = shown
prog_window.set(events[shown].get_config(), shown + 1, len(events))
def __prog_reload_callback(self) -> None:
"""
Manage the reload button event.
"""
self.event_scheduler.reload_from_file()
StreamingStatus().set_refresh_program(True)
showinfo('Caricamento', 'Eventi caricati correttamente',
parent=self.__child_windows['prog']['window'])
events = self.event_scheduler.events
prog_window = self.__child_windows['prog']['window']
if len(events) > 0:
self.__child_windows['prog']['shown'] = 0
prog_window.set(events[0].get_config(), 1, len(events))
else:
self.__prog_new_callback()
async def __prog_event_loop(self) -> None:
"""
Update the program window.
"""
while not self.__child_windows['prog']['close']:
self.__child_windows['prog']['window'].update()
await self.__child_windows['prog']['event'].wait()
self.__child_windows['prog']['event'].clear()
self.__child_windows['prog']['window'].withdraw()
self.__child_windows['prog']['window'].destroy()
self.main_window.focus_force()