Vai al contenuto

controller

controller

Controller for StreamingSantaCroce of the MVC pattern.

StreamingSantaCroce

Controller class for StreamingSantaCroce application.

Source code in controller.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
class StreamingSantaCroce:
    """
    Controller class for StreamingSantaCroce application.
    """

    def __init__(self):
        """
        Initialize the controller and start the application.
        """
        self.log_obj = StreamingSantaCroce.__setup_log()
        DefaultConfig()
        self.__prefetch = asyncio.gather(
            asyncio.to_thread(LiveEvent.get_youtube_playlists),
            asyncio.to_thread(LiveEvent.get_youtube_categories))
        self.telegram_bot = Telegram()
        self.event_scheduler = EventScheduler(self.telegram_bot)

        self.__close = False
        self.window_event = asyncio.Event()
        self.__child_windows = {}
        self.__background_tasks = set()
        self.main_window = MainWindow(DefaultConfig().get('presets'))
        self.main_window.set_events(self.event_scheduler.get_future_events())
        self.main_window.set_presets_callback(self.__presets_callback)
        self.main_window.protocol("WM_DELETE_WINDOW", self.__set_close)
        task = asyncio.create_task(self.__write_log_on_gui())
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
        status: StreamingStatus = StreamingStatus()
        task = asyncio.create_task(self.__fetch_preview())
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
        self.main_window.set_time_button_callback(status.increase_bonus_time)
        self.main_window.set_live_button_callback(self.__live_button_callback)
        self.main_window.set_preview_button_callback(self.__preview_callback)
        self.main_window.set_prog_view_callback(self.__prog_button_callback)
        self.main_window.set_log_button_callback(self.__log_button_callback)
        self.main_window.set_prog_button_callback(self.__prog_button_callback)

    async def run_app(self) -> None:
        """
        Start the application gui and run the main loop.
        After exiting main loop, stops all active threads.
        """
        status: StreamingStatus = StreamingStatus()
        status.add_update_callback(self.window_event.set)
        while not self.__close:
            if status.dispatch_updates():
                self.main_window.set_title_description(status.get_title(),
                                                       status.get_description())
                enabled = not status.is_busy()
                self.main_window.set_enabled_title_description(enabled)
                self.main_window.remaining_time(status.get_time_left())
                self.main_window.set_live_status(status.get_live_status())
                if status.should_refresh_program():
                    events = self.event_scheduler.get_future_events()
                    self.main_window.set_events(events)
                    status.set_refresh_program(False)
            self.main_window.update()
            if self.__prefetch is not None and self.__prefetch.done():
                await self.__prefetch
                self.__prefetch = None
            await self.window_event.wait()
            self.window_event.clear()
            while task := StreamingStatus().get_ended_task():
                try:
                    await task
                except BaseException as ex:  # pylint: disable=broad-except
                    msg = 'Errore nel LiveEvent terminato'
                    logging.error(msg, exc_info=ex)
        await self.shutdown()

    def __set_close(self):
        """
        Set the close flag to True.
        """
        self.__close = True
        self.__log_ok_button_callback()
        self.__prog_exit_callback()
        self.window_event.set()

    async def shutdown(self) -> None:
        """
        Shutdown the application and stop all active threads.
        """
        logging.info("Chiusura del programma")
        self.main_window.withdraw()
        self.__set_close()
        if self.event_scheduler:
            self.event_scheduler.stop()
        if StreamingStatus().is_live():
            msg = "Trasmissione in corso durante la chiusura del programma" \
                  f"programma:\n{StreamingStatus().get_owner()}"
            logging.error(msg)
        await self.telegram_bot.stop()
        for task in asyncio.all_tasks():
            if task != asyncio.current_task():
                try:
                    msg = (f"Chiusura del task: {task.get_name()}\n"
                           f"Coroutine: {task.get_coro().__qualname__}")
                    logging.debug(msg)
                    await asyncio.wait_for(task, timeout=1)
                except BaseException as ex:  # pylint: disable=broad-except
                    msg = f"Errore durante l'esecuzione di un task: {ex}"
                    logging.error(msg, exc_info=ex)
        self.main_window.destroy()

    @staticmethod
    def __setup_log(path: str = r'./streaming.log') -> CustomLog:
        """
        Setup the log file for the application.
        :param path: The path of the log file.

        :return: The custom log object for the application.
        """
        handlers = [logging.StreamHandler(log_obj := CustomLog()),
                    RotatingFileHandler(abspath(path), maxBytes=1048576 * 5,
                                        backupCount=7, encoding='utf-8')]
        fmt = '%(levelname)s (%(asctime)s) [%(threadName)-10.15s]: %(message)s'
        logging.basicConfig(format=fmt, datefmt='%d/%m/%Y %H:%M:%S', force=True,
                            level=logging.INFO, handlers=handlers)
        return log_obj

    async def __write_log_on_gui(self) -> None:
        """
        Write the last log message on the GUI.
        """
        while not self.__close:
            for message in self.log_obj.get_messages_to_print():
                level = message.split(' ', 1)[0]
                msg = f' {message.partition(chr(10))[0]}'
                self.main_window.set_status_message(msg, level)
            self.window_event.set()
            await asyncio.sleep(.1)

    def __presets_callback(self, preset: str) -> None:
        """
        Call the given preset on the camera.
        :param preset: The preset name to call.
        """
        task = asyncio.create_task(call_preset(preset), name=preset)
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
        self.window_event.set()

    def __live_button_callback(self) -> None:
        """
        Manage the live button events.
        """
        if StreamingStatus().is_live():
            StreamingStatus().stop_now()
            logging.info("Trasmissione interrotta dall'utente")
        elif StreamingStatus().is_prepared():
            logging.info("Trasmissione anticipata dall'utente")
            task = asyncio.create_task(StreamingStatus().get_owner().go_live(),
                                       name='Anticipate_live')
            self.__background_tasks.add(task)
            task.add_done_callback(self.__background_tasks.discard)
        elif StreamingStatus().is_ready():
            title, description = self.main_window.get_title_description()
            title = title.strip()
            description = description.strip()
            if not title:
                logging.error('Il titolo deve essere settato per avviare una '
                              'live immediata')
            else:
                logging.info("Trasmissione avviata dall'utente")
                try:
                    now = datetime.now()
                    config = ChainMap({
                        'title': title,
                        'description': description,
                        'day': f'{now:%d/%m/%Y}',
                        'time': f'{now + timedelta(seconds=2):%H:%M:%S}',
                        'type': 'onetime'
                    }, DefaultConfig().get('youtube_default'))
                    LiveEvent(self.telegram_bot, config)
                except Exception as ex:  # pylint: disable=broad-except
                    logging.error('Impossibile avviare una live immediata',
                                  exc_info=ex)
        self.window_event.set()

    def __preview_callback(self) -> None:
        """
        Manage the preview button events.
        """
        StreamingStatus().switch_obs_live_view()
        self.window_event.set()

    async def __fetch_preview(self, frame_per_sec: int = 30) -> None:
        """
        Fetch the preview from the camera and show it on the GUI.
        :param frame_per_sec: The number of frame per second to fetch.
        """
        config = DefaultConfig()
        ws = None
        size = self.main_window.get_preview_size()
        while not self.__close:
            if StreamingStatus().get_obs_live_view():
                try:
                    if ws is None:
                        ws = WebSocketClient(config.get('obs_url'),
                                             config.get('obs_pwd'))
                        await ws.connect()
                        await ws.wait_until_identified()
                    r = await LiveEvent.obs_exec(ws, 'GetCurrentProgramScene')
                    p = {'imageFormat': 'png',
                         'sourceUuid': r['sceneUuid'],
                         'imageWidth': size[0],
                         'imageHeight': size[1],
                         'imageCompressionQuality': 90}
                    r = await LiveEvent.obs_exec(ws, 'GetSourceScreenshot', p)
                    image = base64.b64decode(r['imageData'].split(',')[-1])
                    image = PIL.Image.open(io.BytesIO(image))
                    self.main_window.set_preview_image(image)
                    self.window_event.set()
                except BaseException as ex:  # pylint: disable=broad-except
                    msg = 'Errore durante il fetch della preview'
                    logging.error(msg, exc_info=ex)
                    if ws:
                        try:
                            await ws.disconnect()
                        # pylint: disable=broad-except
                        except BaseException as exc:
                            msg = 'Errore durante la disconnessione da OBS'
                            logging.error(msg, exc_info=exc)
                        ws = None
                    StreamingStatus().switch_obs_live_view(False)
            else:
                self.main_window.set_preview_image()
                self.window_event.set()
            await asyncio.sleep(1. / frame_per_sec)
        if ws:
            try:
                await ws.disconnect()
            except BaseException as ex:  # pylint: disable=broad-except
                msg = 'Errore durante la disconnessione da OBS'
                logging.error(msg, exc_info=ex)

    def __log_button_callback(self) -> None:
        """
        Manage the log button events.
        """
        old_log = self.__child_windows.get('log')
        if old_log and not old_log['close']:
            old_log['window'].focus_force()
            return
        log_window = None
        try:
            messages = self.log_obj.get_all_messages()
            log_window = LogViewer(self.main_window, messages)
            log_window.set_ok_button_callback(self.__log_ok_button_callback)
            log_window.protocol("WM_DELETE_WINDOW",
                                self.__log_ok_button_callback)
            log_window.set_reload_button_callback(
                self.__log_reload_button_callback)
            new_event = asyncio.Event()
            self.__child_windows['log'] = {'window': log_window,
                                           'event': new_event,
                                           'close': False}
            task = asyncio.create_task(self.__log_event_loop())
            self.__background_tasks.add(task)
            task.add_done_callback(self.__background_tasks.discard)
            task = asyncio.create_task(self.__log_update_loop())
            self.__background_tasks.add(task)
            task.add_done_callback(self.__background_tasks.discard)
        except BaseException as ex:  # pylint: disable=broad-except
            if log_window is not None:
                log_window.destroy()
            new_log = self.__child_windows.get('log')
            if new_log and not new_log['close']:
                new_log['close'] = True
                new_log['event'].set()
            msg = 'Errore durante l\'apertura della finestra di log'
            logging.error(msg, exc_info=ex)

    def __log_ok_button_callback(self) -> None:
        """
        Manage the ok button events in the log window.
        """
        if self.__child_windows.get('log'):
            self.__child_windows['log']['close'] = True
            self.__child_windows['log']['event'].set()

    def __log_reload_button_callback(self) -> None:
        """
        Manage the reload button events in the log window.
        """
        messages = self.log_obj.get_all_messages()
        self.__child_windows['log']['window'].update_logs(messages)
        self.__child_windows['log']['window'].set_logs_updated(False)
        self.__child_windows['log']['event'].set()

    async def __log_event_loop(self) -> None:
        """
        Update the log window with new messages.
        """
        while not self.__child_windows['log']['close']:
            self.__child_windows['log']['window'].update()
            await self.__child_windows['log']['event'].wait()
            self.__child_windows['log']['event'].clear()
        self.__child_windows['log']['window'].withdraw()
        self.__child_windows['log']['window'].destroy()
        self.main_window.focus_force()

    async def __log_update_loop(self) -> None:
        """
        Update the log window indicator when new messages are available.
        """
        n_messages = self.log_obj.total_messages()
        while not self.__child_windows['log']['close']:
            if self.log_obj.total_messages() > n_messages:
                self.__child_windows['log']['window'].set_logs_updated(True)
                n_messages = self.log_obj.total_messages()
            await asyncio.sleep(.1)

    def __prog_button_callback(self, event_id = None) -> None:
        """
        Manage the program button events.
        """
        old_prog = self.__child_windows.get('prog')
        if old_prog and not old_prog['close']:
            old_prog['window'].focus_force()
            return
        prog_window = None
        try:
            events = self.event_scheduler.events
            categories = LiveEvent.get_youtube_categories()
            playlists = LiveEvent.get_youtube_playlists()

            prog_window = ProgramWindow(self.main_window, categories, playlists)
            prog_window.protocol("WM_DELETE_WINDOW", self.__prog_exit_callback)
            prog_window.set_first_button_callback(self.__prog_first_callback)
            prog_window.set_prev_button_callback(self.__prog_prev_callback)
            prog_window.set_next_button_callback(self.__prog_next_callback)
            prog_window.set_last_button_callback(self.__prog_last_callback)
            prog_window.set_save_button_callback(self.__prog_save_callback)
            prog_window.set_new_button_callback(self.__prog_new_callback)
            prog_window.set_copy_button_callback(self.__prog_copy_callback)
            prog_window.set_delete_button_callback(self.__prog_delete_callback)
            prog_window.set_reload_button_callback(self.__prog_reload_callback)
            new_event = asyncio.Event()
            self.__child_windows['prog'] = {'window': prog_window,
                                            'event': new_event,
                                            'close': False,
                                            'shown': -1}
            if len(events) > 0:
                if event_id is not None:
                    event_id = self.__identify_event(event_id)
                    self.__child_windows['prog']['shown'] = event_id
                    prog_window.set(events[event_id].get_config(),
                                    event_id + 1, len(events))
                else:
                    self.__child_windows['prog']['shown'] = 0
                    prog_window.set(events[0].get_config(), 1, len(events))
            else:
                self.__prog_new_callback()
            task = asyncio.create_task(self.__prog_event_loop())
            self.__background_tasks.add(task)
            task.add_done_callback(self.__background_tasks.discard)
        except BaseException as ex:  # pylint: disable=broad-except
            if prog_window is not None:
                prog_window.destroy()
            new_prog = self.__child_windows.get('prog')
            if new_prog and not new_prog['close']:
                new_prog['close'] = True
                new_prog['event'].set()
            msg = 'Errore durante l\'apertura della finestra di programmazione'
            logging.error(msg, exc_info=ex)

    def __identify_event(self, event_id: int) -> int:
        """
        Identify the event to show in the program window.
        :param event_id: The event id to show.

        :return: The event id to show.
        """
        event = self.event_scheduler.get_future_events()[event_id]
        for i, e in enumerate(self.event_scheduler.events):
            tipo = e.type.upper()[0] == event[0]
            giorno = e.day == event[1] or e.day.lower() == event[1].lower()[:2]
            ora = e.time == event[2]
            if tipo and giorno and ora:
                return i
        return -1

    def __prog_exit_callback(self) -> None:
        """
        Manage the close window event.
        """
        if self.__child_windows.get('prog'):
            self.__child_windows['prog']['close'] = True
            self.__child_windows['prog']['event'].set()

    def __prog_first_callback(self) -> None:
        """
        Manage the first button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown == 0:
            return
        shown = 0
        self.__child_windows['prog']['shown'] = shown
        prog_window.set(events[shown].get_config(), shown + 1, len(events))

    def __prog_prev_callback(self) -> None:
        """
        Manage the previous button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown == 0:
            return
        shown -= 1
        self.__child_windows['prog']['shown'] = shown
        prog_window.set(events[shown].get_config(), shown + 1, len(events))

    def __prog_next_callback(self) -> None:
        """
        Manage the next button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown == len(events) - 1:
            return
        shown += 1
        self.__child_windows['prog']['shown'] = shown
        prog_window.set(events[shown].get_config(), shown + 1, len(events))

    def __prog_last_callback(self) -> None:
        """
        Manage the last button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown == len(events) - 1:
            return
        shown = len(events) - 1
        self.__child_windows['prog']['shown'] = shown
        prog_window.set(events[shown].get_config(), shown + 1, len(events))

    def __prog_save_callback(self) -> None:
        """
        Manage the save button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown == -1:
            self.event_scheduler.add_event(prog_window.get())
            shown = len(events) - 1
            prog_window.set(events[shown].get_config(), shown + 1, len(events))
        else:
            events[shown].reschedule(prog_window.get())
        StreamingStatus().set_refresh_program(True)
        if askquestion('Salvataggio', 'Vuoi salvare la programmazione su file?',
                       parent=prog_window) == 'yes':
            if self.event_scheduler.save_to_file():
                showinfo('Salvataggio', 'Evento salvato correttamente',
                         parent=prog_window)

    def __prog_new_callback(self) -> None:
        """
        Manage the new button event.
        """
        self.__child_windows['prog']['shown'] = -1
        default = DefaultConfig().get('youtube_default')
        self.__child_windows['prog']['window'].set(default)

    def __prog_copy_callback(self) -> None:
        """
        Manage the copy button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown == -1:
            return
        self.__child_windows['prog']['shown'] = -1
        prog_window.set(events[shown].get_config())

    def __prog_delete_callback(self) -> None:
        """
        Manage the delete button event.
        """
        events = self.event_scheduler.events
        shown = self.__child_windows['prog']['shown']
        prog_window = self.__child_windows['prog']['window']
        if shown >= 0 and events[shown].remove_schedule():
            events.pop(shown)
            StreamingStatus().set_refresh_program(True)
        shown = min(shown, len(events) - 1)
        if shown < 0:
            self.__prog_new_callback()
        else:
            self.__child_windows['prog']['shown'] = shown
            prog_window.set(events[shown].get_config(), shown + 1, len(events))

    def __prog_reload_callback(self) -> None:
        """
        Manage the reload button event.
        """
        self.event_scheduler.reload_from_file()
        StreamingStatus().set_refresh_program(True)
        showinfo('Caricamento', 'Eventi caricati correttamente',
                 parent=self.__child_windows['prog']['window'])
        events = self.event_scheduler.events
        prog_window = self.__child_windows['prog']['window']
        if len(events) > 0:
            self.__child_windows['prog']['shown'] = 0
            prog_window.set(events[0].get_config(), 1, len(events))
        else:
            self.__prog_new_callback()

    async def __prog_event_loop(self) -> None:
        """
        Update the program window.
        """
        while not self.__child_windows['prog']['close']:
            self.__child_windows['prog']['window'].update()
            await self.__child_windows['prog']['event'].wait()
            self.__child_windows['prog']['event'].clear()
        self.__child_windows['prog']['window'].withdraw()
        self.__child_windows['prog']['window'].destroy()
        self.main_window.focus_force()

__fetch_preview(frame_per_sec=30) async

Fetch the preview from the camera and show it on the GUI. :param frame_per_sec: The number of frame per second to fetch.

Source code in controller.py
async def __fetch_preview(self, frame_per_sec: int = 30) -> None:
    """
    Fetch the preview from the camera and show it on the GUI.
    :param frame_per_sec: The number of frame per second to fetch.
    """
    config = DefaultConfig()
    ws = None
    size = self.main_window.get_preview_size()
    while not self.__close:
        if StreamingStatus().get_obs_live_view():
            try:
                if ws is None:
                    ws = WebSocketClient(config.get('obs_url'),
                                         config.get('obs_pwd'))
                    await ws.connect()
                    await ws.wait_until_identified()
                r = await LiveEvent.obs_exec(ws, 'GetCurrentProgramScene')
                p = {'imageFormat': 'png',
                     'sourceUuid': r['sceneUuid'],
                     'imageWidth': size[0],
                     'imageHeight': size[1],
                     'imageCompressionQuality': 90}
                r = await LiveEvent.obs_exec(ws, 'GetSourceScreenshot', p)
                image = base64.b64decode(r['imageData'].split(',')[-1])
                image = PIL.Image.open(io.BytesIO(image))
                self.main_window.set_preview_image(image)
                self.window_event.set()
            except BaseException as ex:  # pylint: disable=broad-except
                msg = 'Errore durante il fetch della preview'
                logging.error(msg, exc_info=ex)
                if ws:
                    try:
                        await ws.disconnect()
                    # pylint: disable=broad-except
                    except BaseException as exc:
                        msg = 'Errore durante la disconnessione da OBS'
                        logging.error(msg, exc_info=exc)
                    ws = None
                StreamingStatus().switch_obs_live_view(False)
        else:
            self.main_window.set_preview_image()
            self.window_event.set()
        await asyncio.sleep(1. / frame_per_sec)
    if ws:
        try:
            await ws.disconnect()
        except BaseException as ex:  # pylint: disable=broad-except
            msg = 'Errore durante la disconnessione da OBS'
            logging.error(msg, exc_info=ex)

__identify_event(event_id)

Identify the event to show in the program window. :param event_id: The event id to show.

:return: The event id to show.

Source code in controller.py
def __identify_event(self, event_id: int) -> int:
    """
    Identify the event to show in the program window.
    :param event_id: The event id to show.

    :return: The event id to show.
    """
    event = self.event_scheduler.get_future_events()[event_id]
    for i, e in enumerate(self.event_scheduler.events):
        tipo = e.type.upper()[0] == event[0]
        giorno = e.day == event[1] or e.day.lower() == event[1].lower()[:2]
        ora = e.time == event[2]
        if tipo and giorno and ora:
            return i
    return -1

__init__()

Initialize the controller and start the application.

Source code in controller.py
def __init__(self):
    """
    Initialize the controller and start the application.
    """
    self.log_obj = StreamingSantaCroce.__setup_log()
    DefaultConfig()
    self.__prefetch = asyncio.gather(
        asyncio.to_thread(LiveEvent.get_youtube_playlists),
        asyncio.to_thread(LiveEvent.get_youtube_categories))
    self.telegram_bot = Telegram()
    self.event_scheduler = EventScheduler(self.telegram_bot)

    self.__close = False
    self.window_event = asyncio.Event()
    self.__child_windows = {}
    self.__background_tasks = set()
    self.main_window = MainWindow(DefaultConfig().get('presets'))
    self.main_window.set_events(self.event_scheduler.get_future_events())
    self.main_window.set_presets_callback(self.__presets_callback)
    self.main_window.protocol("WM_DELETE_WINDOW", self.__set_close)
    task = asyncio.create_task(self.__write_log_on_gui())
    self.__background_tasks.add(task)
    task.add_done_callback(self.__background_tasks.discard)
    status: StreamingStatus = StreamingStatus()
    task = asyncio.create_task(self.__fetch_preview())
    self.__background_tasks.add(task)
    task.add_done_callback(self.__background_tasks.discard)
    self.main_window.set_time_button_callback(status.increase_bonus_time)
    self.main_window.set_live_button_callback(self.__live_button_callback)
    self.main_window.set_preview_button_callback(self.__preview_callback)
    self.main_window.set_prog_view_callback(self.__prog_button_callback)
    self.main_window.set_log_button_callback(self.__log_button_callback)
    self.main_window.set_prog_button_callback(self.__prog_button_callback)

__live_button_callback()

Manage the live button events.

Source code in controller.py
def __live_button_callback(self) -> None:
    """
    Manage the live button events.
    """
    if StreamingStatus().is_live():
        StreamingStatus().stop_now()
        logging.info("Trasmissione interrotta dall'utente")
    elif StreamingStatus().is_prepared():
        logging.info("Trasmissione anticipata dall'utente")
        task = asyncio.create_task(StreamingStatus().get_owner().go_live(),
                                   name='Anticipate_live')
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
    elif StreamingStatus().is_ready():
        title, description = self.main_window.get_title_description()
        title = title.strip()
        description = description.strip()
        if not title:
            logging.error('Il titolo deve essere settato per avviare una '
                          'live immediata')
        else:
            logging.info("Trasmissione avviata dall'utente")
            try:
                now = datetime.now()
                config = ChainMap({
                    'title': title,
                    'description': description,
                    'day': f'{now:%d/%m/%Y}',
                    'time': f'{now + timedelta(seconds=2):%H:%M:%S}',
                    'type': 'onetime'
                }, DefaultConfig().get('youtube_default'))
                LiveEvent(self.telegram_bot, config)
            except Exception as ex:  # pylint: disable=broad-except
                logging.error('Impossibile avviare una live immediata',
                              exc_info=ex)
    self.window_event.set()

__log_button_callback()

Manage the log button events.

Source code in controller.py
def __log_button_callback(self) -> None:
    """
    Manage the log button events.
    """
    old_log = self.__child_windows.get('log')
    if old_log and not old_log['close']:
        old_log['window'].focus_force()
        return
    log_window = None
    try:
        messages = self.log_obj.get_all_messages()
        log_window = LogViewer(self.main_window, messages)
        log_window.set_ok_button_callback(self.__log_ok_button_callback)
        log_window.protocol("WM_DELETE_WINDOW",
                            self.__log_ok_button_callback)
        log_window.set_reload_button_callback(
            self.__log_reload_button_callback)
        new_event = asyncio.Event()
        self.__child_windows['log'] = {'window': log_window,
                                       'event': new_event,
                                       'close': False}
        task = asyncio.create_task(self.__log_event_loop())
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
        task = asyncio.create_task(self.__log_update_loop())
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
    except BaseException as ex:  # pylint: disable=broad-except
        if log_window is not None:
            log_window.destroy()
        new_log = self.__child_windows.get('log')
        if new_log and not new_log['close']:
            new_log['close'] = True
            new_log['event'].set()
        msg = 'Errore durante l\'apertura della finestra di log'
        logging.error(msg, exc_info=ex)

__log_event_loop() async

Update the log window with new messages.

Source code in controller.py
async def __log_event_loop(self) -> None:
    """
    Update the log window with new messages.
    """
    while not self.__child_windows['log']['close']:
        self.__child_windows['log']['window'].update()
        await self.__child_windows['log']['event'].wait()
        self.__child_windows['log']['event'].clear()
    self.__child_windows['log']['window'].withdraw()
    self.__child_windows['log']['window'].destroy()
    self.main_window.focus_force()

__log_ok_button_callback()

Manage the ok button events in the log window.

Source code in controller.py
def __log_ok_button_callback(self) -> None:
    """
    Manage the ok button events in the log window.
    """
    if self.__child_windows.get('log'):
        self.__child_windows['log']['close'] = True
        self.__child_windows['log']['event'].set()

__log_reload_button_callback()

Manage the reload button events in the log window.

Source code in controller.py
def __log_reload_button_callback(self) -> None:
    """
    Manage the reload button events in the log window.
    """
    messages = self.log_obj.get_all_messages()
    self.__child_windows['log']['window'].update_logs(messages)
    self.__child_windows['log']['window'].set_logs_updated(False)
    self.__child_windows['log']['event'].set()

__log_update_loop() async

Update the log window indicator when new messages are available.

Source code in controller.py
async def __log_update_loop(self) -> None:
    """
    Update the log window indicator when new messages are available.
    """
    n_messages = self.log_obj.total_messages()
    while not self.__child_windows['log']['close']:
        if self.log_obj.total_messages() > n_messages:
            self.__child_windows['log']['window'].set_logs_updated(True)
            n_messages = self.log_obj.total_messages()
        await asyncio.sleep(.1)

__presets_callback(preset)

Call the given preset on the camera. :param preset: The preset name to call.

Source code in controller.py
def __presets_callback(self, preset: str) -> None:
    """
    Call the given preset on the camera.
    :param preset: The preset name to call.
    """
    task = asyncio.create_task(call_preset(preset), name=preset)
    self.__background_tasks.add(task)
    task.add_done_callback(self.__background_tasks.discard)
    self.window_event.set()

__preview_callback()

Manage the preview button events.

Source code in controller.py
def __preview_callback(self) -> None:
    """
    Manage the preview button events.
    """
    StreamingStatus().switch_obs_live_view()
    self.window_event.set()

__prog_button_callback(event_id=None)

Manage the program button events.

Source code in controller.py
def __prog_button_callback(self, event_id = None) -> None:
    """
    Manage the program button events.
    """
    old_prog = self.__child_windows.get('prog')
    if old_prog and not old_prog['close']:
        old_prog['window'].focus_force()
        return
    prog_window = None
    try:
        events = self.event_scheduler.events
        categories = LiveEvent.get_youtube_categories()
        playlists = LiveEvent.get_youtube_playlists()

        prog_window = ProgramWindow(self.main_window, categories, playlists)
        prog_window.protocol("WM_DELETE_WINDOW", self.__prog_exit_callback)
        prog_window.set_first_button_callback(self.__prog_first_callback)
        prog_window.set_prev_button_callback(self.__prog_prev_callback)
        prog_window.set_next_button_callback(self.__prog_next_callback)
        prog_window.set_last_button_callback(self.__prog_last_callback)
        prog_window.set_save_button_callback(self.__prog_save_callback)
        prog_window.set_new_button_callback(self.__prog_new_callback)
        prog_window.set_copy_button_callback(self.__prog_copy_callback)
        prog_window.set_delete_button_callback(self.__prog_delete_callback)
        prog_window.set_reload_button_callback(self.__prog_reload_callback)
        new_event = asyncio.Event()
        self.__child_windows['prog'] = {'window': prog_window,
                                        'event': new_event,
                                        'close': False,
                                        'shown': -1}
        if len(events) > 0:
            if event_id is not None:
                event_id = self.__identify_event(event_id)
                self.__child_windows['prog']['shown'] = event_id
                prog_window.set(events[event_id].get_config(),
                                event_id + 1, len(events))
            else:
                self.__child_windows['prog']['shown'] = 0
                prog_window.set(events[0].get_config(), 1, len(events))
        else:
            self.__prog_new_callback()
        task = asyncio.create_task(self.__prog_event_loop())
        self.__background_tasks.add(task)
        task.add_done_callback(self.__background_tasks.discard)
    except BaseException as ex:  # pylint: disable=broad-except
        if prog_window is not None:
            prog_window.destroy()
        new_prog = self.__child_windows.get('prog')
        if new_prog and not new_prog['close']:
            new_prog['close'] = True
            new_prog['event'].set()
        msg = 'Errore durante l\'apertura della finestra di programmazione'
        logging.error(msg, exc_info=ex)

__prog_copy_callback()

Manage the copy button event.

Source code in controller.py
def __prog_copy_callback(self) -> None:
    """
    Manage the copy button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown == -1:
        return
    self.__child_windows['prog']['shown'] = -1
    prog_window.set(events[shown].get_config())

__prog_delete_callback()

Manage the delete button event.

Source code in controller.py
def __prog_delete_callback(self) -> None:
    """
    Manage the delete button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown >= 0 and events[shown].remove_schedule():
        events.pop(shown)
        StreamingStatus().set_refresh_program(True)
    shown = min(shown, len(events) - 1)
    if shown < 0:
        self.__prog_new_callback()
    else:
        self.__child_windows['prog']['shown'] = shown
        prog_window.set(events[shown].get_config(), shown + 1, len(events))

__prog_event_loop() async

Update the program window.

Source code in controller.py
async def __prog_event_loop(self) -> None:
    """
    Update the program window.
    """
    while not self.__child_windows['prog']['close']:
        self.__child_windows['prog']['window'].update()
        await self.__child_windows['prog']['event'].wait()
        self.__child_windows['prog']['event'].clear()
    self.__child_windows['prog']['window'].withdraw()
    self.__child_windows['prog']['window'].destroy()
    self.main_window.focus_force()

__prog_exit_callback()

Manage the close window event.

Source code in controller.py
def __prog_exit_callback(self) -> None:
    """
    Manage the close window event.
    """
    if self.__child_windows.get('prog'):
        self.__child_windows['prog']['close'] = True
        self.__child_windows['prog']['event'].set()

__prog_first_callback()

Manage the first button event.

Source code in controller.py
def __prog_first_callback(self) -> None:
    """
    Manage the first button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown == 0:
        return
    shown = 0
    self.__child_windows['prog']['shown'] = shown
    prog_window.set(events[shown].get_config(), shown + 1, len(events))

__prog_last_callback()

Manage the last button event.

Source code in controller.py
def __prog_last_callback(self) -> None:
    """
    Manage the last button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown == len(events) - 1:
        return
    shown = len(events) - 1
    self.__child_windows['prog']['shown'] = shown
    prog_window.set(events[shown].get_config(), shown + 1, len(events))

__prog_new_callback()

Manage the new button event.

Source code in controller.py
def __prog_new_callback(self) -> None:
    """
    Manage the new button event.
    """
    self.__child_windows['prog']['shown'] = -1
    default = DefaultConfig().get('youtube_default')
    self.__child_windows['prog']['window'].set(default)

__prog_next_callback()

Manage the next button event.

Source code in controller.py
def __prog_next_callback(self) -> None:
    """
    Manage the next button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown == len(events) - 1:
        return
    shown += 1
    self.__child_windows['prog']['shown'] = shown
    prog_window.set(events[shown].get_config(), shown + 1, len(events))

__prog_prev_callback()

Manage the previous button event.

Source code in controller.py
def __prog_prev_callback(self) -> None:
    """
    Manage the previous button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown == 0:
        return
    shown -= 1
    self.__child_windows['prog']['shown'] = shown
    prog_window.set(events[shown].get_config(), shown + 1, len(events))

__prog_reload_callback()

Manage the reload button event.

Source code in controller.py
def __prog_reload_callback(self) -> None:
    """
    Manage the reload button event.
    """
    self.event_scheduler.reload_from_file()
    StreamingStatus().set_refresh_program(True)
    showinfo('Caricamento', 'Eventi caricati correttamente',
             parent=self.__child_windows['prog']['window'])
    events = self.event_scheduler.events
    prog_window = self.__child_windows['prog']['window']
    if len(events) > 0:
        self.__child_windows['prog']['shown'] = 0
        prog_window.set(events[0].get_config(), 1, len(events))
    else:
        self.__prog_new_callback()

__prog_save_callback()

Manage the save button event.

Source code in controller.py
def __prog_save_callback(self) -> None:
    """
    Manage the save button event.
    """
    events = self.event_scheduler.events
    shown = self.__child_windows['prog']['shown']
    prog_window = self.__child_windows['prog']['window']
    if shown == -1:
        self.event_scheduler.add_event(prog_window.get())
        shown = len(events) - 1
        prog_window.set(events[shown].get_config(), shown + 1, len(events))
    else:
        events[shown].reschedule(prog_window.get())
    StreamingStatus().set_refresh_program(True)
    if askquestion('Salvataggio', 'Vuoi salvare la programmazione su file?',
                   parent=prog_window) == 'yes':
        if self.event_scheduler.save_to_file():
            showinfo('Salvataggio', 'Evento salvato correttamente',
                     parent=prog_window)

__set_close()

Set the close flag to True.

Source code in controller.py
def __set_close(self):
    """
    Set the close flag to True.
    """
    self.__close = True
    self.__log_ok_button_callback()
    self.__prog_exit_callback()
    self.window_event.set()

__setup_log(path='./streaming.log') staticmethod

Setup the log file for the application. :param path: The path of the log file.

:return: The custom log object for the application.

Source code in controller.py
@staticmethod
def __setup_log(path: str = r'./streaming.log') -> CustomLog:
    """
    Setup the log file for the application.
    :param path: The path of the log file.

    :return: The custom log object for the application.
    """
    handlers = [logging.StreamHandler(log_obj := CustomLog()),
                RotatingFileHandler(abspath(path), maxBytes=1048576 * 5,
                                    backupCount=7, encoding='utf-8')]
    fmt = '%(levelname)s (%(asctime)s) [%(threadName)-10.15s]: %(message)s'
    logging.basicConfig(format=fmt, datefmt='%d/%m/%Y %H:%M:%S', force=True,
                        level=logging.INFO, handlers=handlers)
    return log_obj

__write_log_on_gui() async

Write the last log message on the GUI.

Source code in controller.py
async def __write_log_on_gui(self) -> None:
    """
    Write the last log message on the GUI.
    """
    while not self.__close:
        for message in self.log_obj.get_messages_to_print():
            level = message.split(' ', 1)[0]
            msg = f' {message.partition(chr(10))[0]}'
            self.main_window.set_status_message(msg, level)
        self.window_event.set()
        await asyncio.sleep(.1)

run_app() async

Start the application gui and run the main loop. After exiting main loop, stops all active threads.

Source code in controller.py
async def run_app(self) -> None:
    """
    Start the application gui and run the main loop.
    After exiting main loop, stops all active threads.
    """
    status: StreamingStatus = StreamingStatus()
    status.add_update_callback(self.window_event.set)
    while not self.__close:
        if status.dispatch_updates():
            self.main_window.set_title_description(status.get_title(),
                                                   status.get_description())
            enabled = not status.is_busy()
            self.main_window.set_enabled_title_description(enabled)
            self.main_window.remaining_time(status.get_time_left())
            self.main_window.set_live_status(status.get_live_status())
            if status.should_refresh_program():
                events = self.event_scheduler.get_future_events()
                self.main_window.set_events(events)
                status.set_refresh_program(False)
        self.main_window.update()
        if self.__prefetch is not None and self.__prefetch.done():
            await self.__prefetch
            self.__prefetch = None
        await self.window_event.wait()
        self.window_event.clear()
        while task := StreamingStatus().get_ended_task():
            try:
                await task
            except BaseException as ex:  # pylint: disable=broad-except
                msg = 'Errore nel LiveEvent terminato'
                logging.error(msg, exc_info=ex)
    await self.shutdown()

shutdown() async

Shutdown the application and stop all active threads.

Source code in controller.py
async def shutdown(self) -> None:
    """
    Shutdown the application and stop all active threads.
    """
    logging.info("Chiusura del programma")
    self.main_window.withdraw()
    self.__set_close()
    if self.event_scheduler:
        self.event_scheduler.stop()
    if StreamingStatus().is_live():
        msg = "Trasmissione in corso durante la chiusura del programma" \
              f"programma:\n{StreamingStatus().get_owner()}"
        logging.error(msg)
    await self.telegram_bot.stop()
    for task in asyncio.all_tasks():
        if task != asyncio.current_task():
            try:
                msg = (f"Chiusura del task: {task.get_name()}\n"
                       f"Coroutine: {task.get_coro().__qualname__}")
                logging.debug(msg)
                await asyncio.wait_for(task, timeout=1)
            except BaseException as ex:  # pylint: disable=broad-except
                msg = f"Errore durante l'esecuzione di un task: {ex}"
                logging.error(msg, exc_info=ex)
    self.main_window.destroy()