-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
1656 lines (1296 loc) · 58.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Licence: Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0) Written by ZapWizard (Joshua Driggs)
#!/usr/bin/python3
import atexit
import configparser
import glob
import os
import random
import sys
import time
from collections import deque
from collections import defaultdict
import datetime
import schedule
import ast
import json
import subprocess
import concurrent.futures
import signal
import mutagen
import serial
from copy import deepcopy
from subprocess import call
import re
import setup
setup.initialize()
import pygame
import settings
# Logging: configure the root logger at DEBUG level and create this module's logger.
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# One message per level is emitted at import time (presumably a start-up
# check that logging output is visible -- confirm before removing).
logger.debug("Debug message")
logger.info("Informational message")
logger.error("Error message")
# SIGINT (Ctrl+C) handler
def signal_handler(sig, frame):
    """Announce the interrupt, run the cleanup routine, then exit with status 0.

    ``sig`` and ``frame`` are the arguments the ``signal`` module passes to
    every handler; neither is used here.
    """
    print("CTRL + C pressed. Exiting gracefully...")
    exit_script()  # Cleanup routine (defined outside this excerpt)
    raise SystemExit(0)  # Same effect as sys.exit(0)
# Register the signal handler so Ctrl+C goes through signal_handler()
signal.signal(signal.SIGINT, signal_handler)
# Variables (module-level state shared by the functions below via ``global``)
clock = pygame.time.Clock()
volume = float(0.05)  # Master volume; set_volume_level() clamps it to VOLUME_SETTINGS["min"]..1
static_volume = float(0.008)  # Last static volume computed by set_volume_level()
volume_prev = float(0.0)  # Last volume_level passed to set_volume_level(), used to skip no-op calls
station_num = int(0)  # Index into ``stations``; tuning() sets it to None between stations
station_list = []
stations = []  # RadioClass instances for the current band, rebuilt by select_band()
total_station_num = int(0)  # Station count for the current band, set by select_band()
radio_band = int(0)  # Index of the current band in ``radio_band_list``
radio_band_list = []
radio_band_total = int(0)  # Upper bound used by select_band() to validate band numbers
active_station = None  # Station currently playing, or None
led_state = True  # Toggled by blink_led()
on_off_state = False  # True after resume_from_standby(), False after standby()
prev_motor_angle = 0
motor_angle = 0  # Current needle angle, written by set_motor() and select_station()
tuning_volume = 0
tuning_seperation = 5  # Angle between stations (spelling kept: name is used elsewhere); recomputed by select_band()
tuning_locked = False  # True while tuning() is locked onto a station
tuning_prev_angle = None  # Angle seen by the previous tuning() call
ADC_0_Prev_Values = []
ADC_2_Prev_Values = []
ADC_2_Prev_Value = 0
motor_angle_prev = settings.MOTOR_SETTINGS["min_angle"]
heartbeat_time = 0
pico_state = False
pico_heartbeat_time = 0
uart = None  # Serial connection; send_uart() tries to (re)initialize it when None
exit_requested = False # Add a flag to indicate whether the script should exit
static_playback_active = False  # True while static should be audible (see play_static())
large_static_files = None  # Static files played through pygame.mixer.music by play_static()
static_sounds = None  # Preloaded static sounds played on mixer channel 1 by play_static()
static_via_music = False  # True while static is being played through pygame.mixer.music
# Time related
# Build today's date at 00:00:00 (local time) and convert it to epoch seconds.
midnight_str = time.strftime( "%m/%d/%Y" ) + " 00:00:00"
midnight = int( time.mktime( time.strptime( midnight_str, "%m/%d/%Y %H:%M:%S" ) ) )
master_start_time = midnight
print("Startup: Time:", time.strftime( "%m/%d/%Y %H:%M:%S"))
# NOTE: this formats the epoch-seconds value as a timedelta (a duration since
# the epoch), not as a clock time.
print("Startup: Midnight:", str(datetime.timedelta(seconds=midnight)))
# Sound related: (re)initialize the pygame mixer; abort start-up if that raises.
try:
    print("Startup: Starting sound initialization")
    # Shut the mixer down first so the init() parameters below take effect
    # even if something initialized it earlier.
    pygame.mixer.quit()
    pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=8192)
    if not pygame.mixer.get_init():
        print("Error: Sound failed to initialize")
except Exception as e:
    # Separator added so the cause does not run into the fixed text.
    sys.exit("Error: Sound setup failed: " + str(e))
# Default structure:
def get_default_station_data(path='', sub_folder_name='', band_name=''):
    """Return a new station record with identifying fields set and cache fields empty.

    Args:
        path: Stored under ``"path"``.
        sub_folder_name: Stored under ``"station_name"``.
        band_name: Stored under ``"folder_name"``.

    Returns:
        A fresh dict on every call (the list values are never shared).
    """
    # Identifying fields first, then the cache fields, so key order is stable.
    record = {
        "path": path,
        "station_name": sub_folder_name,
        "folder_name": band_name,
    }
    record.update({
        "station_files": [],
        "station_ordered": False,
        "station_lengths": [],
        "total_length": 0,
        "station_start": 0,
    })
    return record
def extract_number(filename):
    """Return a sort key built from every run of digits in the file's base name.

    Only the base name is inspected, so digits in parent directories are
    ignored. Names without any digits get ``(inf,)`` so they sort last.
    """
    base_name = os.path.basename(filename)
    digit_runs = re.findall(r'(\d+)', base_name)
    if not digit_runs:
        # No number anywhere in the name: push it to the end of the ordering.
        return (float('inf'),)
    return tuple(map(int, digit_runs))
print("Startup: Starting pygame initialization")
os.environ["SDL_VIDEODRIVER"] = "dummy" # Make a fake screen
# NOTE(review): the mixer was already initialized above, so this audio driver
# setting may come too late to affect it -- confirm.
os.environ['SDL_AUDIODRIVER'] = 'alsa'
pygame.display.set_mode((1, 1))
pygame.init()
# Reserve the channels this file plays on explicitly (1-4).
# set_reserved(count) reserves the FIRST ``count`` channels (0..count-1), so
# 5 is needed to cover channel 4; the previous repeated calls ended at 4 and
# left channel 4 available for automatic selection.
pygame.mixer.set_reserved(5)
# Sound effects, loaded once at start-up.
snd_off = pygame.mixer.Sound(settings.SOUND_EFFECTS["off"])
snd_on = pygame.mixer.Sound(settings.SOUND_EFFECTS["on"])
snd_band = pygame.mixer.Sound(settings.SOUND_EFFECTS["band_change"])
snd_error = pygame.mixer.Sound(settings.SOUND_EFFECTS["error"])
def set_motor(angle, manual=False):
    """Record ``angle`` as the current motor angle.

    When ``manual`` is true the angle is also sent to the motor controller
    as an ``"M"`` UART command (used for manual tuning).
    """
    global motor_angle
    motor_angle = angle
    if not manual:
        return
    send_uart("M", motor_angle)
def _read_saved_value(parser, option, convert, default, info_label, warn_label):
    """Return ``[audio] option`` converted with ``convert``, or ``default`` with a warning."""
    try:
        value = convert(parser.get('audio', option))
    except (configparser.Error, ValueError) as error:
        # Missing section/option or a value that does not parse.
        print(f"Warning: Could not read {warn_label} setting: {error}")
        return default
    print(f"Info: Loaded saved {info_label}:", value)
    return value


def load_saved_settings():
    """Load the saved volume, station and band from ``settings.SAVE_FILE``.

    Every value falls back to its default (volume 0.05, station 0, band 0)
    when the file, the ``[audio]`` section, or the option is missing or
    malformed; a warning is printed in each case.

    Returns:
        Tuple ``(volume, station_number, band_number)``.
    """
    saved_ini = configparser.ConfigParser()
    try:
        # read() returns the list of files it parsed; an empty list means the
        # file is missing or unreadable. (Replaces an ``assert``, which is
        # stripped under ``python -O`` and carried no message.)
        if not saved_ini.read(settings.SAVE_FILE):
            print("Warning: While reading the following:",
                  f"settings file not found: {settings.SAVE_FILE}")
    except (configparser.Error, UnicodeError) as error:
        print("Warning: While reading the following:", str(error))

    saved_volume = _read_saved_value(saved_ini, 'volume', float, 0.05, "volume", "volume")
    saved_band_num = _read_saved_value(saved_ini, 'band', int, 0, "band", "radio band")
    saved_station_num = _read_saved_value(saved_ini, 'station', int, 0, "station", "station")
    return saved_volume, saved_station_num, saved_band_num
def map_range(x, in_min, in_max, out_min, out_max):
    """Linearly map ``x`` from ``[in_min, in_max]`` onto ``[out_min, out_max]``, clamped.

    The output range may be descending (``out_min > out_max``); the result is
    always limited to the span between the two output bounds.

    A zero-width input range would divide by zero, so it is widened by 0.001.
    Only that degenerate case is adjusted: input ranges whose upper bound is
    zero or negative (e.g. ``-10..0``) are mapped normally.
    """
    if in_max == in_min:
        in_max = in_min + 0.001
    mapped = (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
    if out_min <= out_max:
        low, high = out_min, out_max
    else:
        low, high = out_max, out_min
    return max(min(mapped, high), low)
def blink_led():
    """Toggle the heartbeat LED by writing to its sysfs brightness file.

    Does nothing when ``settings.DISABLE_HEARTBEAT_LED`` is set. Each call
    writes 0 or 1 depending on ``led_state`` and then flips ``led_state``.
    """
    global led_state
    if settings.DISABLE_HEARTBEAT_LED:
        return
    if led_state:
        command = 'echo 0 | sudo dd status=none of=/sys/class/leds/led0/brightness'  # led on
    else:
        command = 'echo 1 | sudo dd status=none of=/sys/class/leds/led0/brightness'  # led off
    os.system(command)
    led_state = not led_state
def set_volume_level(volume_level, direction=None):
    """
    Set the master volume and apply it to whichever source is audible.

    ``direction`` may be "up" or "down" to nudge ``volume_level`` by one
    configured step before it is rounded and clamped. Returns the new volume,
    or None when ``volume_level`` equals the value passed on the previous call.
    """
    global volume, volume_prev, static_playback_active, static_volume

    # Nothing to do when called again with the same requested level.
    if volume_level == volume_prev:
        return

    # Optional one-step nudge.
    if direction in ("up", "down"):
        step = settings.VOLUME_SETTINGS["step"]
        volume_level = volume_level + step if direction == "up" else volume_level - step

    # Round, then keep within the configured minimum and 1.
    volume = clamp(round(float(volume_level), 3), settings.VOLUME_SETTINGS["min"], 1)

    if pygame.mixer.music.get_busy() and not static_playback_active:
        # Plain music playback.
        pygame.mixer.music.set_volume(volume)
    else:
        static_volume = set_static_volume()
        if not static_playback_active:
            # Static is off: keep its channel silent.
            pygame.mixer.Channel(1).set_volume(0)
        elif pygame.mixer.music.get_busy():
            # Music occupies the music stream, so static is on channel 1.
            pygame.mixer.Channel(1).set_volume(static_volume)
        else:
            # Static itself is playing through the music stream.
            pygame.mixer.music.set_volume(static_volume)

    # Remember the requested level (after any step adjustment).
    volume_prev = volume_level
    return volume
def clamp(number, minimum, maximum):
    """Limit ``number`` to ``maximum`` from above, then to ``minimum`` from below."""
    capped = min(maximum, number)
    return max(capped, minimum)
def standby(force=False):
    """Put the radio into standby.

    Runs only when the radio is currently on, or when ``force`` is true:
    notifies the Pi Pico, stops the active station, plays the "off" effect,
    saves settings and stops static.
    """
    global on_off_state
    if not (on_off_state or force):
        return
    on_off_state = False
    print("Info: Going into standby")
    send_uart("P", "0")  # Tell the Pi Pico we are going to sleep
    if active_station:
        active_station.stop()
    effects_channel = pygame.mixer.Channel(2)
    effects_channel.set_volume(settings.VOLUME_SETTINGS["effects"])
    effects_channel.play(snd_off)
    save_settings()
    play_static(False)
def resume_from_standby(force=False):
    """Wake the radio from standby.

    Runs only when the radio is currently off, or when ``force`` is true:
    starts the 1 s blink timer, plays the "on" effect, restores the saved
    volume and band, then tunes to the station nearest the current needle
    angle. The saved station number is only logged.
    """
    global on_off_state
    if on_off_state and not force:
        return
    print("Info: Resume from standby")
    pygame.time.set_timer(settings.EVENTS['BLINK'], 1000)
    on_off_state = True
    effects_channel = pygame.mixer.Channel(2)
    effects_channel.set_volume(settings.VOLUME_SETTINGS["effects"])
    effects_channel.play(snd_on)
    saved_volume, saved_station, saved_band = load_saved_settings()
    set_volume_level(saved_volume)
    print("Tuning: Loading saved station number", saved_station)
    select_band(saved_band)
    select_station(get_nearest_station(motor_angle), True)
def handle_action(action):
    """Log a GPIO action; for the configured power-off action, save settings and exit."""
    print("Action:", action)
    if action != settings.GPIO_ACTIONS["power_off"]:
        return
    save_settings()
    sys.exit("GPIO called exit")
def check_gpio_input():
    """Poll every configured GPIO pin and run the mapped action for each pin that reads low."""
    for gpio, action in setup.gpio_actions.items():
        pin_is_low = not setup.GPIO.input(gpio)
        if pin_is_low:
            handle_action(action)
def handle_event(event):
    """Dispatch a single pygame event.

    - ``pygame.QUIT``: mark the radio as off.
    - ``SONG_END``: advance to the next song, but only while a station is
      active and in its ``playing`` state (other playback can also post it).
    - ``PLAYPAUSE``: toggle play/pause on the active station, if there is one.
    - ``BLINK``: toggle the heartbeat LED.
    - Anything else is just logged.
    """
    global on_off_state, active_station
    if event.type == pygame.QUIT:
        print("Event: Quit called")
        on_off_state = False
    elif event.type == settings.EVENTS['SONG_END']:
        # Only handle SONG_END if it's associated with active music playback
        if active_station and active_station.state == active_station.STATES['playing']:
            print("Info: Song ended, Playing next song")
            active_station.next_song()
    elif event.type == settings.EVENTS['PLAYPAUSE']:
        print("Event: Play / Pause")
        # active_station is None while tuned between stations; calling
        # pause_play() on it would raise AttributeError.
        if active_station:
            active_station.pause_play()
    elif event.type == settings.EVENTS['BLINK']:
        blink_led()
    else:
        print("Event:", event)
def get_audio_length_mutagen(file_path):
    """Return the duration reported by mutagen for ``file_path``, or 0 on any failure.

    Failures (unreadable file, unknown format, missing length) are printed
    and reported as 0 rather than raised.
    """
    try:
        audio = mutagen.File(file_path)
        if audio is None or not hasattr(audio.info, 'length'):
            raise ValueError(f"Mutagen could not read duration of {file_path}")
        return audio.info.length
    except Exception as e:
        print(f"ERROR: While getting duration of {file_path} with mutagen: {e}")
        return 0
def has_valid_ogg_files(path):
    """Return True if any entry directly inside ``path`` has a name ending in ``.ogg``.

    The match is case-sensitive. The directory iterator is used as a context
    manager so it is closed even when ``any()`` stops at the first match;
    otherwise the unfinished iterator stays open until garbage collection
    and triggers a ResourceWarning.
    """
    with os.scandir(path) as entries:
        return any(entry.name.endswith('.ogg') for entry in entries)
def get_radio_bands(radio_folder):
    """Scan ``radio_folder`` for band folders and the station folders inside them.

    Every sub-directory of ``radio_folder`` is a band; every sub-directory of
    a band that contains ``.ogg`` files is a station. Folders are visited in
    case-insensitive name order. Station data comes from
    ``handle_station_folder`` (cache or rebuild).

    A station whose data has no ``station_files`` (for example when no file
    length could be determined) is skipped, matching the "Skipping station"
    warning printed by the cache rebuild, instead of being added as an empty
    station.

    Returns:
        List of ``{"folder_name": str, "stations": [station_data, ...]}``
        dicts, one per band with at least one usable station (possibly empty).
    """
    print("Startup: Starting folder search in:", radio_folder)
    if settings.RESET_CACHE:
        print("WARNING: Cache rebuilding enabled. This can take a while!")

    def by_name(entry):
        return entry.name.lower()

    radio_bands = []
    for folder in sorted(os.scandir(radio_folder), key=by_name):
        if not folder.is_dir():
            continue
        sub_radio_bands = []
        for sub_folder in sorted(os.scandir(folder.path), key=by_name):
            if not (sub_folder.is_dir() and has_valid_ogg_files(sub_folder.path)):
                continue
            station_data = handle_station_folder(
                sub_folder, folder.name, force_rebuild=settings.RESET_CACHE
            )
            if station_data and station_data.get("station_files"):
                sub_radio_bands.append(station_data)
            else:
                print(f"Warning: Skipping station {sub_folder.name}: no usable audio files.")
        if sub_radio_bands:
            radio_bands.append({
                "folder_name": folder.name,
                "stations": sub_radio_bands
            })
            print("Debug: Found band folder:", folder.name, "with", len(sub_radio_bands), "Stations" )
    if not radio_bands:
        print("Warning: No valid radio bands found. Returning an empty list.")
    print(f"Debug: Loaded radio bands - Total: {len(radio_bands)}")
    return radio_bands
def handle_station_folder(sub_folder, band_name, force_rebuild=False):
    """Return station data for one station folder, using its cache when still valid.

    The cache lives in ``station.ini`` (section ``cache``, option
    ``station_data``, JSON-encoded). It is rebuilt when:

    - ``settings.RESET_CACHE`` or ``force_rebuild`` is set,
    - ``station.ini`` does not exist,
    - ``station.ini`` has no ``cache`` section (e.g. it only holds settings),
    - the cached path differs from the folder's current path,
    - the cached file names differ from the ``.ogg`` files now on disk,
    - or reading the cache raises for any reason.

    Args:
        sub_folder: Directory entry exposing ``.path`` and ``.name``.
        band_name: Name of the band folder containing this station.
        force_rebuild: Rebuild even if the cache looks valid.

    Returns:
        The station data dict (cached or freshly rebuilt).
    """
    path = sub_folder.path
    station_ini_file = os.path.join(path, "station.ini")
    rebuild_needed = settings.RESET_CACHE or force_rebuild or not os.path.exists(station_ini_file)
    station_data = {}
    # Normalize and sort the current .ogg files
    current_files = sorted(os.path.basename(f).strip().lower() for f in glob.glob(os.path.join(path, "*.ogg")))
    if not rebuild_needed:
        try:
            # interpolation=None: cached paths may contain '%', which the
            # default interpolation would try to expand and fail on.
            station_ini_parser = configparser.ConfigParser(interpolation=None)
            station_ini_parser.read(station_ini_file)
            if station_ini_parser.has_section("cache"):
                station_data_str = station_ini_parser.get("cache", "station_data")
                station_data = json.loads(station_data_str)
                # Normalize and sort the cached file list
                cached_files = sorted(os.path.basename(f).strip().lower() for f in station_data.get("station_files", []))
                # Compare paths
                cached_path = station_data.get("path")
                if cached_path != path:
                    print(f"DEBUG: Path mismatch in {sub_folder.name}. Cached: {cached_path}, Current: {path}")
                    rebuild_needed = True
                # Compare file lists
                if current_files != cached_files:
                    print(f"DEBUG: File mismatch in {sub_folder.name}.")
                    rebuild_needed = True
            else:
                # Without this, an ini lacking a cache section returned the
                # empty dict above and the station was silently dropped.
                print(f"DEBUG: No cache section in station.ini for {sub_folder.name}.")
                rebuild_needed = True
        except Exception as error:
            # Best effort: any problem reading the cache just means rebuild.
            print(f"ERROR: Exception reading cache for {sub_folder.name}: {error}")
            rebuild_needed = True
    if rebuild_needed:
        print(f"DEBUG: Rebuilding cache for {sub_folder.name}.")
        station_data = rebuild_station_cache(path, sub_folder.name, band_name)
    return station_data
def validate_station_data(data):
    """Return a truthy value only when ``data["station_files"]`` is a non-empty list.

    The result is the list itself when valid; otherwise the falsy value
    found (``None`` or an empty container), or ``False`` for a non-empty
    value that is not a list.
    """
    files = data.get("station_files")
    if not files:
        return files
    if not isinstance(files, list):
        return False
    return files
def rebuild_station_cache(path, sub_folder_name, band_name):
    """Rebuild a station's data from its ``.ogg`` files and save it to ``station.ini``.

    Files are ordered by the numbers in their names (``extract_number``) and
    their durations are read in parallel. ``station_files`` and
    ``station_lengths`` are kept index-aligned: results are consumed in file
    order, and a file whose duration cannot be read is left out of both lists.
    Because such a file is not cached, the cache will not match the folder on
    the next start and is rebuilt again until the file is fixed or removed.

    Existing ``station_ordered`` / ``station_start`` values in the
    ``settings`` section of ``station.ini`` are preserved.

    Args:
        path: Station folder path.
        sub_folder_name: Station folder name (used as the station name).
        band_name: Name of the containing band folder.

    Returns:
        The station data dict. When there are no files or no readable
        durations, the default (empty) station data is returned and nothing
        is written.
    """
    print(f"INFO: Starting Data Cache Rebuild for {sub_folder_name}")
    station_ini_file = os.path.join(path, "station.ini")
    candidate_files = sorted(glob.glob(os.path.join(path, "*.ogg")), key=extract_number)
    if not candidate_files:
        print(f"Warning: No audio files found in {sub_folder_name}. Skipping station.")
        return get_default_station_data(path, sub_folder_name, band_name)

    # Read durations in parallel, but keep each future paired with its file so
    # results can be consumed in file order rather than completion order.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        pending = [
            (file_path, executor.submit(get_audio_length_mutagen, file_path))
            for file_path in candidate_files
        ]

    station_files = []
    station_lengths = []
    for file_path, future in pending:
        try:
            length = future.result()
        except Exception as e:
            print(f"Failed to get length for {file_path}: {e}")
            continue
        if length > 0:
            station_files.append(file_path)
            station_lengths.append(length)
        else:
            print(f"Warning: Invalid length for {file_path}.")

    if not station_lengths:
        print(f"Warning: Could not determine lengths for any files in {sub_folder_name}. Skipping station.")
        return get_default_station_data(path, sub_folder_name, band_name)
    total_length = sum(station_lengths)

    # interpolation=None: the cached JSON holds file paths, and a '%' in a
    # path would otherwise be rejected as invalid interpolation syntax.
    station_ini_parser = configparser.ConfigParser(interpolation=None)
    station_data = get_default_station_data(path, sub_folder_name, band_name)
    if os.path.exists(station_ini_file):
        try:
            station_ini_parser.read(station_ini_file)
            if station_ini_parser.has_section("settings"):
                # Preserve existing settings
                station_ordered = station_ini_parser.getboolean("settings", "station_ordered", fallback=station_data["station_ordered"])
                station_start = station_ini_parser.getint("settings", "station_start", fallback=station_data["station_start"])
                station_data["station_ordered"] = station_ordered
                station_data["station_start"] = station_start
        except Exception as e:
            print(f"Error reading settings from station.ini for {sub_folder_name}: {e}")

    # Files are already in numeric order from the sort above, so no second
    # sort is needed for ordered stations.
    station_data.update({
        "station_files": station_files,
        "station_lengths": station_lengths,
        "total_length": total_length,
    })

    # Save updated station data and settings back to the ini file
    station_ini_parser["settings"] = {
        "station_ordered": str(station_data["station_ordered"]),
        "station_start": str(station_data["station_start"]),
    }
    station_ini_parser["cache"] = {
        "station_data": json.dumps(station_data, indent=4)
    }
    try:
        with open(station_ini_file, 'w') as configfile:
            station_ini_parser.write(configfile)
        print(f"Info: Cache for {sub_folder_name} rebuilt and saved successfully.")
    except Exception as e:
        # The in-memory data is still returned; only persisting failed.
        print(f"Error: Failed to save cache for {sub_folder_name}: {e}")
    return station_data
# Angular position of a station on the dial
def get_station_pos(station_number):
    """Return the dial angle (rounded to 0.1) for ``station_number`` in the current band.

    Stations are spread evenly between ``min_angle + end_zone`` and
    ``max_angle - end_zone``.

    Raises:
        ValueError: if the band has fewer than two stations, or if
            ``station_number`` is outside ``0 .. total_station_num - 1``.
    """
    global total_station_num

    if total_station_num <= 1:
        raise ValueError(f"Invalid total_station_num: {total_station_num}. Must be greater than 1.")
    if station_number < 0 or station_number >= total_station_num:
        raise ValueError(f"Invalid station_number: {station_number}. Must be in range 0 to {total_station_num - 1}.")

    min_angle = settings.MOTOR_SETTINGS["min_angle"]
    max_angle = settings.MOTOR_SETTINGS["max_angle"]
    end_zone = settings.TUNING_SETTINGS["end_zone"]

    first_station_angle = min_angle + end_zone
    last_station_angle = max_angle - end_zone
    last_index = total_station_num - 1

    angle = map_range(station_number, 0, last_index, first_station_angle, last_station_angle)
    return round(angle, 1)
# Nearest station index for a given motor angle
def get_nearest_station(angle):
    """Return the index of the station closest to ``angle`` in the current band.

    The usable dial span (``min_angle + end_zone`` to ``max_angle - end_zone``)
    is mapped onto ``0 .. total_station_num - 1`` and rounded to an integer.

    Raises:
        ValueError: if the band has fewer than two stations.
    """
    global total_station_num

    if total_station_num <= 1:
        raise ValueError(f"Invalid total_station_num: {total_station_num}. Must be greater than 1.")

    min_angle = settings.MOTOR_SETTINGS["min_angle"]
    max_angle = settings.MOTOR_SETTINGS["max_angle"]
    end_zone = settings.TUNING_SETTINGS["end_zone"]

    dial_start = min_angle + end_zone
    dial_end = max_angle - end_zone
    last_index = total_station_num - 1

    fractional_index = map_range(angle, dial_start, dial_end, 0, last_index)
    return round(fractional_index)
def play_static(play=None):
    """
    Start, stop or keep alive static noise.

    Args:
        play: ``False`` stops static (and the music stream, if static was
            using it). ``True`` enables static. ``None`` keeps static going
            only if it is already enabled; otherwise it does nothing.

    While music is playing, static is mixed in from the preloaded
    ``static_sounds`` on channel 1. When the music stream is idle, one of the
    ``large_static_files`` is played through ``pygame.mixer.music`` instead.
    Either source is skipped when its collection is empty or not loaded.
    """
    global static_sounds, large_static_files, static_playback_active, volume, static_via_music
    if play is False:  # Stop static playback
        pygame.mixer.Channel(1).stop()  # Stop preloaded static sounds
        static_playback_active = False
        if static_via_music:
            pygame.mixer.music.stop()
            # Stopping the stream posts SONG_END; drop it so it is not
            # mistaken for a song finishing.
            pygame.event.clear(settings.EVENTS['SONG_END'])
            static_via_music = False  # Reset static music flag
        return
    if not static_playback_active and play is None:  # Do nothing if static is disabled
        return
    if play is True:  # Enable static playback
        static_playback_active = True
    if pygame.mixer.music.get_busy():  # Music stream in use: overlay a preloaded static sound
        # static_sounds starts as None; guard it the same way
        # large_static_files is guarded below.
        if static_sounds and not pygame.mixer.Channel(1).get_busy():  # Play only if the channel is free
            random_snd = random.choice(list(static_sounds.values()))
            pygame.mixer.Channel(1).play(random_snd)
            pygame.mixer.Channel(1).set_volume(set_static_volume())
    elif large_static_files:  # Music stream idle: use it for a longer static file
        try:
            random_static_file = random.choice(large_static_files)
            pygame.mixer.music.load(random_static_file)
            pygame.mixer.music.set_volume(set_static_volume())
            pygame.mixer.music.play()
            static_via_music = True  # Mark that static is being played via music
        except Exception as e:
            print(f"ERROR: Failed to play static file via pygame.mixer.music: {e}")
def set_static_volume():
    """Return the static volume: the master volume scaled by the configured
    static factor (rounded to 3 places), but never below ``static_min``."""
    scaled = round(volume * settings.VOLUME_SETTINGS["static_volume"], 3)
    floor = settings.VOLUME_SETTINGS["static_min"]
    return max(scaled, floor)
def tuning(manual=False):
    """Update playback for the current needle angle (``motor_angle``).

    Three zones around the nearest station, by angular distance:
    - within ``TUNING_SETTINGS["lock_on"]``: lock onto the station, full
      volume, static off;
    - within ``TUNING_SETTINGS["near"]``: station audio at a volume based on
      distance, mixed with static;
    - otherwise: stop the station and play static only.

    Does nothing when the angle is unchanged since the last call, unless
    ``manual`` is true (which also clears the lock first).
    """
    global motor_angle, tuning_locked, tuning_prev_angle, station_num, active_station, static_playback_active
    # Skip if the angle hasn't changed and it's not a manual tuning
    if motor_angle == tuning_prev_angle and not manual:
        return
    # Unlock tuning if it's manual tuning
    if manual:
        tuning_locked = False
    # Find the nearest station based on the current motor angle
    nearest_station_num = get_nearest_station(motor_angle)
    station_angle = get_station_pos(nearest_station_num)
    range_to_station = abs(station_angle - motor_angle)  # Angular distance to nearest station
    # Check if we're close enough to lock onto the station
    if range_to_station <= settings.TUNING_SETTINGS["lock_on"]:
        if not tuning_locked:
            tuning_locked = True
            # NOTE: tuning_volume is not in the ``global`` list above, so this
            # binds a local name; the module-level tuning_volume is unchanged.
            tuning_volume = volume  # Set volume to normal
            select_station(nearest_station_num, False)
            pygame.mixer.music.set_volume(volume)
            play_static(False)  # Stop static playback
            static_playback_active = False  # Ensure static remains stopped
    # Check if we're close to a station (but not locked)
    elif range_to_station < settings.TUNING_SETTINGS["near"]:
        # Dividing by the distance lowers the volume as the needle moves away
        # (for distances above 1); the result is clamped to [min, 1].
        tuning_volume = clamp(round(volume / range_to_station, 3), settings.VOLUME_SETTINGS["min"], 1)
        pygame.mixer.music.set_volume(tuning_volume)
        if active_station:
            play_static(True)  # Play static with station audio
            static_playback_active = True
            tuning_locked = False
        else:
            # No station playing yet: start the nearest one.
            select_station(nearest_station_num, False)
    # If not near any station, play only static
    else:
        if active_station:
            # print(f"Tuning: No active station. Nearest station: #{nearest_station_num}, Angle: {station_angle}, Needle: {motor_angle}, Separation: {tuning_seperation}")
            active_station.stop()
            pygame.mixer.music.stop()
            active_station = None
            station_num = None
        play_static(True)  # Play only static
        static_playback_active = True
        tuning_locked = False
    # Update the previous angle for the next tuning cycle
    tuning_prev_angle = motor_angle
def next_station():
    """Tune to the station after the one nearest the needle; at the end of the
    list, try the next band instead.

    NOTE: ``next_station`` is defined again further down in this module; that
    later definition rebinds the name and is the one callers actually get.
    """
    global station_num, total_station_num
    candidate = get_nearest_station(motor_angle) + 1
    if candidate < total_station_num:
        station_num = candidate
        print(f"TUNING: Next station {station_num} / {total_station_num}")
        select_station(station_num, True)
        return
    station_num = total_station_num - 1  # Ensure station_num does not exceed valid index range
    print(f"TUNING: At the end of the station list {station_num} / {total_station_num}")
    next_band()
def prev_station():
    """Tune to the station before the one nearest the needle; at the start of
    the list, try the previous band instead.

    NOTE: ``prev_station`` is defined again further down in this module; that
    later definition rebinds the name and is the one callers actually get.
    """
    global station_num, total_station_num
    candidate = get_nearest_station(motor_angle) - 1
    if candidate >= 0:
        station_num = candidate
        print(f"Tuning: Previous station {station_num} / {total_station_num}")
        select_station(station_num, True)
        return
    station_num = 0
    print(f"Tuning: At the beginning of the station list {station_num} / {total_station_num}")
    prev_band()
def select_band(new_band_num):
    """Switch to band ``new_band_num`` and build its station objects.

    Stops static and the active station, creates a ``RadioClass`` instance for
    each station record in the band, and updates ``stations``,
    ``total_station_num`` and ``tuning_seperation``. When the radio is on, the
    band-change effect (and the optional band call-out) is played.

    ``total_station_num`` is the number of stations that were actually
    created, so it always matches ``len(stations)``; a record whose
    ``RadioClass`` construction fails is logged and left out. If the band has
    no records, or none can be created, ``total_station_num`` is set to 0.

    Invalid band numbers (non-int or out of range) are logged and ignored.
    This function does not select a station; callers do that afterwards.
    """
    global radio_band, total_station_num, tuning_seperation, stations, active_station
    print(f"select_band: DEBUG: select_band called with new_band_num={new_band_num}, type={type(new_band_num)}")
    if not isinstance(new_band_num, int):
        print("select_band: ERROR: new_band_num must be an integer.")
        return
    if new_band_num >= radio_band_total or new_band_num < 0:
        print("select_band: ERROR: Selected an invalid band number:", new_band_num, "/", radio_band_total)
        return
    play_static(False)
    if active_station:
        active_station.stop()
        active_station = None
    radio_band = new_band_num
    print("select_band: Changing to band number", new_band_num)
    try:
        band_data = radio_band_list[new_band_num]  # Expecting a dictionary now
        folder_name = band_data.get("folder_name")
        folder_path = os.path.join(settings.STATIONS_ROOT_FOLDER, folder_name)
        station_data_list = band_data.get("stations", [])
        if not station_data_list:
            print(f"select_band: ERROR: No stations found in band {new_band_num}.")
            total_station_num = 0  # No stations found, prevent further actions
            return

        # Build the station objects first; the counts below depend on how
        # many were actually created.
        loaded_stations = []
        for station_data in station_data_list:
            try:
                loaded_stations.append(RadioClass(station_data))
            except Exception as e:
                print(f"select_band: ERROR: Failed to initialize RadioClass for station: {station_data.get('station_name', 'Unknown')}, error: {str(e)}")
        stations = loaded_stations
        if not stations:
            print(f"select_band: ERROR: No stations could be initialized in band {new_band_num}.")
            total_station_num = 0
            return

        total_station_num = len(stations)
        tuning_seperation = round(settings.MOTOR_SETTINGS["range"] / total_station_num, 1)
        print("Info: Tuning angle separation =", tuning_seperation, "Number of stations:", total_station_num)
        print(f"select_band: Total stations loaded: {total_station_num}")

        if on_off_state:
            # Play band change sound
            channel = pygame.mixer.Channel(4)
            channel.play(snd_band)
            channel.set_volume(volume * settings.VOLUME_SETTINGS["band_change"])
            if settings.BAND_CALLOUT_SETTINGS["enabled"]:
                # Audible Band callout
                band_callout_path = os.path.join(folder_path, settings.BAND_CALLOUT_SETTINGS["file"])
                if os.path.exists(band_callout_path):
                    band_callout_sound = pygame.mixer.Sound(band_callout_path)
                    channel = pygame.mixer.Channel(3)
                    channel.play(band_callout_sound)
                    channel.set_volume(volume * settings.VOLUME_SETTINGS["callout_volume"])
                    if settings.BAND_CALLOUT_SETTINGS["wait_for_completion"]:
                        while channel.get_busy():  # Wait until the callout sound is finished
                            time.sleep(0.01)
                else:
                    print(f"select_band: ERROR: Band callout file does not exist at {band_callout_path}")
    except IndexError:
        print(f"select_band: IndexError: new_band_num {new_band_num} is out of range of the radio_band_list.")
    except Exception as e:
        print("select_band: Unexpected error accessing radio_band_list:", str(e))
def select_station(new_station_num, manual=False):
    """Make ``stations[new_station_num]`` the active station and start its live playback.

    Does nothing when the station is unchanged and ``manual`` is false. With
    ``manual`` true the needle is also driven to the station's dial position.
    SONG_END posting is switched off while the old playback is torn down so
    that stopping it is not mistaken for a song finishing.
    """
    global station_num, active_station, motor_angle, tuning_locked, stations, total_station_num

    unchanged = new_station_num == station_num
    if unchanged and not manual:
        return

    # Silence SONG_END while tearing down the current playback.
    pygame.mixer.music.set_endevent(pygame.NOEVENT)

    # Stop everything that may be audible.
    play_static(False)
    pygame.mixer.music.stop()
    pygame.mixer.Channel(1).stop()

    # Discard any SONG_END already queued.
    pygame.event.clear(settings.EVENTS['SONG_END'])

    if active_station:
        active_station.stop()
        active_station = None

    # Manual tuning: move the needle to the station's position.
    if manual:
        motor_angle = get_station_pos(new_station_num)
        tuning_locked = False
        set_motor(motor_angle, True)

    # Switch stations.
    station_num = new_station_num
    active_station = stations[station_num]

    station_name = getattr(active_station, 'label', 'Unknown')
    print(f"Select_Station: '{station_name}' (Station {station_num + 1} of {total_station_num}), at angle = {motor_angle}")

    # Re-enable SONG_END for music playback, then clear the queue once more.
    pygame.mixer.music.set_endevent(settings.EVENTS['SONG_END'])
    pygame.event.clear(settings.EVENTS['SONG_END'])

    active_station.live_playback()
def prev_station():
    """Tune to the station before the one nearest the needle.

    At the first station, play the error sound and try the previous band.
    """
    global station_num, motor_angle, total_station_num
    candidate = get_nearest_station(motor_angle) - 1
    if candidate >= 0:
        station_num = candidate
        print(f"Tuning: Previous station {station_num} / {total_station_num}")
        select_station(station_num, True)
        return
    # Already at the first station of this band.
    station_num = 0
    print(f"Tuning: At the beginning of the station list {station_num} / {total_station_num}")
    play_error_snd()
    prev_band()
def next_station():
    """Tune to the station after the one nearest the needle.

    At the last station, play the error sound and try the next band.
    """
    global station_num, motor_angle, total_station_num
    candidate = get_nearest_station(motor_angle) + 1
    if candidate < len(stations):
        station_num = candidate
        print(f"Tuning: Next station {station_num} / {total_station_num}")
        select_station(station_num, True)
        return
    # Already at the last station of this band.
    station_num = len(stations) - 1
    print(f"Tuning: At the end of the station list {station_num} / {total_station_num}")
    play_error_snd()
    next_band()
def prev_band():
    """Switch to the previous band and tune to the station nearest the needle.

    At the first band, play the error sound and stay put.
    """
    global radio_band, radio_band_total, motor_angle
    print("prev_band")
    new_band = radio_band - 1
    if new_band >= 0:
        print("Tuning: Previous radio band", radio_band, "/", radio_band_total)
        select_band(new_band)
        select_station(get_nearest_station(motor_angle), True)
        return
    radio_band = 0
    print("Tuning: At the beginning of the band list", radio_band, "/", radio_band_total)
    play_error_snd()
def next_band():
    """Switch to the next band and tune to the station nearest the needle.

    At the last band, play the error sound and stay put.
    """
    global radio_band, radio_band_total, motor_angle
    print("next_band")
    new_band = radio_band + 1
    if new_band < radio_band_total:
        print("Tuning: Next radio band", radio_band, "/", radio_band_total)
        select_band(new_band)
        select_station(get_nearest_station(motor_angle), True)
        return
    radio_band = radio_band_total -1
    print("Tuning: At end of the band list", radio_band, "/", radio_band_total)
    play_error_snd()
def play_error_snd():
    """Play the error effect on mixer channel 4 at the master volume scaled by the effects factor."""
    error_channel = pygame.mixer.Channel(4)
    error_channel.play(snd_error)
    error_channel.set_volume(volume * settings.VOLUME_SETTINGS["effects"])
def shutdown_zero():
    """Halt the machine by running ``sudo nohup shutdown -h now``.

    The command is passed as an argument list so no shell is involved.
    """
    print("Info: Doing a full shutdown")
    call(["sudo", "nohup", "shutdown", "-h", "now"])
def send_uart(command_type, data1, data2="", data3="", data4="", timeout=1.0):
    """
    Send one comma-separated, newline-terminated message over the UART.

    The message is ``command_type,data1,data2,data3,data4,`` followed by a
    newline, encoded as UTF-8. If the UART object is missing it is
    re-created through ``setup``; if it is closed it is reopened. Every
    failure is printed and the message is dropped; nothing is raised.

    ``timeout`` is accepted but not used by this function.
    """
    global uart

    # No UART object yet: try to create one through the setup module.
    if uart is None:
        print("DEBUG: UART object is None. Attempting to reinitialize.")
        try:
            import setup  # Reimport setup to access its initialization routines
            setup.open_serial_connection()
            uart = setup.get_uart()
            if not (uart and uart.is_open):
                print("ERROR: Reinitialization of UART failed.")
                return
            print("INFO: UART reinitialized successfully.")
        except Exception as e:
            print(f"ERROR: Exception during UART reinitialization: {e}")
            return

    # UART object exists but the port is closed: reopen it.
    if not uart.is_open:
        print("DEBUG: UART is not open. Attempting to reopen.")
        try:
            uart.open()
            print("INFO: UART reopened successfully.")
        except Exception as e:
            print(f"ERROR: Failed to reopen UART: {e}")
            return

    # Build and send the message.
    try:
        payload = f"{command_type},{data1},{data2},{data3},{data4},\n".encode("utf-8")
        uart.write(payload)
    except serial.SerialTimeoutException as e:
        print(f"ERROR: UART timeout during message send: {e}")
    except serial.SerialException as e:
        print(f"ERROR: SerialException during UART write: {e}")
    except Exception as e:
        print(f"ERROR: Unexpected exception during UART message send: {e}")
def receive_uart():
global uart
# Validate if UART is initialized
# if not uart:
# print("DEBUG: UART object is None. Skipping receive_uart.")
# return []
# Validate if UART is open
if not uart.is_open:
print("DEBUG: UART is not open. Attempting to reopen.")
try:
uart.open()
print("INFO: UART reopened successfully.")
except Exception as error:
print(f"ERROR: Failed to reopen UART: {error}")
return []
# Check for available data
# if uart.in_waiting == 0:
# print("DEBUG: No data available to read from UART.")
# return []
try:
# Read the line from UART
line = uart.readline() # Read data until "\n" or timeout
if line:
#print(f"DEBUG: Raw data received: {line}")
line = line.decode("utf-8").strip("\n") # Decode and clean the data
#print(f"DEBUG: Decoded data: {line}")
parsed_data = line.split(",") # Parse data into a list
#print(f"DEBUG: Parsed UART data: {parsed_data}")
return parsed_data
else:
#print("DEBUG: UART read returned an empty line.")
return []
except serial.SerialException as error:
print(f"ERROR: SerialException in receive_uart: {error}")
try:
uart.close() # Close UART to reset on error