Vai al contenuto

models

models

Models of the MVC design pattern for the application.

CustomLog

Bases: TextIOBase

Class that implements a custom log that can be used to redirect the output of the logging module to a widget.

Source code in models.py
class CustomLog(TextIOBase):
    """
    Class that implements a custom log that can be used to redirect the output
    of the logging module to a widget.
    """

    def __init__(self, size: int = 100000):
        """
        Constructor of the class. It initializes the list of messages and the
        lock used to synchronize the access to it.
        :param size: The maximum size of the log.
        """
        super().__init__()
        self.messages = []
        self.pending = 0
        self.maxsize = size
        self.log_lock = RLock()

    def __str__(self) -> str:
        """
        Returns the string representation of the log.

        :return: The string representation of the log.
        """
        with self.log_lock:
            return '\n'.join(reversed(self.messages))

    def write(self, msg: Any) -> None:
        """
        Writes the given message to the log.
        :param msg: The message to be written.
        """
        with self.log_lock:
            self.messages.append(str(msg))
            self.pending += 1
            self.shrink()

    def shrink(self) -> None:
        """
        Shrinks the log to the maximum size.
        """
        if self.maxsize is None or len(self.messages) == 1:
            return
        size = sum(map(len, self.messages))
        while size > self.maxsize:
            size -= len(self.messages.pop(0))

    def get_messages_to_print(self) -> tuple:
        """
        Returns the messages that have been written to the log since the last
        call to this method. The messages are returned in a tuple and then the
        log is cleared. If no messages have been written since the last call to
        this method, an empty tuple is returned.

        :return: The messages that have been written to the log.
        """
        with self.log_lock:
            result = self.messages[-self.pending:] if self.pending > 0 else []
            self.pending = 0
            return tuple(result)

    def get_all_messages(self) -> tuple:
        """
        Returns all the messages that have been written to the log as a tuple.

        :return: All the messages that have been written to the log.
        """
        with self.log_lock:
            return tuple(self.messages)

    def total_messages(self) -> int:
        """
        Returns the total number of messages that have been written to the log.

        :return: The total number of messages written.
        """
        with self.log_lock:
            return len(self.messages)

__init__(size=100000)

Constructor of the class. It initializes the list of messages and the lock used to synchronize the access to it. :param size: The maximum size of the log.

Source code in models.py
def __init__(self, size: int = 100000):
    """
    Constructor of the class. It initializes the list of messages and the
    lock used to synchronize the access to it.
    :param size: The maximum size of the log.
    """
    super().__init__()
    self.messages = []
    self.pending = 0
    self.maxsize = size
    self.log_lock = RLock()

__str__()

Returns the string representation of the log.

:return: The string representation of the log.

Source code in models.py
def __str__(self) -> str:
    """
    Returns the string representation of the log.

    :return: The string representation of the log.
    """
    with self.log_lock:
        return '\n'.join(reversed(self.messages))

get_all_messages()

Returns all the messages that have been written to the log as a tuple.

:return: All the messages that have been written to the log.

Source code in models.py
def get_all_messages(self) -> tuple:
    """
    Returns all the messages that have been written to the log as a tuple.

    :return: All the messages that have been written to the log.
    """
    with self.log_lock:
        return tuple(self.messages)

get_messages_to_print()

Returns the messages that have been written to the log since the last call to this method. The messages are returned in a tuple and then the log is cleared. If no messages have been written since the last call to this method, an empty tuple is returned.

:return: The messages that have been written to the log.

Source code in models.py
def get_messages_to_print(self) -> tuple:
    """
    Returns the messages that have been written to the log since the last
    call to this method. The messages are returned in a tuple and then the
    log is cleared. If no messages have been written since the last call to
    this method, an empty tuple is returned.

    :return: The messages that have been written to the log.
    """
    with self.log_lock:
        result = self.messages[-self.pending:] if self.pending > 0 else []
        self.pending = 0
        return tuple(result)

shrink()

Shrinks the log to the maximum size.

Source code in models.py
def shrink(self) -> None:
    """
    Shrinks the log to the maximum size.
    """
    if self.maxsize is None or len(self.messages) == 1:
        return
    size = sum(map(len, self.messages))
    while size > self.maxsize:
        size -= len(self.messages.pop(0))

total_messages()

Returns the total number of messages that have been written to the log.

:return: The total number of messages written.

Source code in models.py
def total_messages(self) -> int:
    """
    Returns the total number of messages that have been written to the log.

    :return: The total number of messages written.
    """
    with self.log_lock:
        return len(self.messages)

write(msg)

Writes the given message to the log. :param msg: The message to be written.

Source code in models.py
def write(self, msg: Any) -> None:
    """
    Writes the given message to the log.
    :param msg: The message to be written.
    """
    with self.log_lock:
        self.messages.append(str(msg))
        self.pending += 1
        self.shrink()

DefaultConfig

Class that loads the default configuration from a json file and provides it to the application as a singleton object.

Source code in models.py
@singleton
class DefaultConfig:
    """
    Class that loads the default configuration from a json file and provides
    it to the application as a singleton object.
    """

    def __init__(self, cfg_file: str = 'default.json'):
        """
        Constructor of the class. It loads the configuration from the json file
        and stores it in a dictionary.
        :param cfg_file: The path of the json file containing the configuration.

        :raise ConfigException: If the configuration file is empty.
        """
        self._config_lock = RLock()
        self.cfg_file = cfg_file
        self.config = {}
        self.yt_credentials: Credentials | None = None
        self.reload()

        # Schedula il rinnovo del token ogni 5 giorni
        every(5).days.do(self._refresh_youtube_token)

    def reload(self) -> None:
        """
        Reloads the configuration from the JSON file.

        :raise ConfigException: If the configuration file is empty.
        """
        with self._config_lock, open(self.cfg_file, encoding='utf-8') as jf:
            json_data = load(jf)
            if json_data is None or len(json_data) == 0:
                raise ConfigException(f"Configurazione vuota in {jf.name}")
            self.config = json_data
            log = self.config.get('log_level')
            if log is None or log not in logging.getLevelNamesMapping():
                log = logging.WARNING
            logging.getLogger().setLevel(log)
            log_debug(json_data)

    def get(self, key: str) -> Any:
        """
        Returns the value associated with the given key.
        :param key: The key of the value to be returned.

        :return: The value associated with the given key.
        """
        with self._config_lock:
            return self.config[key]

    def youtube_credentials(self) -> Credentials | None:
        """
        Get the credentials for the YouTube account. If the credentials are not
        available, ask the user to log in. If the credentials are expired, ask
        the user to refresh the token. If the credentials are valid, return
        them.

        :return: The credentials for the YouTube account.
        """
        scopes = ["https://www.googleapis.com/auth/youtube",
                  "https://www.googleapis.com/auth/youtube.force-ssl",
                  "https://www.googleapis.com/auth/youtube.upload"]
        token_file = 'token.json'
        client_secrets_file = 'client_id.json'

        if self.yt_credentials is None and exists('token.json'):
            # token.json stores the user's credentials from previously
            # successful logins
            logging.info('Caricamento credenziali dal file...')
            self.yt_credentials = Credentials.from_authorized_user_file(
                token_file, scopes)

        # If there are no valid credentials available, then either refresh
        # the token or log in.
        save_credentials = False
        if self.yt_credentials is None or not self.yt_credentials.valid:
            save_credentials = True
            creds: Credentials | None = self.yt_credentials
            if creds and creds.expired and creds.refresh_token:
                logging.info("Rinnovo l'Access Token...")
                try:
                    creds.refresh(GoogleRequest())
                except RefreshError as ex:
                    logging.error('Errore durante il rinnovo del token',
                                  exc_info=ex)
                    self.yt_credentials = None
                    if exists(token_file):
                        remove(token_file)
                    return self.youtube_credentials()
            else:
                logging.info("Richiesta di un nuovo token...")
                flow = InstalledAppFlow.from_client_secrets_file(
                    client_secrets_file, scopes)
                flow.run_local_server(port=8080, prompt='consent',
                                      authorization_prompt_message='')
                self.yt_credentials = flow.credentials

        if save_credentials:
            # Save the credentials for the next run
            with open(token_file, 'w', encoding='utf-8') as f:
                f.write(self.yt_credentials.to_json())
                logging.info("Credenziali salvate per i prossimi usi...")
        return self.yt_credentials

    def _refresh_youtube_token(self) -> None:
        """
        Rinnova il token YouTube silenziosamente.
        Schedulato ogni 6 giorni per evitare la scadenza del refresh token
        (che Google impone dopo 7 giorni per app in modalità Testing).
        """
        logging.info("Rinnovo programmato del token YouTube...")
        with self._config_lock:
            if self.yt_credentials and self.yt_credentials.refresh_token:
                try:
                    self.yt_credentials.refresh(GoogleRequest())
                    with open('token.json', 'w', encoding='utf-8') as f:
                        f.write(self.yt_credentials.to_json())
                    logging.info("Token YouTube rinnovato automaticamente.")
                except RefreshError as ex:
                    logging.error("Impossibile rinnovare il token automaticamente.",
                                  exc_info=ex)
                    # Il prossimo accesso alle credenziali forzerà un nuovo login
                    self.yt_credentials = None
            else:
                logging.warning("Nessuna credenziale disponibile per il rinnovo.")

__init__(cfg_file='default.json')

Constructor of the class. It loads the configuration from the json file and stores it in a dictionary. :param cfg_file: The path of the json file containing the configuration.

:raise ConfigException: If the configuration file is empty.

Source code in models.py
def __init__(self, cfg_file: str = 'default.json'):
    """
    Constructor of the class. It loads the configuration from the json file
    and stores it in a dictionary.
    :param cfg_file: The path of the json file containing the configuration.

    :raise ConfigException: If the configuration file is empty.
    """
    self._config_lock = RLock()
    self.cfg_file = cfg_file
    self.config = {}
    self.yt_credentials: Credentials | None = None
    self.reload()

    # Schedula il rinnovo del token ogni 5 giorni
    every(5).days.do(self._refresh_youtube_token)

get(key)

Returns the value associated with the given key. :param key: The key of the value to be returned.

:return: The value associated with the given key.

Source code in models.py
def get(self, key: str) -> Any:
    """
    Returns the value associated with the given key.
    :param key: The key of the value to be returned.

    :return: The value associated with the given key.
    """
    with self._config_lock:
        return self.config[key]

reload()

Reloads the configuration from the JSON file.

:raise ConfigException: If the configuration file is empty.

Source code in models.py
def reload(self) -> None:
    """
    Reloads the configuration from the JSON file.

    :raise ConfigException: If the configuration file is empty.
    """
    with self._config_lock, open(self.cfg_file, encoding='utf-8') as jf:
        json_data = load(jf)
        if json_data is None or len(json_data) == 0:
            raise ConfigException(f"Configurazione vuota in {jf.name}")
        self.config = json_data
        log = self.config.get('log_level')
        if log is None or log not in logging.getLevelNamesMapping():
            log = logging.WARNING
        logging.getLogger().setLevel(log)
        log_debug(json_data)

youtube_credentials()

Get the credentials for the YouTube account. If the credentials are not available, ask the user to log in. If the credentials are expired, ask the user to refresh the token. If the credentials are valid, return them.

:return: The credentials for the YouTube account.

Source code in models.py
def youtube_credentials(self) -> Credentials | None:
    """
    Get the credentials for the YouTube account. If the credentials are not
    available, ask the user to log in. If the credentials are expired, ask
    the user to refresh the token. If the credentials are valid, return
    them.

    :return: The credentials for the YouTube account.
    """
    scopes = ["https://www.googleapis.com/auth/youtube",
              "https://www.googleapis.com/auth/youtube.force-ssl",
              "https://www.googleapis.com/auth/youtube.upload"]
    token_file = 'token.json'
    client_secrets_file = 'client_id.json'

    if self.yt_credentials is None and exists('token.json'):
        # token.json stores the user's credentials from previously
        # successful logins
        logging.info('Caricamento credenziali dal file...')
        self.yt_credentials = Credentials.from_authorized_user_file(
            token_file, scopes)

    # If there are no valid credentials available, then either refresh
    # the token or log in.
    save_credentials = False
    if self.yt_credentials is None or not self.yt_credentials.valid:
        save_credentials = True
        creds: Credentials | None = self.yt_credentials
        if creds and creds.expired and creds.refresh_token:
            logging.info("Rinnovo l'Access Token...")
            try:
                creds.refresh(GoogleRequest())
            except RefreshError as ex:
                logging.error('Errore durante il rinnovo del token',
                              exc_info=ex)
                self.yt_credentials = None
                if exists(token_file):
                    remove(token_file)
                return self.youtube_credentials()
        else:
            logging.info("Richiesta di un nuovo token...")
            flow = InstalledAppFlow.from_client_secrets_file(
                client_secrets_file, scopes)
            flow.run_local_server(port=8080, prompt='consent',
                                  authorization_prompt_message='')
            self.yt_credentials = flow.credentials

    if save_credentials:
        # Save the credentials for the next run
        with open(token_file, 'w', encoding='utf-8') as f:
            f.write(self.yt_credentials.to_json())
            logging.info("Credenziali salvate per i prossimi usi...")
    return self.yt_credentials

EventScheduler

Bases: Thread

Thread that schedules the live events.

Source code in models.py
class EventScheduler(Thread):
    """
    Thread that schedules the live events.
    """

    def __init__(self, telegram, cfg_file: str = 'orari_registrazioni.json'):
        """
        Constructor of the class. It initializes the thread and loads the events
        from the json configuration file.
        :param telegram: The Telegram object used to send messages.
        :param cfg_file: The path of the json file containing the events.
        """
        super().__init__(name="Scheduler-Thread")
        self.__telegram = telegram
        self.__cfg_file = cfg_file
        self.events = None
        self.reload_from_file()
        self.__cease_continuous_run = Event()
        self.__process_events = Event()
        self.__process_events.set()
        self.start()

    def reload_from_file(self) -> None:
        """
        Loads the events from the JSON configuration file. If an event is not
        valid, it is discarded.
        """
        logging.info("Caricamento eventi da file...")
        if self.events is not None:
            self.__process_events.clear()
            for event in self.events:
                event.remove_schedule()
            self.__process_events.set()

        self.events = []
        with open(self.__cfg_file, encoding='utf-8') as jf:
            for event_data in load(jf):
                self.add_event(event_data)

    def add_event(self, event_data: dict[str, Any]) -> None:
        """
        Add an event to the list of events.
        :param event_data: The data of the event to be added.
        """
        default = DefaultConfig().get('youtube_default')
        try:
            event_data_map = ChainMap(event_data, default)
            log_debug(event := LiveEvent(self.__telegram, event_data_map))
            self.events.append(event)
        except Exception as ex:  # pylint: disable=broad-except
            logging.error('Evento programmato mal formato', exc_info=ex)

    def save_to_file(self) -> bool:
        """
        Save the events to the JSON configuration file.

        :return: True if the events have been saved, False otherwise (i.e. the
        events have not been modified).
        """
        logging.info("Salvataggio eventi su file...")
        events = [event.get_config(True) for event in self.events]
        if exists(self.__cfg_file):
            with open(self.__cfg_file, encoding='utf-8') as jf:
                if events == load(jf):
                    return False
        backup = f'{self.__cfg_file}_{datetime.now():%Y-%m-%d_%H%M%S}.bak'
        rename(self.__cfg_file, backup)
        with open(self.__cfg_file, 'w', encoding='utf-8') as jf:
            json.dump(events, jf, indent=2)
        return True

    def run(self) -> None:
        """
        Run the thread that processes the live events.
        """
        asyncio.run(self.__thread_job())

    async def __thread_job(self) -> None:
        """
        Async method that processes the live events.
        """
        while not self.__cease_continuous_run.is_set():
            self.__process_events.wait()
            run_pending()
            await asyncio.sleep(1)
        clear()

    def stop(self) -> None:
        """
        Stop the thread that schedules the live events.
        """
        self.__cease_continuous_run.set()

    def get_future_events(self) -> list[tuple[str, str, str]]:
        """
        Get the list of future events.

        :return: The list of future events.
        """
        if self.__cease_continuous_run.is_set():
            return []
        days = {'lu': "Lunedì", 'ma': "Martedì", 'me': "Mercoledì",
                'gi': "Giovedì", 've': "Venerdì", 'sa': "Sabato",
                'do': "Domenica"}
        event_type = {'recurring': 'R', 'onetime': 'O'}
        future_events: list[tuple[str, str, str]] = []
        for event in [event for event in self.events if event.enabled]:
            e_type = str(event_type[event.type])
            day = days[event.day.lower()] if e_type == 'R' else str(event.day)
            if e_type == 'O':
                o_day = f"{day} {event.time}"
                o_day = datetime.strptime(o_day, '%d/%m/%Y %H:%M')
                if o_day < datetime.now():
                    continue
            future_events.append((e_type, day, str(event.time)))
        r = [event for event in future_events if event[0] == 'R']
        r.sort(key=lambda x: (list(days.values()).index(x[1]), x[2]))
        o = [event for event in future_events if event[0] == 'O']
        o.sort(key=lambda x: (x[1].split('/')[2], x[1].split('/')[1],
                              x[1].split('/')[0], x[2]))
        return r + o

__init__(telegram, cfg_file='orari_registrazioni.json')

Constructor of the class. It initializes the thread and loads the events from the json configuration file. :param telegram: The Telegram object used to send messages. :param cfg_file: The path of the json file containing the events.

Source code in models.py
def __init__(self, telegram, cfg_file: str = 'orari_registrazioni.json'):
    """
    Constructor of the class. It initializes the thread and loads the events
    from the json configuration file.
    :param telegram: The Telegram object used to send messages.
    :param cfg_file: The path of the json file containing the events.
    """
    super().__init__(name="Scheduler-Thread")
    self.__telegram = telegram
    self.__cfg_file = cfg_file
    self.events = None
    self.reload_from_file()
    self.__cease_continuous_run = Event()
    self.__process_events = Event()
    self.__process_events.set()
    self.start()

__thread_job() async

Async method that processes the live events.

Source code in models.py
async def __thread_job(self) -> None:
    """
    Async method that processes the live events.
    """
    while not self.__cease_continuous_run.is_set():
        self.__process_events.wait()
        run_pending()
        await asyncio.sleep(1)
    clear()

add_event(event_data)

Add an event to the list of events. :param event_data: The data of the event to be added.

Source code in models.py
def add_event(self, event_data: dict[str, Any]) -> None:
    """
    Add an event to the list of events.
    :param event_data: The data of the event to be added.
    """
    default = DefaultConfig().get('youtube_default')
    try:
        event_data_map = ChainMap(event_data, default)
        log_debug(event := LiveEvent(self.__telegram, event_data_map))
        self.events.append(event)
    except Exception as ex:  # pylint: disable=broad-except
        logging.error('Evento programmato mal formato', exc_info=ex)

get_future_events()

Get the list of future events.

:return: The list of future events.

Source code in models.py
def get_future_events(self) -> list[tuple[str, str, str]]:
    """
    Get the list of future events.

    :return: The list of future events.
    """
    if self.__cease_continuous_run.is_set():
        return []
    days = {'lu': "Lunedì", 'ma': "Martedì", 'me': "Mercoledì",
            'gi': "Giovedì", 've': "Venerdì", 'sa': "Sabato",
            'do': "Domenica"}
    event_type = {'recurring': 'R', 'onetime': 'O'}
    future_events: list[tuple[str, str, str]] = []
    for event in [event for event in self.events if event.enabled]:
        e_type = str(event_type[event.type])
        day = days[event.day.lower()] if e_type == 'R' else str(event.day)
        if e_type == 'O':
            o_day = f"{day} {event.time}"
            o_day = datetime.strptime(o_day, '%d/%m/%Y %H:%M')
            if o_day < datetime.now():
                continue
        future_events.append((e_type, day, str(event.time)))
    r = [event for event in future_events if event[0] == 'R']
    r.sort(key=lambda x: (list(days.values()).index(x[1]), x[2]))
    o = [event for event in future_events if event[0] == 'O']
    o.sort(key=lambda x: (x[1].split('/')[2], x[1].split('/')[1],
                          x[1].split('/')[0], x[2]))
    return r + o

reload_from_file()

Loads the events from the JSON configuration file. If an event is not valid, it is discarded.

Source code in models.py
def reload_from_file(self) -> None:
    """
    Loads the events from the JSON configuration file. If an event is not
    valid, it is discarded.
    """
    logging.info("Caricamento eventi da file...")
    if self.events is not None:
        self.__process_events.clear()
        for event in self.events:
            event.remove_schedule()
        self.__process_events.set()

    self.events = []
    with open(self.__cfg_file, encoding='utf-8') as jf:
        for event_data in load(jf):
            self.add_event(event_data)

run()

Run the thread that processes the live events.

Source code in models.py
def run(self) -> None:
    """
    Run the thread that processes the live events.
    """
    asyncio.run(self.__thread_job())

save_to_file()

Save the events to the JSON configuration file.

:return: True if the events have been saved, False otherwise (i.e. the events have not been modified).

Source code in models.py
def save_to_file(self) -> bool:
    """
    Save the events to the JSON configuration file.

    :return: True if the events have been saved, False otherwise (i.e. the
    events have not been modified).
    """
    logging.info("Salvataggio eventi su file...")
    events = [event.get_config(True) for event in self.events]
    if exists(self.__cfg_file):
        with open(self.__cfg_file, encoding='utf-8') as jf:
            if events == load(jf):
                return False
    backup = f'{self.__cfg_file}_{datetime.now():%Y-%m-%d_%H%M%S}.bak'
    rename(self.__cfg_file, backup)
    with open(self.__cfg_file, 'w', encoding='utf-8') as jf:
        json.dump(events, jf, indent=2)
    return True

stop()

Stop the thread that schedules the live events.

Source code in models.py
def stop(self) -> None:
    """
    Stop the thread that schedules the live events.
    """
    self.__cease_continuous_run.set()

LiveEvent

Class that represents a recurring or onetime live event and schedules it. @DynamicAttrs

Source code in models.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
class LiveEvent:
    """
    Class that represents a recurring or onetime live event and schedules it.
    @DynamicAttrs
    """
    EVENT_TYPE_RECURRING = "recurring"
    EVENT_TYPE_ONETIME = "onetime"

    SHORT_TIME_PATTERN = re.compile(r"\d:\d\d")
    HOUR_MINUTE_PATTERN = re.compile(r"\d\d:\d\d")

    DATE_FORMAT = "%d/%m/%Y"
    TIME_FORMAT = "%H:%M"
    TIME_WITH_SECONDS_FORMAT = "%H:%M:%S"
    DATETIME_FORMAT = "%d/%m/%Y %H:%M:%S"

    __yt_lock = RLock()

    def __init__(self, telegram, event_data: ChainMap):
        """
        Constructor of the class. It initializes the event data.
        :param telegram: The Telegram bot object.
        :param event_data: The data of the event.
        """
        self.__telegram = telegram
        self.__uuid = uuid4()
        self.__streaming_config = None
        self.__background_tasks = set()
        self.__update_event(dict(event_data))
        self.time = self.__normalize_time(self.time)
        self.__configure_schedule()

    def __configure_schedule(self) -> None:
        """
        Configure the event schedule according to the current event data.
        """
        self.__schedule = None
        self.__schedule_prepare = None

        if not self.enabled:
            return

        if self.type == self.EVENT_TYPE_RECURRING:
            self.__schedule, self.__schedule_prepare = self.__recurring()
        elif self.type == self.EVENT_TYPE_ONETIME:
            self.__schedule, self.__schedule_prepare = self.__onetime()

    @classmethod
    def __normalize_time(cls, time: str) -> str:
        """
        Normalize a time value to ensure single-digit hours are zero-padded.
        :param time: The time value to normalize.
        :return: The normalized time value.
        """
        if cls.SHORT_TIME_PATTERN.match(time):
            return f"0{time}"
        return time

    @classmethod
    def __ensure_seconds(cls, time: str) -> str:
        """
        Ensure a time value includes seconds.
        :param time: The time value to normalize.
        :return: The time value with seconds.
        """
        if cls.HOUR_MINUTE_PATTERN.fullmatch(time):
            return f"{time}:00"
        return time

    def __recurring(self) -> tuple[Job, Job]:
        """
        Set up the recurring event schedule.

        :return: The schedule and the schedule_prepare objects.
        """
        early = self.__schedule_early(None, self.time)
        r_schedule = LiveEvent.schedule_every(self.day, early['anticipate'])
        r_schedule.at(early['time'])
        r_schedule.do(self.__to_task, self.go_live, name='RE_go_live')

        early = self.__schedule_early(early['date'], self.time,
                                      self.prepare * 60)
        r_prepare = LiveEvent.schedule_every(self.day, early['anticipate'])
        r_prepare.at(early['time'])
        r_prepare.do(self.__to_task, self.prepare_live, name='RE_prepare')
        return r_schedule, r_prepare

    def __onetime(self) -> tuple[Job | None, Job | None]:
        """
        Set up the one-time event schedule.

        :return: The schedule and the schedule_prepare objects.
        """
        time = self.time
        if self.HOUR_MINUTE_PATTERN.fullmatch(time):
            time = f'{time}:00'
        event_date = f"{self.day} {time}"
        event_date = datetime.strptime(event_date, self.DATETIME_FORMAT)
        now = datetime.now()
        if event_date >= now:
            early = self.__schedule_early(None, self.time)
            o_schedule = every().day.at(early['time'])
            o_schedule.do(self.__run_once, event_date, self.__to_task,
                          self.go_live, name='OT_go_live')

            p_time = event_date + timedelta(minutes=-self.prepare)
            if p_time >= now:
                o_prepare = every().day.at(f"{p_time:{self.TIME_WITH_SECONDS_FORMAT}}")
                o_prepare.do(self.__run_once, p_time, self.__to_task,
                             self.prepare_live, name='OT_prepare')
                return o_schedule, o_prepare
            return o_schedule, None
        return None, None

    @staticmethod
    def __schedule_early(date: str | None, time: str,
                         sec: int = 5) -> dict[str, str | bool]:
        if date is None:
            date_d = datetime.today().date()
        else:
            date_d = datetime.strptime(date, LiveEvent.DATE_FORMAT).date()
        try:
            time_t = datetime.strptime(time, LiveEvent.TIME_FORMAT).time()
        except ValueError:
            logging.warning(f"Ora non valida: {time}. Provo a correggerla...")
            time_t = datetime.strptime(time, LiveEvent.TIME_WITH_SECONDS_FORMAT).time()
        datetime_obj = datetime.combine(date_d, time_t)
        datetime_obj = datetime_obj - timedelta(seconds=sec)
        return {
            'time': datetime_obj.strftime(LiveEvent.TIME_WITH_SECONDS_FORMAT),
            'date': datetime_obj.date().strftime(LiveEvent.DATE_FORMAT),
            'anticipate': datetime_obj.date() != date
        }

    def __to_task(self, coro: Callable, *args: Any, **kwargs: Any) -> None:
        """
        Add a coroutine to the background tasks.
        :param coro: The coroutine to be added.
        :param args: The arguments to be passed to the coroutine.
        :param kwargs: The keyword arguments to be passed to the coroutine.
        """
        coro = coro()
        if asyncio.iscoroutine(coro):
            task = asyncio.create_task(coro, *args, **kwargs)
            self.__background_tasks.add(task)
            task.add_done_callback(self.__task_done)

    def __task_done(self, task: asyncio.Task) -> None:
        """
        Remove the task from the background tasks.
        :param task: The task to be removed.
        """
        self.__background_tasks.discard(task)
        StreamingStatus().add_ended_task(task)

    @staticmethod
    def __run_once(event_date: datetime, job_func: Callable, *args,
                   **kvargs) -> Type[CancelJob] | None:
        """
        Run a function once at a specific date and time.
        :param event_date: date and time to run the function at
        :param job_func: function to run once
        :param args: arguments (positional) to pass to the function
        :param kvargs: arguments (keyword) to pass to the function

        :return: None if the function was not scheduled, CancelJob otherwise
        """
        now = datetime.now()
        if abs((now - event_date).total_seconds()) < 60:
            msg = f"Avvio singolo alle {now:{LiveEvent.DATETIME_FORMAT}}"
            logging.info(msg)
            try:
                job_func(*args, **kvargs)
                return CancelJob
            except BaseException as ex:  # pylint: disable=broad-except
                msg = f"Errore non gestito in avvio singolo: {ex}"
                logging.error(msg)
        msg = f"Avvio singolo futuro alle {event_date:{LiveEvent.DATETIME_FORMAT}}"
        logging.info(msg)
        return None

    def remove_schedule(self) -> bool:
        """
        Removes the schedule of the event.

        :return: True if the schedule has been removed, False otherwise.
        """
        if self.__streaming_config is None:
            if self.__schedule is not None:
                cancel_job(self.__schedule)
            if self.__schedule_prepare is not None:
                cancel_job(self.__schedule_prepare)
            return True
        return False

    def reschedule(self, update: dict[str, Any] = None) -> bool:
        """
        Reschedules the event.
        :param update: The new data of the event.

        :return: True if the event has been rescheduled, False otherwise.
        """
        if not self.remove_schedule():
            return False
        self.__update_event(update)
        self.__streaming_config = None
        self.time = self.__normalize_time(self.time)
        self.__configure_schedule()
        return True

    def __update_event(self, event_data: dict[str, Any] = None) -> None:
        """
        Updates the event data.
        :param event_data: The new data of the event.
        """
        if event_data is not None:
            for key in event_data:
                setattr(self, key, event_data[key])

    def get_config(self, basic: bool = False) -> dict[str, Any]:
        """
        Returns the configuration of the event.
        :param basic: If True, the basic configuration is returned, otherwise
        the complete configuration is returned.

        :return: The configuration of the event.
        """
        config = {k: v for k, v in vars(self).items() if not k.startswith('_')}
        if basic:
            default = DefaultConfig().get('youtube_default')
            config = {k: v for k, v in config.items() if v != default.get(k)}
        return config

    def __eq__(self, other: object) -> bool:
        """
        Compare the LiveEvent with the given object. The comparison is based on
        the uuid of the LiveEvent. If the given object is not a LiveEvent, the
        comparison returns NotImplemented.
        :param other: The object to be compared with the LiveEvent.

        :return: True if the LiveEvent is equal to the given object, False
        otherwise.
        """
        if isinstance(other, LiveEvent):
            # pylint: disable=protected-access
            return self.__uuid == other.__uuid
        return NotImplemented

    async def go_live(self) -> None:
        """
        Async method that starts the live-streaming.
        """
        if StreamingStatus().is_live():
            return
        logging.info('Live streaming avviata')
        config = DefaultConfig()
        ws = None
        live_done = False
        try:
            await self.__setup_go_live()

            ws = simpleobsws.WebSocketClient(url=config.get('obs_url'),
                                             password=config.get('obs_pwd'))
            await ws.connect()
            await ws.wait_until_identified()
            p = {'inputName': config.get('youtube_default')['source'],
                 'mediaAction': 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART'}
            await LiveEvent.obs_exec(ws, 'TriggerMediaInputAction', p)
            p = {'streamServiceType': 'rtmp_custom',
                 'streamServiceSettings': {
                     'server': self.__streaming_config['rtmp_url'],
                     'key': self.__streaming_config['streaming_key']}
                 }
            await LiveEvent.obs_exec(ws, 'SetStreamServiceSettings', p)
            await self.__notify_streaming_start()
            await LiveEvent.obs_exec(ws, 'StartStream')
            await self.__streaming_in_progress()
            await LiveEvent.obs_exec(ws, 'StopStream')
            await self.__notify_streaming_ended()

            live_done = True
        except Exception as ex:  # pylint: disable=broad-except
            logging.error('Errore durante la live', exc_info=ex)
            self.__telegram.send('Errore durante la live. Avviala manualmente!',
                                 exception=ex)
        finally:
            if ws is not None:
                await ws.disconnect()
            StreamingStatus().set_ready(self)
            StreamingStatus().set_refresh_program(True)
            if self.enable_presets:
                await call_preset(config.get('offline_preset'))

        # Cancello la chiave di stream
        try:
            if live_done and self.__streaming_config['input_stream_id']:
                await asyncio.sleep(120)
                with self.__youtube_credentials() as youtube:
                    self.__delete_key(youtube)
                    logging.info('Streaming key cancellata correttamente')
        except Exception as ex:  # pylint: disable=broad-except
            logging.error('Impossibile cancellare la chiave di stream',
                          exc_info=ex)
        self.__streaming_config = None

    async def __setup_go_live(self):
        """
        Set up the live-streaming in go_live method.
        """
        if self.__streaming_config is None:
            await self.prepare_live(now=True)
        logging.info(str(self))
        StreamingStatus().set_live(self)
        if self.enable_presets:
            default_preset = DefaultConfig().get('streaming_preset')
            await call_preset(default_preset)
            await asyncio.sleep(3)

    async def __notify_streaming_start(self):
        """
        Notify the start of the live-streaming.
        """
        msg = ("Indirizzo del video: https://youtu.be/"
               f"{self.__streaming_config['output_stream_id']}")
        logging.info(msg)
        if self.privacy == 'public':
            msg = ("Avvio della live: https://youtu.be/"
                   f"{self.__streaming_config['output_stream_id']}")
            self.__telegram.send(msg)

    @staticmethod
    async def __streaming_in_progress():
        """
        Async method that waits for the live-streaming to end.
        """
        sec = 0
        status = StreamingStatus()
        while sec < status.get_total_time() and not status.should_stop():
            await asyncio.sleep(1)
            sec += 1
            status.set_time_left(sec)

    async def __notify_streaming_ended(self):
        """
        Notify the end of the live-streaming.
        """
        logging.info("Live conclusa correttamente!")
        if self.privacy == 'public':
            self.__telegram.send("Live conclusa correttamente!")

    def __delete_key(self, youtube: Any) -> None:
        """
        Method that deletes the streaming key after 2 minutes.
        :param youtube: The YouTube object used to delete the streaming key.
        """
        delete_key = self.__streaming_config['input_stream_id']
        response = youtube.liveStreams().delete(id=delete_key).execute()
        log_debug(response)

    @staticmethod
    @contextmanager
    def __youtube_credentials() -> Credentials:
        """
        Context manager that returns the YouTube object used to create the live
        broadcast and the live stream.
        :yield: The YouTube object used to create the live broadcast and the
        live stream.
        """
        credentials = DefaultConfig().youtube_credentials()
        # noinspection PyPackageRequirements
        # pylint: disable=C0415
        from googleapiclient.discovery import build
        youtube = None
        try:
            youtube = build('youtube', 'v3', credentials=credentials,
                            static_discovery=False, num_retries=20,
                            cache=MemoryCache())
            yield youtube
        finally:
            if youtube is not None:
                youtube.close()

    async def prepare_live(self, now: bool = False) -> None:
        """
        Prepares the live-streaming by setting up the streaming configuration.
        :param now: If True, the live-streaming is prepared immediately.

        :raise PrepareLiveException: If the live-streaming configuration is not
        valid.
        """
        if StreamingStatus().is_prepared():
            return
        logging.info('Preparazione della trasmissione programmata')
        StreamingStatus().set_prepared(self)

        try:
            with self.__youtube_credentials() as yt:
                title, desc, live_dt, privacy = self.__setup_prepare_live(now)
                StreamingStatus().set_title(title)
                StreamingStatus().set_description(desc)

                self.__streaming_config = {}

                s_id = await self.__create_live_broadcast(live_dt, privacy, yt)
                self.__streaming_config['output_stream_id'] = s_id

                l_id, s_key, rtmp_url = await self.__create_live_stream(yt)
                self.__streaming_config['input_stream_id'] = l_id
                self.__streaming_config['streaming_key'] = s_key
                self.__streaming_config['rtmp_url'] = rtmp_url
                logging.info(self.__streaming_config)
                if None in self.__streaming_config.values():
                    msg = ('Youtube ha restituito una configurazione non '
                           'valida per la prossima Live')
                    raise PrepareLiveException(msg)

                await self.__bind_streams(l_id, s_id, yt)
                await self.__configure_video(title, desc, live_dt, yt)
                if privacy == 'public' and self.playlist != '':
                    await self.__add_to_playlist(self.playlist, yt)
        except Exception as ex:  # pylint: disable=broad-except
            msg = ("Preparazione dell'evento fallita.\n"
                   f"Config: {self.__streaming_config}")
            logging.error(msg, exc_info=ex)
            msg = "Preparazione dell'evento fallita"
            self.__telegram.send(msg, exception=ex)
            self.__streaming_config = None
            StreamingStatus().set_ready(self)
        await self.__open_obs()

    def __setup_prepare_live(self, now: bool = False) -> (
            tuple)[str, str, datetime, str]:
        """
        Compute the parameters of the live-streaming.
        :param now: If True, the live-streaming is prepared immediately.

        :return: The title, the description, the date and the privacy of the
        live-streaming.
        """
        live_date = datetime.now(
            timezone.utc) if now else self.__schedule.next_run
        live_date = live_date.astimezone()
        title = self.title.replace('$DAY$', f"{live_date:{self.DATE_FORMAT}}")
        title = title.replace('$TIME$', self.time)
        description = self.description.replace('$DAY$', f"{live_date:{self.DATE_FORMAT}}")
        description = description.replace('$TIME$', self.time)
        privacy = self.privacy
        if privacy not in ('public', 'private', 'unlisted'):
            privacy = 'unlisted'
        return title, description, live_date, privacy

    async def __create_live_broadcast(self, live_date: datetime, privacy: str,
                                      youtube: Any) -> str:
        """
        Create a live broadcast object (the output stream) and set the
        parameters. Returns the id of the live broadcast.
        :param live_date: The date of the live broadcast.
        :param privacy: The privacy of the live.
        :param youtube: The YouTube object used to create the live broadcast.

        :return: The id of the live broadcast.
        """
        end_time = live_date + timedelta(minutes=self.duration)
        body = {
            'snippet': {
                'title': StreamingStatus().get_title(),
                'description': StreamingStatus().get_description(),
                'scheduledStartTime': str(live_date.isoformat()),
                'scheduledEndTime': str(end_time.isoformat())
            },
            'status': {
                'privacyStatus': privacy,
                'selfDeclaredMadeForKids': False
            },
            'contentDetails': {
                'enableDvr': self.enableDvr,
                'enableEmbed': True,
                'recordFromStart': True,
                'enableAutoStart': self.autoStart,
                'enableAutoStop': self.autoStop
            }
        }
        request = youtube.liveBroadcasts().insert(
            part="snippet,status,contentDetails", body=body)
        response = await asyncio.to_thread(request.execute)
        log_debug(response)
        return response['id']

    @staticmethod
    async def __create_live_stream(youtube: Any) -> Tuple[str, str, str]:
        """
        Create a live stream object (the input stream) and set the parameters.
        Returns the id, the stream key and the rtmp url of the live stream.
        :param youtube: The YouTube object used to create the live stream.

        :return: The id, the stream key and the rtmp url of the live stream.
        """
        body = {
            'snippet': {
                'title': StreamingStatus().get_title(),
                'description': StreamingStatus().get_description()
            },
            'cdn': {
                'ingestionType': "rtmp",
                'resolution': "variable",
                'frameRate': "variable"
            }
        }
        request = youtube.liveStreams().insert(
            part="snippet,cdn,contentDetails,status",
            body=body)
        response = await asyncio.to_thread(request.execute)
        log_debug(response)

        return (response['id'], response['cdn']['ingestionInfo']['streamName'],
                response['cdn']['ingestionInfo']['rtmpsIngestionAddress'])

    @staticmethod
    async def __bind_streams(input_stream_id: str, output_stream_id: str,
                             youtube: Any) -> None:
        """
        Bind the input stream to the output stream (i.e. the live broadcast).
        :param input_stream_id: The id of the input stream.
        :param output_stream_id: The id of the output stream.
        :param youtube: The YouTube object used to bind the streams.
        """
        request = youtube.liveBroadcasts().bind(
            part="id,contentDetails,status", id=output_stream_id,
            streamId=input_stream_id)
        response = await asyncio.to_thread(request.execute)
        log_debug(response)

    async def __configure_video(self, title: str, description: str,
                                live_date: datetime, youtube: Any) -> None:
        """
        Configure live-streaming video settings (i.e. title, description,
        category, language and recording date).
        :param title: The title of the live-streaming.
        :param description: The description of the live-streaming.
        :param live_date: The date of the live-streaming.
        :param youtube: The YouTube object used to configure the video.
        """
        body = {
            'id': self.__streaming_config['output_stream_id'],
            'snippet': {
                'title': title,
                'categoryId': self.category,
                'defaultLanguage': "it",
                'defaultAudioLanguage': "it",
                'description': description
            },
            'recordingDetails': {
                'recordingDate': live_date.isoformat()
            }
        }
        request = youtube.videos().update(
            part="id,snippet,recordingDetails", body=body)
        response = await asyncio.to_thread(request.execute)
        log_debug(response)

    async def __add_to_playlist(self, playlist: str, youtube: Any) -> None:
        """
        Add the video to the playlist with the given id.
        :param playlist: The id of the playlist.
        :param youtube: The YouTube object used to add the video to the
        playlist.
        """
        body = {
            'snippet': {
                'playlistId': playlist,
                'resourceId': {
                    'kind': "youtube#video",
                    'videoId': self.__streaming_config['output_stream_id']
                }
            }
        }
        # Aggiungo il video alla playlist
        request = youtube.playlistItems().insert(
            part="snippet", body=body)
        response = await asyncio.to_thread(request.execute)
        log_debug(response)

    @staticmethod
    @cache
    def get_youtube_categories() -> dict[str, str]:
        """
        Get the list of YouTube video categories.

        :return: The dictionary of YouTube video categories.
        """
        while True:
            with LiveEvent.__yt_lock:
                try:
                    with LiveEvent.__youtube_credentials() as youtube:
                        response = youtube.videoCategories().list(
                            part="snippet", hl="it", regionCode="IT").execute()
                        log_debug(response)
                        return {item['id']: item['snippet']['title'] for item in
                                response['items']}
                except Exception as ex:  # pylint: disable=broad-except
                    logging.error('Errore durante il recupero delle categorie',
                                  exc_info=ex)
            sleep(5)

    @staticmethod
    @cache
    def get_youtube_playlists() -> dict[str, str]:
        """
        Get the list of YouTube video playlists.

        :return: The dictionary of YouTube video playlists.
        """
        while True:
            with LiveEvent.__yt_lock:
                try:
                    with LiveEvent.__youtube_credentials() as youtube:
                        response = youtube.playlists().list(
                            part="snippet", mine=True, maxResults=50).execute()
                        log_debug(response)
                        return {item['id']: item['snippet']['title'] for item in
                                response['items']}
                except Exception as ex:  # pylint: disable=broad-except
                    logging.error('Errore durante il recupero delle playlist',
                                  exc_info=ex)
            sleep(5)

    async def __open_obs(self) -> None:
        """
        Open OBS if it is not running.
        """
        try:
            proc = process_iter()
            if not [p for p in proc if p.name().lower().startswith('obs64')]:
                path = r'C:\Program Files\obs-studio\bin\64bit\obs64.exe'
                cwd = r'C:\Program Files\obs-studio\bin\64bit'
                # pylint: disable=R1732
                await asyncio.to_thread(lambda: Popen(path, cwd=cwd))
                await asyncio.sleep(5)
        except Exception as ex:  # pylint: disable=broad-except
            logging.error('OBS non sembra essere in esecuzione.', exc_info=ex)
            msg = 'OBS non sembra essere in esecuzione.'
            self.__telegram.send(msg, exception=ex)

    def __str__(self) -> str:
        """
        Returns the string representation of the LiveEvent.

        :return: The string representation of the LiveEvent.
        """
        return str(vars(self))

    @staticmethod
    def schedule_every(day: str, anticipate: bool = False) -> Job:
        """
        Returns the schedule object associated with the given day.
        :param day: The day of the week to be scheduled (i.e. 'lu', 'ma', 'me',
        'gi', 've', 'sa', 'do').
        :param anticipate: If True, the schedule is set to the day before the
        given day.

        :raise ScheduleException: If the given day is not valid.
        """
        day = day.lower()
        days = {'lu': every().monday, 'ma': every().tuesday,
                'me': every().wednesday, 'gi': every().thursday,
                've': every().friday, 'sa': every().saturday,
                'do': every().sunday}
        if day in days:
            keys = list(days.keys())
            if anticipate:
                return days.get(keys[keys.index(day) - 1])
            return days.get(day)
        raise ScheduleException(f"Giorno '{day}' non valido")

    @staticmethod
    async def obs_exec(ws: simpleobsws.WebSocketClient, method: str,
                       params: dict = None) -> dict | None:
        """
        Execute the given command with the given parameters on OBS.
        :param ws: The websocket client to use.
        :param method: The command to call.
        :param params: The parameters of the command.

        :return: The response of the command.
        """
        request = simpleobsws.Request(method, {} if params is None else params)
        ret = await ws.call(request)
        if ret.ok():
            return ret.responseData
        msg = f"Errore comando su OBS\nREQUEST: {request}\nRESPONSE: {ret}"
        logging.error(msg)
        return None

__add_to_playlist(playlist, youtube) async

Add the video to the playlist with the given id. :param playlist: The id of the playlist. :param youtube: The YouTube object used to add the video to the playlist.

Source code in models.py
async def __add_to_playlist(self, playlist: str, youtube: Any) -> None:
    """
    Add the video to the playlist with the given id.
    :param playlist: The id of the playlist.
    :param youtube: The YouTube object used to add the video to the
    playlist.
    """
    body = {
        'snippet': {
            'playlistId': playlist,
            'resourceId': {
                'kind': "youtube#video",
                'videoId': self.__streaming_config['output_stream_id']
            }
        }
    }
    # Aggiungo il video alla playlist
    request = youtube.playlistItems().insert(
        part="snippet", body=body)
    response = await asyncio.to_thread(request.execute)
    log_debug(response)

__bind_streams(input_stream_id, output_stream_id, youtube) async staticmethod

Bind the input stream to the output stream (i.e. the live broadcast). :param input_stream_id: The id of the input stream. :param output_stream_id: The id of the output stream. :param youtube: The YouTube object used to bind the streams.

Source code in models.py
@staticmethod
async def __bind_streams(input_stream_id: str, output_stream_id: str,
                         youtube: Any) -> None:
    """
    Bind the input stream to the output stream (i.e. the live broadcast).
    :param input_stream_id: The id of the input stream.
    :param output_stream_id: The id of the output stream.
    :param youtube: The YouTube object used to bind the streams.
    """
    request = youtube.liveBroadcasts().bind(
        part="id,contentDetails,status", id=output_stream_id,
        streamId=input_stream_id)
    response = await asyncio.to_thread(request.execute)
    log_debug(response)

__configure_schedule()

Configure the event schedule according to the current event data.

Source code in models.py
def __configure_schedule(self) -> None:
    """
    Configure the event schedule according to the current event data.
    """
    self.__schedule = None
    self.__schedule_prepare = None

    if not self.enabled:
        return

    if self.type == self.EVENT_TYPE_RECURRING:
        self.__schedule, self.__schedule_prepare = self.__recurring()
    elif self.type == self.EVENT_TYPE_ONETIME:
        self.__schedule, self.__schedule_prepare = self.__onetime()

__configure_video(title, description, live_date, youtube) async

Configure live-streaming video settings (i.e. title, description, category, language and recording date). :param title: The title of the live-streaming. :param description: The description of the live-streaming. :param live_date: The date of the live-streaming. :param youtube: The YouTube object used to configure the video.

Source code in models.py
async def __configure_video(self, title: str, description: str,
                            live_date: datetime, youtube: Any) -> None:
    """
    Configure live-streaming video settings (i.e. title, description,
    category, language and recording date).
    :param title: The title of the live-streaming.
    :param description: The description of the live-streaming.
    :param live_date: The date of the live-streaming.
    :param youtube: The YouTube object used to configure the video.
    """
    body = {
        'id': self.__streaming_config['output_stream_id'],
        'snippet': {
            'title': title,
            'categoryId': self.category,
            'defaultLanguage': "it",
            'defaultAudioLanguage': "it",
            'description': description
        },
        'recordingDetails': {
            'recordingDate': live_date.isoformat()
        }
    }
    request = youtube.videos().update(
        part="id,snippet,recordingDetails", body=body)
    response = await asyncio.to_thread(request.execute)
    log_debug(response)

__create_live_broadcast(live_date, privacy, youtube) async

Create a live broadcast object (the output stream) and set the parameters. Returns the id of the live broadcast. :param live_date: The date of the live broadcast. :param privacy: The privacy of the live. :param youtube: The YouTube object used to create the live broadcast.

:return: The id of the live broadcast.

Source code in models.py
async def __create_live_broadcast(self, live_date: datetime, privacy: str,
                                  youtube: Any) -> str:
    """
    Create a live broadcast object (the output stream) and set the
    parameters. Returns the id of the live broadcast.
    :param live_date: The date of the live broadcast.
    :param privacy: The privacy of the live.
    :param youtube: The YouTube object used to create the live broadcast.

    :return: The id of the live broadcast.
    """
    end_time = live_date + timedelta(minutes=self.duration)
    body = {
        'snippet': {
            'title': StreamingStatus().get_title(),
            'description': StreamingStatus().get_description(),
            'scheduledStartTime': str(live_date.isoformat()),
            'scheduledEndTime': str(end_time.isoformat())
        },
        'status': {
            'privacyStatus': privacy,
            'selfDeclaredMadeForKids': False
        },
        'contentDetails': {
            'enableDvr': self.enableDvr,
            'enableEmbed': True,
            'recordFromStart': True,
            'enableAutoStart': self.autoStart,
            'enableAutoStop': self.autoStop
        }
    }
    request = youtube.liveBroadcasts().insert(
        part="snippet,status,contentDetails", body=body)
    response = await asyncio.to_thread(request.execute)
    log_debug(response)
    return response['id']

__create_live_stream(youtube) async staticmethod

Create a live stream object (the input stream) and set the parameters. Returns the id, the stream key and the rtmp url of the live stream. :param youtube: The YouTube object used to create the live stream.

:return: The id, the stream key and the rtmp url of the live stream.

Source code in models.py
@staticmethod
async def __create_live_stream(youtube: Any) -> Tuple[str, str, str]:
    """
    Create a live stream object (the input stream) and set the parameters.
    Returns the id, the stream key and the rtmp url of the live stream.
    :param youtube: The YouTube object used to create the live stream.

    :return: The id, the stream key and the rtmp url of the live stream.
    """
    body = {
        'snippet': {
            'title': StreamingStatus().get_title(),
            'description': StreamingStatus().get_description()
        },
        'cdn': {
            'ingestionType': "rtmp",
            'resolution': "variable",
            'frameRate': "variable"
        }
    }
    request = youtube.liveStreams().insert(
        part="snippet,cdn,contentDetails,status",
        body=body)
    response = await asyncio.to_thread(request.execute)
    log_debug(response)

    return (response['id'], response['cdn']['ingestionInfo']['streamName'],
            response['cdn']['ingestionInfo']['rtmpsIngestionAddress'])

__delete_key(youtube)

Method that deletes the streaming key after 2 minutes. :param youtube: The YouTube object used to delete the streaming key.

Source code in models.py
def __delete_key(self, youtube: Any) -> None:
    """
    Method that deletes the streaming key after 2 minutes.
    :param youtube: The YouTube object used to delete the streaming key.
    """
    delete_key = self.__streaming_config['input_stream_id']
    response = youtube.liveStreams().delete(id=delete_key).execute()
    log_debug(response)

__ensure_seconds(time) classmethod

Ensure a time value includes seconds. :param time: The time value to normalize. :return: The time value with seconds.

Source code in models.py
@classmethod
def __ensure_seconds(cls, time: str) -> str:
    """
    Ensure a time value includes seconds.
    :param time: The time value to normalize.
    :return: The time value with seconds.
    """
    if cls.HOUR_MINUTE_PATTERN.fullmatch(time):
        return f"{time}:00"
    return time

__eq__(other)

Compare the LiveEvent with the given object. The comparison is based on the uuid of the LiveEvent. If the given object is not a LiveEvent, the comparison returns NotImplemented. :param other: The object to be compared with the LiveEvent.

:return: True if the LiveEvent is equal to the given object, False otherwise.

Source code in models.py
def __eq__(self, other: object) -> bool:
    """
    Compare the LiveEvent with the given object. The comparison is based on
    the uuid of the LiveEvent. If the given object is not a LiveEvent, the
    comparison returns NotImplemented.
    :param other: The object to be compared with the LiveEvent.

    :return: True if the LiveEvent is equal to the given object, False
    otherwise.
    """
    if isinstance(other, LiveEvent):
        # pylint: disable=protected-access
        return self.__uuid == other.__uuid
    return NotImplemented

__init__(telegram, event_data)

Constructor of the class. It initializes the event data. :param telegram: The Telegram bot object. :param event_data: The data of the event.

Source code in models.py
def __init__(self, telegram, event_data: ChainMap):
    """
    Constructor of the class. It initializes the event data.
    :param telegram: The Telegram bot object.
    :param event_data: The data of the event.
    """
    self.__telegram = telegram
    self.__uuid = uuid4()
    self.__streaming_config = None
    self.__background_tasks = set()
    self.__update_event(dict(event_data))
    self.time = self.__normalize_time(self.time)
    self.__configure_schedule()

__normalize_time(time) classmethod

Normalize a time value to ensure single-digit hours are zero-padded. :param time: The time value to normalize. :return: The normalized time value.

Source code in models.py
@classmethod
def __normalize_time(cls, time: str) -> str:
    """
    Normalize a time value to ensure single-digit hours are zero-padded.
    :param time: The time value to normalize.
    :return: The normalized time value.
    """
    if cls.SHORT_TIME_PATTERN.match(time):
        return f"0{time}"
    return time

__notify_streaming_ended() async

Notify the end of the live-streaming.

Source code in models.py
async def __notify_streaming_ended(self):
    """
    Notify the end of the live-streaming.
    """
    logging.info("Live conclusa correttamente!")
    if self.privacy == 'public':
        self.__telegram.send("Live conclusa correttamente!")

__notify_streaming_start() async

Notify the start of the live-streaming.

Source code in models.py
async def __notify_streaming_start(self):
    """
    Notify the start of the live-streaming.
    """
    msg = ("Indirizzo del video: https://youtu.be/"
           f"{self.__streaming_config['output_stream_id']}")
    logging.info(msg)
    if self.privacy == 'public':
        msg = ("Avvio della live: https://youtu.be/"
               f"{self.__streaming_config['output_stream_id']}")
        self.__telegram.send(msg)

__onetime()

Set up the one-time event schedule.

:return: The schedule and the schedule_prepare objects.

Source code in models.py
def __onetime(self) -> tuple[Job | None, Job | None]:
    """
    Set up the one-time event schedule.

    :return: The schedule and the schedule_prepare objects.
    """
    time = self.time
    if self.HOUR_MINUTE_PATTERN.fullmatch(time):
        time = f'{time}:00'
    event_date = f"{self.day} {time}"
    event_date = datetime.strptime(event_date, self.DATETIME_FORMAT)
    now = datetime.now()
    if event_date >= now:
        early = self.__schedule_early(None, self.time)
        o_schedule = every().day.at(early['time'])
        o_schedule.do(self.__run_once, event_date, self.__to_task,
                      self.go_live, name='OT_go_live')

        p_time = event_date + timedelta(minutes=-self.prepare)
        if p_time >= now:
            o_prepare = every().day.at(f"{p_time:{self.TIME_WITH_SECONDS_FORMAT}}")
            o_prepare.do(self.__run_once, p_time, self.__to_task,
                         self.prepare_live, name='OT_prepare')
            return o_schedule, o_prepare
        return o_schedule, None
    return None, None

__open_obs() async

Open OBS if it is not running.

Source code in models.py
async def __open_obs(self) -> None:
    """
    Open OBS if it is not running.
    """
    try:
        proc = process_iter()
        if not [p for p in proc if p.name().lower().startswith('obs64')]:
            path = r'C:\Program Files\obs-studio\bin\64bit\obs64.exe'
            cwd = r'C:\Program Files\obs-studio\bin\64bit'
            # pylint: disable=R1732
            await asyncio.to_thread(lambda: Popen(path, cwd=cwd))
            await asyncio.sleep(5)
    except Exception as ex:  # pylint: disable=broad-except
        logging.error('OBS non sembra essere in esecuzione.', exc_info=ex)
        msg = 'OBS non sembra essere in esecuzione.'
        self.__telegram.send(msg, exception=ex)

__recurring()

Set up the recurring event schedule.

:return: The schedule and the schedule_prepare objects.

Source code in models.py
def __recurring(self) -> tuple[Job, Job]:
    """
    Set up the recurring event schedule.

    :return: The schedule and the schedule_prepare objects.
    """
    early = self.__schedule_early(None, self.time)
    r_schedule = LiveEvent.schedule_every(self.day, early['anticipate'])
    r_schedule.at(early['time'])
    r_schedule.do(self.__to_task, self.go_live, name='RE_go_live')

    early = self.__schedule_early(early['date'], self.time,
                                  self.prepare * 60)
    r_prepare = LiveEvent.schedule_every(self.day, early['anticipate'])
    r_prepare.at(early['time'])
    r_prepare.do(self.__to_task, self.prepare_live, name='RE_prepare')
    return r_schedule, r_prepare

__run_once(event_date, job_func, *args, **kvargs) staticmethod

Run a function once at a specific date and time. :param event_date: date and time to run the function at :param job_func: function to run once :param args: arguments (positional) to pass to the function :param kvargs: arguments (keyword) to pass to the function

:return: None if the function was not scheduled, CancelJob otherwise

Source code in models.py
@staticmethod
def __run_once(event_date: datetime, job_func: Callable, *args,
               **kvargs) -> Type[CancelJob] | None:
    """
    Run a function once at a specific date and time.
    :param event_date: date and time to run the function at
    :param job_func: function to run once
    :param args: arguments (positional) to pass to the function
    :param kvargs: arguments (keyword) to pass to the function

    :return: None if the function was not scheduled, CancelJob otherwise
    """
    now = datetime.now()
    if abs((now - event_date).total_seconds()) < 60:
        msg = f"Avvio singolo alle {now:{LiveEvent.DATETIME_FORMAT}}"
        logging.info(msg)
        try:
            job_func(*args, **kvargs)
            return CancelJob
        except BaseException as ex:  # pylint: disable=broad-except
            msg = f"Errore non gestito in avvio singolo: {ex}"
            logging.error(msg)
    msg = f"Avvio singolo futuro alle {event_date:{LiveEvent.DATETIME_FORMAT}}"
    logging.info(msg)
    return None

__setup_go_live() async

Set up the live-streaming in go_live method.

Source code in models.py
async def __setup_go_live(self):
    """
    Set up the live-streaming in go_live method.
    """
    if self.__streaming_config is None:
        await self.prepare_live(now=True)
    logging.info(str(self))
    StreamingStatus().set_live(self)
    if self.enable_presets:
        default_preset = DefaultConfig().get('streaming_preset')
        await call_preset(default_preset)
        await asyncio.sleep(3)

__setup_prepare_live(now=False)

Compute the parameters of the live-streaming. :param now: If True, the live-streaming is prepared immediately.

:return: The title, the description, the date and the privacy of the live-streaming.

Source code in models.py
def __setup_prepare_live(self, now: bool = False) -> (
        tuple)[str, str, datetime, str]:
    """
    Compute the parameters of the live-streaming.
    :param now: If True, the live-streaming is prepared immediately.

    :return: The title, the description, the date and the privacy of the
    live-streaming.
    """
    live_date = datetime.now(
        timezone.utc) if now else self.__schedule.next_run
    live_date = live_date.astimezone()
    title = self.title.replace('$DAY$', f"{live_date:{self.DATE_FORMAT}}")
    title = title.replace('$TIME$', self.time)
    description = self.description.replace('$DAY$', f"{live_date:{self.DATE_FORMAT}}")
    description = description.replace('$TIME$', self.time)
    privacy = self.privacy
    if privacy not in ('public', 'private', 'unlisted'):
        privacy = 'unlisted'
    return title, description, live_date, privacy

__str__()

Returns the string representation of the LiveEvent.

:return: The string representation of the LiveEvent.

Source code in models.py
def __str__(self) -> str:
    """
    Returns the string representation of the LiveEvent.

    :return: The string representation of the LiveEvent.
    """
    return str(vars(self))

__streaming_in_progress() async staticmethod

Async method that waits for the live-streaming to end.

Source code in models.py
@staticmethod
async def __streaming_in_progress():
    """
    Async method that waits for the live-streaming to end.
    """
    sec = 0
    status = StreamingStatus()
    while sec < status.get_total_time() and not status.should_stop():
        await asyncio.sleep(1)
        sec += 1
        status.set_time_left(sec)

__task_done(task)

Remove the task from the background tasks. :param task: The task to be removed.

Source code in models.py
def __task_done(self, task: asyncio.Task) -> None:
    """
    Remove the task from the background tasks.
    :param task: The task to be removed.
    """
    self.__background_tasks.discard(task)
    StreamingStatus().add_ended_task(task)

__to_task(coro, *args, **kwargs)

Add a coroutine to the background tasks. :param coro: The coroutine to be added. :param args: The arguments to be passed to the coroutine. :param kwargs: The keyword arguments to be passed to the coroutine.

Source code in models.py
def __to_task(self, coro: Callable, *args: Any, **kwargs: Any) -> None:
    """
    Add a coroutine to the background tasks.
    :param coro: The coroutine to be added.
    :param args: The arguments to be passed to the coroutine.
    :param kwargs: The keyword arguments to be passed to the coroutine.
    """
    coro = coro()
    if asyncio.iscoroutine(coro):
        task = asyncio.create_task(coro, *args, **kwargs)
        self.__background_tasks.add(task)
        task.add_done_callback(self.__task_done)

__update_event(event_data=None)

Updates the event data. :param event_data: The new data of the event.

Source code in models.py
def __update_event(self, event_data: dict[str, Any] = None) -> None:
    """
    Updates the event data.
    :param event_data: The new data of the event.
    """
    if event_data is not None:
        for key in event_data:
            setattr(self, key, event_data[key])

__youtube_credentials() staticmethod

Context manager that returns the YouTube object used to create the live broadcast and the live stream. :yield: The YouTube object used to create the live broadcast and the live stream.

Source code in models.py
@staticmethod
@contextmanager
def __youtube_credentials() -> Credentials:
    """
    Context manager that returns the YouTube object used to create the live
    broadcast and the live stream.
    :yield: The YouTube object used to create the live broadcast and the
    live stream.
    """
    credentials = DefaultConfig().youtube_credentials()
    # noinspection PyPackageRequirements
    # pylint: disable=C0415
    from googleapiclient.discovery import build
    youtube = None
    try:
        youtube = build('youtube', 'v3', credentials=credentials,
                        static_discovery=False, num_retries=20,
                        cache=MemoryCache())
        yield youtube
    finally:
        if youtube is not None:
            youtube.close()

get_config(basic=False)

Returns the configuration of the event. :param basic: If True, the basic configuration is returned, otherwise the complete configuration is returned.

:return: The configuration of the event.

Source code in models.py
def get_config(self, basic: bool = False) -> dict[str, Any]:
    """
    Returns the configuration of the event.
    :param basic: If True, the basic configuration is returned, otherwise
    the complete configuration is returned.

    :return: The configuration of the event.
    """
    config = {k: v for k, v in vars(self).items() if not k.startswith('_')}
    if basic:
        default = DefaultConfig().get('youtube_default')
        config = {k: v for k, v in config.items() if v != default.get(k)}
    return config

get_youtube_categories() cached staticmethod

Get the list of YouTube video categories.

:return: The dictionary of YouTube video categories.

Source code in models.py
@staticmethod
@cache
def get_youtube_categories() -> dict[str, str]:
    """
    Get the list of YouTube video categories.

    :return: The dictionary of YouTube video categories.
    """
    while True:
        with LiveEvent.__yt_lock:
            try:
                with LiveEvent.__youtube_credentials() as youtube:
                    response = youtube.videoCategories().list(
                        part="snippet", hl="it", regionCode="IT").execute()
                    log_debug(response)
                    return {item['id']: item['snippet']['title'] for item in
                            response['items']}
            except Exception as ex:  # pylint: disable=broad-except
                logging.error('Errore durante il recupero delle categorie',
                              exc_info=ex)
        sleep(5)

get_youtube_playlists() cached staticmethod

Get the list of YouTube video playlists.

:return: The dictionary of YouTube video playlists.

Source code in models.py
@staticmethod
@cache
def get_youtube_playlists() -> dict[str, str]:
    """
    Get the list of YouTube video playlists.

    :return: The dictionary of YouTube video playlists.
    """
    while True:
        with LiveEvent.__yt_lock:
            try:
                with LiveEvent.__youtube_credentials() as youtube:
                    response = youtube.playlists().list(
                        part="snippet", mine=True, maxResults=50).execute()
                    log_debug(response)
                    return {item['id']: item['snippet']['title'] for item in
                            response['items']}
            except Exception as ex:  # pylint: disable=broad-except
                logging.error('Errore durante il recupero delle playlist',
                              exc_info=ex)
        sleep(5)

go_live() async

Async method that starts the live-streaming.

Source code in models.py
async def go_live(self) -> None:
    """
    Async method that starts the live-streaming.
    """
    if StreamingStatus().is_live():
        return
    logging.info('Live streaming avviata')
    config = DefaultConfig()
    ws = None
    live_done = False
    try:
        await self.__setup_go_live()

        ws = simpleobsws.WebSocketClient(url=config.get('obs_url'),
                                         password=config.get('obs_pwd'))
        await ws.connect()
        await ws.wait_until_identified()
        p = {'inputName': config.get('youtube_default')['source'],
             'mediaAction': 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART'}
        await LiveEvent.obs_exec(ws, 'TriggerMediaInputAction', p)
        p = {'streamServiceType': 'rtmp_custom',
             'streamServiceSettings': {
                 'server': self.__streaming_config['rtmp_url'],
                 'key': self.__streaming_config['streaming_key']}
             }
        await LiveEvent.obs_exec(ws, 'SetStreamServiceSettings', p)
        await self.__notify_streaming_start()
        await LiveEvent.obs_exec(ws, 'StartStream')
        await self.__streaming_in_progress()
        await LiveEvent.obs_exec(ws, 'StopStream')
        await self.__notify_streaming_ended()

        live_done = True
    except Exception as ex:  # pylint: disable=broad-except
        logging.error('Errore durante la live', exc_info=ex)
        self.__telegram.send('Errore durante la live. Avviala manualmente!',
                             exception=ex)
    finally:
        if ws is not None:
            await ws.disconnect()
        StreamingStatus().set_ready(self)
        StreamingStatus().set_refresh_program(True)
        if self.enable_presets:
            await call_preset(config.get('offline_preset'))

    # Cancello la chiave di stream
    try:
        if live_done and self.__streaming_config['input_stream_id']:
            await asyncio.sleep(120)
            with self.__youtube_credentials() as youtube:
                self.__delete_key(youtube)
                logging.info('Streaming key cancellata correttamente')
    except Exception as ex:  # pylint: disable=broad-except
        logging.error('Impossibile cancellare la chiave di stream',
                      exc_info=ex)
    self.__streaming_config = None

obs_exec(ws, method, params=None) async staticmethod

Execute the given command with the given parameters on OBS. :param ws: The websocket client to use. :param method: The command to call. :param params: The parameters of the command.

:return: The response of the command.

Source code in models.py
@staticmethod
async def obs_exec(ws: simpleobsws.WebSocketClient, method: str,
                   params: dict = None) -> dict | None:
    """
    Execute the given command with the given parameters on OBS.
    :param ws: The websocket client to use.
    :param method: The command to call.
    :param params: The parameters of the command.

    :return: The response of the command.
    """
    request = simpleobsws.Request(method, {} if params is None else params)
    ret = await ws.call(request)
    if ret.ok():
        return ret.responseData
    msg = f"Errore comando su OBS\nREQUEST: {request}\nRESPONSE: {ret}"
    logging.error(msg)
    return None

prepare_live(now=False) async

Prepares the live-streaming by setting up the streaming configuration. :param now: If True, the live-streaming is prepared immediately.

:raise PrepareLiveException: If the live-streaming configuration is not valid.

Source code in models.py
async def prepare_live(self, now: bool = False) -> None:
    """
    Prepares the live-streaming by setting up the streaming configuration.
    :param now: If True, the live-streaming is prepared immediately.

    :raise PrepareLiveException: If the live-streaming configuration is not
    valid.
    """
    if StreamingStatus().is_prepared():
        return
    logging.info('Preparazione della trasmissione programmata')
    StreamingStatus().set_prepared(self)

    try:
        with self.__youtube_credentials() as yt:
            title, desc, live_dt, privacy = self.__setup_prepare_live(now)
            StreamingStatus().set_title(title)
            StreamingStatus().set_description(desc)

            self.__streaming_config = {}

            s_id = await self.__create_live_broadcast(live_dt, privacy, yt)
            self.__streaming_config['output_stream_id'] = s_id

            l_id, s_key, rtmp_url = await self.__create_live_stream(yt)
            self.__streaming_config['input_stream_id'] = l_id
            self.__streaming_config['streaming_key'] = s_key
            self.__streaming_config['rtmp_url'] = rtmp_url
            logging.info(self.__streaming_config)
            if None in self.__streaming_config.values():
                msg = ('Youtube ha restituito una configurazione non '
                       'valida per la prossima Live')
                raise PrepareLiveException(msg)

            await self.__bind_streams(l_id, s_id, yt)
            await self.__configure_video(title, desc, live_dt, yt)
            if privacy == 'public' and self.playlist != '':
                await self.__add_to_playlist(self.playlist, yt)
    except Exception as ex:  # pylint: disable=broad-except
        msg = ("Preparazione dell'evento fallita.\n"
               f"Config: {self.__streaming_config}")
        logging.error(msg, exc_info=ex)
        msg = "Preparazione dell'evento fallita"
        self.__telegram.send(msg, exception=ex)
        self.__streaming_config = None
        StreamingStatus().set_ready(self)
    await self.__open_obs()

remove_schedule()

Removes the schedule of the event.

:return: True if the schedule has been removed, False otherwise.

Source code in models.py
def remove_schedule(self) -> bool:
    """
    Removes the schedule of the event.

    :return: True if the schedule has been removed, False otherwise.
    """
    if self.__streaming_config is None:
        if self.__schedule is not None:
            cancel_job(self.__schedule)
        if self.__schedule_prepare is not None:
            cancel_job(self.__schedule_prepare)
        return True
    return False

reschedule(update=None)

Reschedules the event. :param update: The new data of the event.

:return: True if the event has been rescheduled, False otherwise.

Source code in models.py
def reschedule(self, update: dict[str, Any] = None) -> bool:
    """
    Reschedules the event.
    :param update: The new data of the event.

    :return: True if the event has been rescheduled, False otherwise.
    """
    if not self.remove_schedule():
        return False
    self.__update_event(update)
    self.__streaming_config = None
    self.time = self.__normalize_time(self.time)
    self.__configure_schedule()
    return True

schedule_every(day, anticipate=False) staticmethod

Returns the schedule object associated with the given day. :param day: The day of the week to be scheduled (i.e. 'lu', 'ma', 'me', 'gi', 've', 'sa', 'do'). :param anticipate: If True, the schedule is set to the day before the given day.

:raise ScheduleException: If the given day is not valid.

Source code in models.py
@staticmethod
def schedule_every(day: str, anticipate: bool = False) -> Job:
    """
    Returns the schedule object associated with the given day.
    :param day: The day of the week to be scheduled (i.e. 'lu', 'ma', 'me',
    'gi', 've', 'sa', 'do').
    :param anticipate: If True, the schedule is set to the day before the
    given day.

    :raise ScheduleException: If the given day is not valid.
    """
    day = day.lower()
    days = {'lu': every().monday, 'ma': every().tuesday,
            'me': every().wednesday, 'gi': every().thursday,
            've': every().friday, 'sa': every().saturday,
            'do': every().sunday}
    if day in days:
        keys = list(days.keys())
        if anticipate:
            return days.get(keys[keys.index(day) - 1])
        return days.get(day)
    raise ScheduleException(f"Giorno '{day}' non valido")

MemoryCache

Bases: Cache

Simple in-memory cache keyed by URL.

Source code in models.py
@singleton
class MemoryCache(Cache):
    """
    Simple in-memory cache keyed by URL.
    """

    def __init__(self) -> None:
        """
        Initialize the in-memory cache storage.
        """
        super().__init__()
        self.__items: dict[str, Any] = {}

    def get(self, url: str) -> Any | None:
        """
        Return the cached content for the given URL, if present.
        :param url: The url of the content.

        :return: The content associated with the given url.
        """
        return self.__items.get(url)

    def set(self, url: str, content: Any) -> None:
        """
        Store content in the cache for the given URL.
        :param url: The url of the content.
        :param content: The content to be set.
        """
        self.__items[url] = content

__init__()

Initialize the in-memory cache storage.

Source code in models.py
def __init__(self) -> None:
    """
    Initialize the in-memory cache storage.
    """
    super().__init__()
    self.__items: dict[str, Any] = {}

get(url)

Return the cached content for the given URL, if present. :param url: The url of the content.

:return: The content associated with the given url.

Source code in models.py
def get(self, url: str) -> Any | None:
    """
    Return the cached content for the given URL, if present.
    :param url: The url of the content.

    :return: The content associated with the given url.
    """
    return self.__items.get(url)

set(url, content)

Store content in the cache for the given URL. :param url: The url of the content. :param content: The content to be set.

Source code in models.py
def set(self, url: str, content: Any) -> None:
    """
    Store content in the cache for the given URL.
    :param url: The url of the content.
    :param content: The content to be set.
    """
    self.__items[url] = content

StreamingStatus

Class that stores the status of the live-streaming.

Source code in models.py
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
@singleton
class StreamingStatus:
    """
    Class that stores the status of the live-streaming.
    """

    STATUS_READY = 'ready'
    STATUS_PREPARED = 'prepared'
    STATUS_LIVE = 'live'

    def __init__(self):
        """
        Constructor of the live-streaming status class.
        """
        self.__gui_status_lock: RLock = RLock()
        self.__updated: bool = True
        self.__live_status: str = self.STATUS_READY
        self.owner: LiveEvent | None = None
        config = DefaultConfig()
        self.__title: str = config.get('youtube_default')['title']
        self.__description: str = config.get('youtube_default')['description']
        self.__time_left: int = 0
        self.__bonus_time: int = 0
        self.__stop_live_now: bool = False
        self.__obs_live_view: bool = False
        self.__refresh_program: bool = False
        self.__ended_tasks: list[asyncio.Task] = []
        self.__updates_callback: list[Callable[[], None]] = []

    def add_update_callback(self, callback: Callable[[], None]) -> None:
        """
        Add a callback to be called when an update is dispatched.
        :param callback: The callback to be added.
        """
        if not callable(callback):
            raise TypeError('callback must be callable')

        with self.__gui_status_lock:
            self.__updates_callback.append(callback)

    def remove_update_callback(self, callback: Callable[[], None]) -> None:
        """
        Remove a callback previously added.
        :param callback: The callback to be removed.
        """
        with self.__gui_status_lock:
            if callback in self.__updates_callback:
                self.__updates_callback.remove(callback)

    def __set_updated(self) -> None:
        """
        Set the status as updated.
        """
        with self.__gui_status_lock:
            self.__updated = True
            callbacks = list(self.__updates_callback)

        for callback in callbacks:
            try:
                callback()
            except Exception as ex:  # pylint: disable=broad-except
                msg = 'Errore nella callback di aggiornamento dello stato'
                logging.error(msg, exc_info=True)

    def dispatch_updates(self) -> bool:
        """
        Dispatches the updates to the GUI.

        :return: True if an update has been dispatched, False otherwise.
        """
        with self.__gui_status_lock:
            if self.__updated:
                self.__updated = False
                return True
            return False

    def is_ready(self) -> bool:
        """
        Tells if a live-streaming could be prepared.

        :return: True if a live-streaming could be prepared, False otherwise.
        """
        with self.__gui_status_lock:
            return self.__live_status == self.STATUS_READY

    def is_prepared(self) -> bool:
        """
        Tells if the live-streaming is prepared or not.

        :return: True if the live-streaming is prepared, False otherwise.
        """
        with self.__gui_status_lock:
            return self.__live_status == self.STATUS_PREPARED

    def is_live(self) -> bool:
        """
        Tells if the live-streaming is live or not.

        :return: True if the live-streaming is live, False otherwise.
        """
        with self.__gui_status_lock:
            return self.__live_status == self.STATUS_LIVE

    def is_busy(self) -> bool:
        """
        Tells if the live-streaming is busy or not (i.e. live or prepared).

        :return: True if the live-streaming is busy, False otherwise.
        """
        with self.__gui_status_lock:
            return self.__live_status in (self.STATUS_PREPARED, self.STATUS_LIVE)

    def set_ready(self, owner: LiveEvent) -> None:
        """
        Sets the live-streaming to ready state.
        :param owner: The LiveEvent owner of the live-streaming.

        :raise StreamingStatusException: If the owner of the
        live-streaming is not the same as the LiveEvent passed as parameter.
        """
        youtube_default = DefaultConfig().get('youtube_default') or {}
        with self.__gui_status_lock:
            if owner is not self.owner:
                raise StreamingStatusException('Evento non proprietario cerca '
                                               'di settare lo stato a READY')
            self.__live_status = self.STATUS_READY
            self.owner = None
            self.__title = youtube_default.get('title', '')
            self.__description = youtube_default.get('description', '')
            self.__bonus_time = 0
            self.__time_left = 0
            self.__stop_live_now = False
            self.__obs_live_view = False
        self.__set_updated()

    def set_prepared(self, owner: LiveEvent | None) -> None:
        """
        Sets the live-streaming to prepared state.
        :param owner: The LiveEvent owner of the live-streaming.

        :raise StreamingStatusException: If the owner of the
        live-streaming is not the same as the LiveEvent passed as parameter.
        """
        with self.__gui_status_lock:
            if self.__live_status != self.STATUS_READY or owner is None:
                raise StreamingStatusException('Live non settata a PREPARED '
                                               'perché già acquisita da altro '
                                               'proprietario')
            self.__live_status = self.STATUS_PREPARED
            self.owner = owner
            self.__bonus_time = 0
            self.__time_left = 0
            self.__obs_live_view = True
        self.__set_updated()

    def set_live(self, owner: LiveEvent | None) -> None:
        """
        Sets the live-streaming to live state.
        :param owner: The LiveEvent owner of the live-streaming.

        :raise StreamingStatusException: If the owner of the
        live-streaming is not the same as the LiveEvent passed as parameter.
        """
        with self.__gui_status_lock:
            if self.__live_status == self.STATUS_READY and owner is not None or \
                    self.__live_status == self.STATUS_PREPARED and owner is self.owner:
                self.__live_status = self.STATUS_LIVE
                self.owner = owner
                self.__bonus_time = 0
                self.__time_left = self.get_total_time()
                self.__obs_live_view = True
            else:
                msg = ('Live non settata a LIVE perché già acquisita da '
                       'altro proprietario')
                raise StreamingStatusException(msg)
        self.__set_updated()

    def get_live_status(self) -> str:
        """
        Gets the live-streaming status.

        :return: The live-streaming status.
        """
        with self.__gui_status_lock:
            return self.__live_status

    def switch_obs_live_view(self, status: bool | None = None) -> None:
        """
        Switches the OBS live-view status.
        :param status: The status of the OBS live-view.
        """
        with self.__gui_status_lock:
            if status is not None:
                self.__obs_live_view = status
            else:
                self.__obs_live_view = not self.__obs_live_view
        self.__set_updated()

    def get_obs_live_view(self) -> bool:
        """
        Gets the OBS live-view status.

        :return: True if the OBS live-view has to be shown, False otherwise.
        """
        with self.__gui_status_lock:
            return self.__obs_live_view

    def set_title(self, title: str) -> None:
        """
        Sets the title of the live-streaming.
        :param title: The title of the live-streaming.
        """
        with self.__gui_status_lock:
            if title is not None:
                self.__title = title
        self.__set_updated()

    def get_title(self) -> str:
        """
        Gets the title of the live-streaming.

        :return: The title of the live-streaming.
        """
        with self.__gui_status_lock:
            return self.__title

    def set_description(self, description: str) -> None:
        """
        Sets the description of the live-streaming.
        :param description: The description of the live-streaming.
        """
        with self.__gui_status_lock:
            if description is not None:
                self.__description = description
        self.__set_updated()

    def get_description(self) -> str:
        """
        Gets the description of the live-streaming.

        :return: The description of the live-streaming.
        """
        with self.__gui_status_lock:
            return self.__description

    def increase_bonus_time(self, amount: int = 10) -> None:
        """
        Increases the bonus time of the live-streaming.
        :param amount: The amount of minutes to increase the bonus time.
        """
        with self.__gui_status_lock:
            if self.__live_status in [self.STATUS_PREPARED, self.STATUS_LIVE] and amount > 0:
                self.__bonus_time += amount
                msg = f"Trasmissione incrementata di {self.__bonus_time} minuti"
                logging.info(msg)
        self.__set_updated()

    def get_bonus_time(self) -> int:
        """
        Gets the bonus time of the live-streaming.

        :return: The bonus time of the live-streaming in minutes.
        """
        with self.__gui_status_lock:
            return self.__bonus_time

    def get_total_time(self) -> int:
        """
        Gets the total time of the live-streaming (duration + bonus time).

        :return: The total time of the live-streaming in seconds.
        """
        with self.__gui_status_lock:
            if self.owner is not None:
                return self.owner.duration * 60 + self.__bonus_time * 60
            return 0

    def set_time_left(self, elapsed_time: int) -> None:
        """
        Sets the time left of the live-streaming (total time - elapsed time).
        :param elapsed_time: The elapsed time in seconds.
        """
        with self.__gui_status_lock:
            self.__time_left = self.get_total_time() - elapsed_time
            self.__time_left = max(self.__time_left, 0)
        self.__set_updated()

    def get_time_left(self) -> int:
        """
        Gets the time left for the live-streaming to end.

        :return: The time left for the live-streaming to end in seconds.
        """
        with self.__gui_status_lock:
            return self.__time_left

    def stop_now(self) -> None:
        """
        Stops the live-streaming immediately.
        """
        with self.__gui_status_lock:
            if self.__live_status == self.STATUS_LIVE:
                self.__stop_live_now = True
        self.__set_updated()

    def should_stop(self) -> bool:
        """
        Tells if the live-streaming should be stopped immediately.

        :return: True if the live-streaming should be stopped immediately, False
        otherwise.
        """
        with self.__gui_status_lock:
            return self.__stop_live_now

    def get_owner(self) -> LiveEvent:
        """
        Gets the owner of the live-streaming.

        :return: The LiveEvent owner of the live-streaming.
        """
        with self.__gui_status_lock:
            return self.owner

    def set_refresh_program(self, status: bool = False) -> None:
        """
        Sets the refresh program status.
        :param status: The status of the refresh program.
        """
        with self.__gui_status_lock:
            self.__refresh_program = status
        self.__set_updated()

    def should_refresh_program(self) -> bool:
        """
        Tells if the program has to be refreshed.

        :return: True if the program has to be refreshed, False otherwise.
        """
        with self.__gui_status_lock:
            return self.__refresh_program

    def add_ended_task(self, task: asyncio.Task) -> None:
        """
        Adds a coroutine to the list of ended tasks.
        :param task: The coroutine to be added.
        """
        with self.__gui_status_lock:
            self.__ended_tasks.append(task)

    def get_ended_task(self) -> asyncio.Task | None:
        """
        Gets the first coroutine in the list of ended tasks.

        :return: The first coroutine in the list of ended tasks.
        """
        with self.__gui_status_lock:
            if self.__ended_tasks:
                return self.__ended_tasks.pop(0)
            return None

__init__()

Constructor of the live-streaming status class.

Source code in models.py
def __init__(self):
    """
    Constructor of the live-streaming status class.
    """
    self.__gui_status_lock: RLock = RLock()
    self.__updated: bool = True
    self.__live_status: str = self.STATUS_READY
    self.owner: LiveEvent | None = None
    config = DefaultConfig()
    self.__title: str = config.get('youtube_default')['title']
    self.__description: str = config.get('youtube_default')['description']
    self.__time_left: int = 0
    self.__bonus_time: int = 0
    self.__stop_live_now: bool = False
    self.__obs_live_view: bool = False
    self.__refresh_program: bool = False
    self.__ended_tasks: list[asyncio.Task] = []
    self.__updates_callback: list[Callable[[], None]] = []

__set_updated()

Set the status as updated.

Source code in models.py
def __set_updated(self) -> None:
    """
    Set the status as updated.
    """
    with self.__gui_status_lock:
        self.__updated = True
        callbacks = list(self.__updates_callback)

    for callback in callbacks:
        try:
            callback()
        except Exception as ex:  # pylint: disable=broad-except
            msg = 'Errore nella callback di aggiornamento dello stato'
            logging.error(msg, exc_info=True)

add_ended_task(task)

Adds a coroutine to the list of ended tasks. :param task: The coroutine to be added.

Source code in models.py
def add_ended_task(self, task: asyncio.Task) -> None:
    """
    Adds a coroutine to the list of ended tasks.
    :param task: The coroutine to be added.
    """
    with self.__gui_status_lock:
        self.__ended_tasks.append(task)

add_update_callback(callback)

Add a callback to be called when an update is dispatched. :param callback: The callback to be added.

Source code in models.py
def add_update_callback(self, callback: Callable[[], None]) -> None:
    """
    Add a callback to be called when an update is dispatched.
    :param callback: The callback to be added.
    """
    if not callable(callback):
        raise TypeError('callback must be callable')

    with self.__gui_status_lock:
        self.__updates_callback.append(callback)

dispatch_updates()

Dispatches the updates to the GUI.

:return: True if an update has been dispatched, False otherwise.

Source code in models.py
def dispatch_updates(self) -> bool:
    """
    Dispatches the updates to the GUI.

    :return: True if an update has been dispatched, False otherwise.
    """
    with self.__gui_status_lock:
        if self.__updated:
            self.__updated = False
            return True
        return False

get_bonus_time()

Gets the bonus time of the live-streaming.

:return: The bonus time of the live-streaming in minutes.

Source code in models.py
def get_bonus_time(self) -> int:
    """
    Gets the bonus time of the live-streaming.

    :return: The bonus time of the live-streaming in minutes.
    """
    with self.__gui_status_lock:
        return self.__bonus_time

get_description()

Gets the description of the live-streaming.

:return: The description of the live-streaming.

Source code in models.py
def get_description(self) -> str:
    """
    Gets the description of the live-streaming.

    :return: The description of the live-streaming.
    """
    with self.__gui_status_lock:
        return self.__description

get_ended_task()

Gets the first coroutine in the list of ended tasks.

:return: The first coroutine in the list of ended tasks.

Source code in models.py
def get_ended_task(self) -> asyncio.Task | None:
    """
    Gets the first coroutine in the list of ended tasks.

    :return: The first coroutine in the list of ended tasks.
    """
    with self.__gui_status_lock:
        if self.__ended_tasks:
            return self.__ended_tasks.pop(0)
        return None

get_live_status()

Gets the live-streaming status.

:return: The live-streaming status.

Source code in models.py
def get_live_status(self) -> str:
    """
    Gets the live-streaming status.

    :return: The live-streaming status.
    """
    with self.__gui_status_lock:
        return self.__live_status

get_obs_live_view()

Gets the OBS live-view status.

:return: True if the OBS live-view has to be shown, False otherwise.

Source code in models.py
def get_obs_live_view(self) -> bool:
    """
    Gets the OBS live-view status.

    :return: True if the OBS live-view has to be shown, False otherwise.
    """
    with self.__gui_status_lock:
        return self.__obs_live_view

get_owner()

Gets the owner of the live-streaming.

:return: The LiveEvent owner of the live-streaming.

Source code in models.py
def get_owner(self) -> LiveEvent:
    """
    Gets the owner of the live-streaming.

    :return: The LiveEvent owner of the live-streaming.
    """
    with self.__gui_status_lock:
        return self.owner

get_time_left()

Gets the time left for the live-streaming to end.

:return: The time left for the live-streaming to end in seconds.

Source code in models.py
def get_time_left(self) -> int:
    """
    Gets the time left for the live-streaming to end.

    :return: The time left for the live-streaming to end in seconds.
    """
    with self.__gui_status_lock:
        return self.__time_left

get_title()

Gets the title of the live-streaming.

:return: The title of the live-streaming.

Source code in models.py
def get_title(self) -> str:
    """
    Gets the title of the live-streaming.

    :return: The title of the live-streaming.
    """
    with self.__gui_status_lock:
        return self.__title

get_total_time()

Gets the total time of the live-streaming (duration + bonus time).

:return: The total time of the live-streaming in seconds.

Source code in models.py
def get_total_time(self) -> int:
    """
    Gets the total time of the live-streaming (duration + bonus time).

    :return: The total time of the live-streaming in seconds.
    """
    with self.__gui_status_lock:
        if self.owner is not None:
            return self.owner.duration * 60 + self.__bonus_time * 60
        return 0

increase_bonus_time(amount=10)

Increases the bonus time of the live-streaming. :param amount: The amount of minutes to increase the bonus time.

Source code in models.py
def increase_bonus_time(self, amount: int = 10) -> None:
    """
    Increases the bonus time of the live-streaming.
    :param amount: The amount of minutes to increase the bonus time.
    """
    with self.__gui_status_lock:
        if self.__live_status in [self.STATUS_PREPARED, self.STATUS_LIVE] and amount > 0:
            self.__bonus_time += amount
            msg = f"Trasmissione incrementata di {self.__bonus_time} minuti"
            logging.info(msg)
    self.__set_updated()

is_busy()

Tells if the live-streaming is busy or not (i.e. live or prepared).

:return: True if the live-streaming is busy, False otherwise.

Source code in models.py
def is_busy(self) -> bool:
    """
    Tells if the live-streaming is busy or not (i.e. live or prepared).

    :return: True if the live-streaming is busy, False otherwise.
    """
    with self.__gui_status_lock:
        return self.__live_status in (self.STATUS_PREPARED, self.STATUS_LIVE)

is_live()

Tells if the live-streaming is live or not.

:return: True if the live-streaming is live, False otherwise.

Source code in models.py
def is_live(self) -> bool:
    """
    Tells if the live-streaming is live or not.

    :return: True if the live-streaming is live, False otherwise.
    """
    with self.__gui_status_lock:
        return self.__live_status == self.STATUS_LIVE

is_prepared()

Tells if the live-streaming is prepared or not.

:return: True if the live-streaming is prepared, False otherwise.

Source code in models.py
def is_prepared(self) -> bool:
    """
    Tells if the live-streaming is prepared or not.

    :return: True if the live-streaming is prepared, False otherwise.
    """
    with self.__gui_status_lock:
        return self.__live_status == self.STATUS_PREPARED

is_ready()

Tells if a live-streaming could be prepared.

:return: True if a live-streaming could be prepared, False otherwise.

Source code in models.py
def is_ready(self) -> bool:
    """
    Tells if a live-streaming could be prepared.

    :return: True if a live-streaming could be prepared, False otherwise.
    """
    with self.__gui_status_lock:
        return self.__live_status == self.STATUS_READY

remove_update_callback(callback)

Remove a callback previously added. :param callback: The callback to be removed.

Source code in models.py
def remove_update_callback(self, callback: Callable[[], None]) -> None:
    """
    Remove a callback previously added.
    :param callback: The callback to be removed.
    """
    with self.__gui_status_lock:
        if callback in self.__updates_callback:
            self.__updates_callback.remove(callback)

set_description(description)

Sets the description of the live-streaming. :param description: The description of the live-streaming.

Source code in models.py
def set_description(self, description: str) -> None:
    """
    Sets the description of the live-streaming.
    :param description: The description of the live-streaming.
    """
    with self.__gui_status_lock:
        if description is not None:
            self.__description = description
    self.__set_updated()

set_live(owner)

Sets the live-streaming to live state. :param owner: The LiveEvent owner of the live-streaming.

:raise StreamingStatusException: If the owner of the live-streaming is not the same as the LiveEvent passed as parameter.

Source code in models.py
def set_live(self, owner: LiveEvent | None) -> None:
    """
    Sets the live-streaming to live state.
    :param owner: The LiveEvent owner of the live-streaming.

    :raise StreamingStatusException: If the owner of the
    live-streaming is not the same as the LiveEvent passed as parameter.
    """
    with self.__gui_status_lock:
        if self.__live_status == self.STATUS_READY and owner is not None or \
                self.__live_status == self.STATUS_PREPARED and owner is self.owner:
            self.__live_status = self.STATUS_LIVE
            self.owner = owner
            self.__bonus_time = 0
            self.__time_left = self.get_total_time()
            self.__obs_live_view = True
        else:
            msg = ('Live non settata a LIVE perché già acquisita da '
                   'altro proprietario')
            raise StreamingStatusException(msg)
    self.__set_updated()

set_prepared(owner)

Sets the live-streaming to prepared state. :param owner: The LiveEvent owner of the live-streaming.

:raise StreamingStatusException: If the owner of the live-streaming is not the same as the LiveEvent passed as parameter.

Source code in models.py
def set_prepared(self, owner: LiveEvent | None) -> None:
    """
    Sets the live-streaming to prepared state.
    :param owner: The LiveEvent owner of the live-streaming.

    :raise StreamingStatusException: If the owner of the
    live-streaming is not the same as the LiveEvent passed as parameter.
    """
    with self.__gui_status_lock:
        if self.__live_status != self.STATUS_READY or owner is None:
            raise StreamingStatusException('Live non settata a PREPARED '
                                           'perché già acquisita da altro '
                                           'proprietario')
        self.__live_status = self.STATUS_PREPARED
        self.owner = owner
        self.__bonus_time = 0
        self.__time_left = 0
        self.__obs_live_view = True
    self.__set_updated()

set_ready(owner)

Sets the live-streaming to ready state. :param owner: The LiveEvent owner of the live-streaming.

:raise StreamingStatusException: If the owner of the live-streaming is not the same as the LiveEvent passed as parameter.

Source code in models.py
def set_ready(self, owner: LiveEvent) -> None:
    """
    Sets the live-streaming to ready state.
    :param owner: The LiveEvent owner of the live-streaming.

    :raise StreamingStatusException: If the owner of the
    live-streaming is not the same as the LiveEvent passed as parameter.
    """
    youtube_default = DefaultConfig().get('youtube_default') or {}
    with self.__gui_status_lock:
        if owner is not self.owner:
            raise StreamingStatusException('Evento non proprietario cerca '
                                           'di settare lo stato a READY')
        self.__live_status = self.STATUS_READY
        self.owner = None
        self.__title = youtube_default.get('title', '')
        self.__description = youtube_default.get('description', '')
        self.__bonus_time = 0
        self.__time_left = 0
        self.__stop_live_now = False
        self.__obs_live_view = False
    self.__set_updated()

set_refresh_program(status=False)

Sets the refresh program status. :param status: The status of the refresh program.

Source code in models.py
def set_refresh_program(self, status: bool = False) -> None:
    """
    Sets the refresh program status.
    :param status: The status of the refresh program.
    """
    with self.__gui_status_lock:
        self.__refresh_program = status
    self.__set_updated()

set_time_left(elapsed_time)

Sets the time left of the live-streaming (total time - elapsed time). :param elapsed_time: The elapsed time in seconds.

Source code in models.py
def set_time_left(self, elapsed_time: int) -> None:
    """
    Sets the time left of the live-streaming (total time - elapsed time).
    :param elapsed_time: The elapsed time in seconds.
    """
    with self.__gui_status_lock:
        self.__time_left = self.get_total_time() - elapsed_time
        self.__time_left = max(self.__time_left, 0)
    self.__set_updated()

set_title(title)

Sets the title of the live-streaming. :param title: The title of the live-streaming.

Source code in models.py
def set_title(self, title: str) -> None:
    """
    Sets the title of the live-streaming.
    :param title: The title of the live-streaming.
    """
    with self.__gui_status_lock:
        if title is not None:
            self.__title = title
    self.__set_updated()

should_refresh_program()

Tells if the program has to be refreshed.

:return: True if the program has to be refreshed, False otherwise.

Source code in models.py
def should_refresh_program(self) -> bool:
    """
    Tells if the program has to be refreshed.

    :return: True if the program has to be refreshed, False otherwise.
    """
    with self.__gui_status_lock:
        return self.__refresh_program

should_stop()

Tells if the live-streaming should be stopped immediately.

:return: True if the live-streaming should be stopped immediately, False otherwise.

Source code in models.py
def should_stop(self) -> bool:
    """
    Tells if the live-streaming should be stopped immediately.

    :return: True if the live-streaming should be stopped immediately, False
    otherwise.
    """
    with self.__gui_status_lock:
        return self.__stop_live_now

stop_now()

Stops the live-streaming immediately.

Source code in models.py
def stop_now(self) -> None:
    """
    Stops the live-streaming immediately.
    """
    with self.__gui_status_lock:
        if self.__live_status == self.STATUS_LIVE:
            self.__stop_live_now = True
    self.__set_updated()

switch_obs_live_view(status=None)

Switches the OBS live-view status. :param status: The status of the OBS live-view.

Source code in models.py
def switch_obs_live_view(self, status: bool | None = None) -> None:
    """
    Switches the OBS live-view status.
    :param status: The status of the OBS live-view.
    """
    with self.__gui_status_lock:
        if status is not None:
            self.__obs_live_view = status
        else:
            self.__obs_live_view = not self.__obs_live_view
    self.__set_updated()

call_preset(preset=None) async

Call the given preset on the camera. :param preset: The preset name to call.

Source code in models.py
async def call_preset(preset: str = None) -> None:
    """
    Call the given preset on the camera.
    :param preset: The preset name to call.
    """
    if preset is None:
        return

    logging.info("Calling preset: %s", preset)
    try:
        preset_id = str(DefaultConfig().get('presets')[preset])
        url = DefaultConfig().get('preset_url').replace('{#}', preset_id)
        user, pwd = DefaultConfig().get('camera_auth').split(':', maxsplit=1)
        async with AsyncClient() as client:
            await client.get(url, auth=(user, pwd), timeout=2.0)
        logging.info("Preset della telecamera attivato")
    except KeyError:
        logging.error("Non trovo un preset che si chiami: %s", preset)
    except Exception as ex:  # pylint: disable=broad-except
        logging.error("Preset non attivato", exc_info=ex)