From 97a8c7917bffb398f78481cd3e4812eec71e2366 Mon Sep 17 00:00:00 2001 From: John Finberg Date: Wed, 13 Nov 2024 15:49:10 -0500 Subject: [PATCH] fix: movement library execution tracking and passive localization realtime audio threading --- .../movement_library/movement_library/main.py | 18 +- .../localization.py | 4 - .../passive_sound_localization/main.py | 57 +++--- .../realtime_audio.py | 124 ------------- .../realtime_audio_streamer.py | 172 +++++++----------- 5 files changed, 114 insertions(+), 261 deletions(-) delete mode 100644 packages/passive_sound_localization/passive_sound_localization/realtime_audio.py diff --git a/packages/movement_library/movement_library/main.py b/packages/movement_library/movement_library/main.py index 90c9576..e22772e 100644 --- a/packages/movement_library/movement_library/main.py +++ b/packages/movement_library/movement_library/main.py @@ -11,12 +11,14 @@ from example_interfaces.msg import Bool from passive_sound_localization_msgs.msg import LocalizationResult +from movement_library.logger import setup_logger class MovementNode(Node): def __init__(self): super().__init__("movement_node") - self.logger = get_logger(__name__) + setup_logger() + self.logger = logging.getLogger(__name__) self.logger.info("Starting movement_node") self.cmd_vel_publisher = self.create_publisher(Twist, "rcm/cmd_vel", 1) self.enable_publisher = self.create_publisher(Bool, "rcm/enabled", 1) @@ -30,7 +32,7 @@ def __init__(self): self.create_subscription( LocalizationResult, "localization_results", self.localizer_callback, 10 ) - # self.localizationSubscription={"distance": 0.91, "angle": 180} + self.localizationSubscription={"distance": 0, "angle": 0, "executed": False} # self.create_subscription(...) self.loop_time_period = 1.0 / 10.0 @@ -56,12 +58,13 @@ def calculate_time_ang(self, angle, ang_velocity): def localizer_callback(self, msg): self.logger.info("Got a message") - angle = 360-msg.angle + # angle = 360-msg.angle + angle = msg.angle distance = msg.distance if self.executing: pass else: - self.localizationSubscription={"distance": distance, "angle": angle} + self.localizationSubscription={"distance": distance, "angle": angle, "executed": False} self.time = 0.0 self.executing = True self.logger.info(f"Got {str(angle)} {str(distance)}") @@ -70,7 +73,7 @@ def loop(self): # something to stop time from incrementing or reset it when we get a new input self.time=self.time+self.loop_time_period - self.get_logger().info("time: %d"%(self.time)) + # self.get_logger().info("time: %d"%(self.time)) enableMsg=Bool() enableMsg.data=True @@ -84,6 +87,9 @@ def loop(self): velocityMsg.angular.z=0.0 velocityMsg.linear.x=0.0 + if self.localizationSubscription["executed"]: + return + spdx = 0.3 spdang = 0.5 time_xyz = self.calculate_time_xyz(self.localizationSubscription["distance"], spdx) @@ -110,7 +116,7 @@ def loop(self): velocityMsg.linear.x=0.0 self.time=0 # not sure if needed self.executing=False - + self.localizationSubscription["executed"]=True # print(velocityMsg.linear.x) # print(velocityMsg.angular.z) diff --git a/packages/passive_sound_localization/passive_sound_localization/localization.py b/packages/passive_sound_localization/passive_sound_localization/localization.py index 22bbda9..33d4d7d 100644 --- a/packages/passive_sound_localization/passive_sound_localization/localization.py +++ b/packages/passive_sound_localization/passive_sound_localization/localization.py @@ -117,8 +117,6 @@ def localize_stream( Yields: - List of LocalizationResult objects for each processed audio chunk. """ - logger.info("Starting real-time sound source localization.") - num_mic_streams = len(multi_channel_stream) if num_mic_streams == 0: @@ -184,8 +182,6 @@ def localize_stream( logger.debug(f"Localization results for current chunk: {results}") yield results - logger.info("Real-time sound source localization completed.") - def _compute_cross_spectrum( self, mic_signals: np.ndarray[np.float32], fft_size: int = 1024 ) -> np.ndarray[np.complex128]: diff --git a/packages/passive_sound_localization/passive_sound_localization/main.py b/packages/passive_sound_localization/passive_sound_localization/main.py index 8e12b85..002b675 100644 --- a/packages/passive_sound_localization/passive_sound_localization/main.py +++ b/packages/passive_sound_localization/passive_sound_localization/main.py @@ -11,6 +11,7 @@ from rclpy.node import Node import logging import rclpy +import time from queue import Queue @@ -18,17 +19,22 @@ locations = Queue(maxsize=10) -def send_audio_continuously(client, single_channel_generator, logger): +def send_audio_continuously(client, streamer: RealtimeAudioStreamer, logger: logging.Logger): logger.info("Sending audio to OpenAI") - for single_channel_audio in single_channel_generator: - client.send_audio(single_channel_audio) + for audio_streams in streamer.audio_generator(): + if audio_streams is None: + continue + client.send_audio(streamer.resample_stream(audio_streams[0])) + time.sleep(0.01) -def receive_text_messages(client, logger): + +def receive_text_messages(client, logger: logging.Logger): logger.info("OpanAI: Listening to audio stream") while True: try: command = client.receive_text_response() + logger.info(f"Received command: {command}") if command and command.strip() == "MOVE_TO": logger.info(f"Received command: {command}") @@ -40,40 +46,49 @@ def receive_text_messages(client, logger): logger.error(f"Error receiving response: {e}") -def realtime_localization(multi_channel_stream, localizer, logger): +def realtime_localization(streamer: RealtimeAudioStreamer, localizer: SoundLocalizer, logger: logging.Logger): logger.info("Localization: Listening to audio stream") - try: - did_get = True - for audio_data in multi_channel_stream: + did_get = False + for audio_streams in streamer.audio_generator(): + try: + if audio_streams is None: + logger.error("Audio streams are None") + continue + # Stream audio data and pass it to the localizer localization_stream = localizer.localize_stream( - [audio_data[k] for k in audio_data.keys()] + audio_streams, ) for localization_results in localization_stream: if locations.full(): locations.get() + did_get = True + logger.debug(f"Putting localization results: {localization_results}") locations.put(localization_results) if did_get: locations.task_done() did_get = False - except Exception as e: - print(f"Realtime Localization error: {e}") + except Exception as e: + print(f"Realtime Localization error: {e}") -def command_executor(publisher, logger): +def command_executor(publisher, logger: logging.Logger): logger.info("Executor: listening for command") while True: try: if commands.qsize() > 0: - logger.info(f"Got command, locations: {locations}") + logger.info(f"Got command, and current location size is: {locations.qsize()}") + commands.get() commands.task_done() if locations.qsize() > 0: - publisher(locations.get()) + location = locations.get() + logger.info(f"Publishing location: {location}") + publisher(location[0]) locations.task_done() except Exception as e: @@ -129,18 +144,15 @@ def process_audio(self): self.logger.info("Processing audio...") with self.streamer as streamer, self.openai_ws_client as client: - multi_channel_stream = streamer.multi_channel_gen() - single_channel_stream = streamer.single_channel_gen() - with ThreadPoolExecutor(max_workers=4) as executor: self.logger.info("Threading log") executor.submit( - send_audio_continuously, client, single_channel_stream, self.logger + send_audio_continuously, client, streamer, self.logger ) executor.submit(receive_text_messages, client, self.logger) executor.submit( realtime_localization, - multi_channel_stream, + streamer, self.localizer, self.logger, ) @@ -150,10 +162,11 @@ def process_audio(self): def publish_results(self, localization_results): # Publish results to ROS topic + self.logger.info(f"Publishing results: {localization_results}") msg = LocalizationResult() - msg.angle = localization_results.angle - msg.distance = localization_results.distance - self.get_logger().info( + msg.angle = float(localization_results.angle) + msg.distance = float(localization_results.distance) + self.logger.info( f"Publishing: angle={msg.angle}, distance={msg.distance}" ) self.publisher.publish(msg) diff --git a/packages/passive_sound_localization/passive_sound_localization/realtime_audio.py b/packages/passive_sound_localization/passive_sound_localization/realtime_audio.py deleted file mode 100644 index 44cc162..0000000 --- a/packages/passive_sound_localization/passive_sound_localization/realtime_audio.py +++ /dev/null @@ -1,124 +0,0 @@ -from concurrent.futures import ThreadPoolExecutor -from localization import SoundLocalizer -from realtime_audio_streamer import RealtimeAudioStreamer -from models.configs import RealtimeAudioStreamerConfig -from models.configs import LocalizationConfig -from models.configs import OpenAIWebsocketConfig -from realtime_openai_websocket import OpenAIWebsocketClient - - -from queue import Queue - -import logging -from dotenv import load_dotenv - -load_dotenv() - -logger = logging.getLogger(__name__) - - -commands = Queue(maxsize=10) -locations = Queue(maxsize=10) - -# commands = [] -# locations = [] -def send_audio_continuously(client, single_channel_generator): - print("Threading...") - for single_channel_audio in single_channel_generator: - if single_channel_audio is None: - continue - client.send_audio(single_channel_audio) - - -def receive_text_messages(client, logger): - logger.info("OpenAI: Listening to audio stream") - while True: - try: - command = client.receive_text_response() - if command and command.strip() == "MOVE_TO": - print(command) - logger.info(f"Received command: {command}") - - if commands.full(): - commands.get() - commands.task_done() - commands.put(command) - # commands.append(command) - except Exception as e: - print(f"Error receiving response: {e}") - -def realtime_localization(multi_channel_stream, localizer, logger): - logger.info("Localization: Listening to audio stream") - try: - did_get = True - for audio_data in multi_channel_stream: - # Skip calculating if there's any issues with the audio data - if audio_data is None: - continue - - # Stream audio data and pass it to the localizer - localization_stream = localizer.localize_stream( - [audio_data[k] for k in audio_data.keys()] - ) - - for localization_results in localization_stream: - # locations.append(localization_results) - if locations.full(): - locations.get() - - locations.put(localization_results) - - if did_get: - locations.task_done() - did_get = False - - except Exception as e: - print(f"Realtime Localization error: {e}") - -def command_executor(publisher, logger): - logger.info("Executor: listening for command") - while True: - try: - # if len(commands) > 0: - if commands.qsize() > 0: - logger.info(f"Got command, locations: {locations}") - commands.get() - commands.task_done() - # commands.pop() - # if len(locations) > 0: - if locations.qsize() > 0: - publisher(locations.get()) - - locations.task_done() - # publisher(locations.pop()) - except Exception as e: - print(f"Command executor error: {e}") - - -def publish_results(localization_results): - print(localization_results) - -def main(): - audio_streamer_config = RealtimeAudioStreamerConfig() - localizer_config = LocalizationConfig() - websocket_config = OpenAIWebsocketConfig() - print("Running realtime audio...") - - localizer = SoundLocalizer(localizer_config) - - with ( - RealtimeAudioStreamer(audio_streamer_config) as streamer, - OpenAIWebsocketClient(websocket_config) as client - ): - multi_channel_stream = streamer.multi_channel_gen() - single_channel_stream = streamer.single_channel_gen() - - with ThreadPoolExecutor(max_workers=4) as executor: - logger.info("Threading log") - executor.submit(send_audio_continuously, client, single_channel_stream) - executor.submit(receive_text_messages, client, logger) - executor.submit(realtime_localization, multi_channel_stream, localizer, logger) - executor.submit(command_executor, lambda x: publish_results(x), logger) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/packages/passive_sound_localization/passive_sound_localization/realtime_audio_streamer.py b/packages/passive_sound_localization/passive_sound_localization/realtime_audio_streamer.py index d81fef1..514770a 100644 --- a/packages/passive_sound_localization/passive_sound_localization/realtime_audio_streamer.py +++ b/packages/passive_sound_localization/passive_sound_localization/realtime_audio_streamer.py @@ -1,125 +1,87 @@ import logging -from typing import Dict, Generator, List, Optional -import soundcard as sc +from typing import Generator, List import numpy as np +import queue import threading from pydub import AudioSegment -from io import BytesIO +import pyaudio - -from passive_sound_localization.models.configs.realtime_streamer import ( - RealtimeAudioStreamerConfig, -) - -# from models.configs.realtime_streamer import ( -# RealtimeAudioStreamerConfig, -# ) # Only needed to run with `realtime_audio.py` +from passive_sound_localization.models.configs.realtime_streamer import RealtimeAudioStreamerConfig logger = logging.getLogger(__name__) - class RealtimeAudioStreamer: def __init__(self, config: RealtimeAudioStreamerConfig): - self.sample_rate: int = config.sample_rate - self.channels: int = config.channels - self.chunk: int = config.chunk - self.device_indices = config.device_indices - self.streams: Dict[int, np.ndarray] = {} - self.streaming: bool = False - - print(self.device_indices) + self.mic_indices = config.device_indices + self.sample_rate = config.sample_rate + self.channels = config.channels + self.chunk_size = config.chunk + self.is_running = False + self.pyaudio_instance = pyaudio.PyAudio() + self.streams = [] + self.audio_queues = [queue.Queue() for _ in self.mic_indices] + + for mic_index in range(self.pyaudio_instance.get_device_count()): + device_info = self.pyaudio_instance.get_device_info_by_index(mic_index) + logger.info(f"Device info: {device_info}") + + logger.info(f"Mic indices: {self.mic_indices}") + for mic_index in self.mic_indices: + logger.debug(f"Opening stream for mic index: {mic_index}") + stream = self.pyaudio_instance.open( + rate=self.sample_rate, + channels=self.channels, + format=pyaudio.paInt16, + input=True, + input_device_index=mic_index, + frames_per_buffer=self.chunk_size + ) + self.streams.append(stream) def __enter__(self): - microphones = sc.all_microphones() - self.streams = { - device_index: np.zeros((self.chunk, self.channels), dtype=np.float32) - for device_index in self.device_indices - } - self.streaming = True - - # Start a thread to continuously record audio - self.recording_thread = threading.Thread( - target=self.record_audio, args=(microphones,) - ) - self.recording_thread.start() - + self.is_running = True + self.start_stream_threads() return self - def record_audio(self, microphones): - while self.streaming: - for device_index in self.device_indices: - self.streams[device_index] = microphones[device_index].record( - samplerate=self.sample_rate, - channels=self.channels, - numframes=self.chunk, - ) - - def __exit__(self, *args): - self.streaming = False - self.recording_thread.join() # Wait for the recording thread to finish - - def get_stream(self, device_index: int) -> Optional[bytes]: - """Retrieve the audio stream for a specific device index.""" - if device_index in self.device_indices: - return np.nan_to_num(self.streams[device_index]).tobytes() - else: - print(f"Device index {device_index} not found.") - return None - - def multi_channel_gen(self) -> Generator[Optional[Dict[int, bytes]], None, None]: + def start_stream_threads(self): + """Start a thread for each audio stream to continuously push audio data to its queue.""" + self.stream_threads = [] + for idx, stream in enumerate(self.streams): + thread = threading.Thread(target=self.stream_to_queue, args=(idx, stream), daemon=True) + thread.start() + self.stream_threads.append(thread) + + def stream_to_queue(self, idx: int, stream): + """Reads audio data from the stream and places it in the appropriate queue.""" + while self.is_running: + try: + audio_data = stream.read(self.chunk_size, exception_on_overflow=False) + self.audio_queues[idx].put(audio_data) + except Exception as e: + logger.error(f"Error in stream_to_queue for mic {idx}: {e}") + self.audio_queues[idx].put(b"") + + def audio_generator(self) -> Generator[List[bytes], None, None]: + """Thread-safe generator that yields a list of audio data chunks from all microphones.""" + while self.is_running: + yield [q.get() for q in self.audio_queues] + + def __exit__(self, *args, **kwargs): + self.is_running = False + for stream in self.streams: + stream.stop_stream() + stream.close() + self.pyaudio_instance.terminate() + + def resample_stream(self, stream: bytes, target_sample_rate: int = 24000) -> bytes: try: - while self.streaming: - audio_arrays = [] - for device_index in self.device_indices: - audio_arrays.append(self.get_stream(device_index)) - - # Return none if any audio is None or empty bytes - if any(audio == b"" or audio is None for audio in audio_arrays): - yield None - - yield { - device_index: audio - for device_index, audio in zip(self.device_indices, audio_arrays) - } - - except Exception as e: - print(f"Error in multi_channel_gen: {e}") - - def merge_streams(self, streams: List[np.ndarray]) -> np.ndarray: - return np.sum(streams, axis=0) / len(streams) - - def resample_stream( - self, stream: bytes, target_sample_rate: int = 24000, sample_width: int = 2 - ) -> bytes: - try: - audio_data_int16 = (stream * 32767).astype(np.int16) + np_stream = np.frombuffer(stream, dtype=np.int16).astype(np.int16).tobytes() audio_segment = AudioSegment( - audio_data_int16.tobytes(), - frame_rate=self.sample_rate, - sample_width=audio_data_int16.dtype.itemsize, - channels=self.channels, + np_stream, frame_rate=self.sample_rate, sample_width=2, channels=1 ) - - # Resample the audio to 24000 Hz - audio_segment_resampled = audio_segment.set_frame_rate(target_sample_rate) - return audio_segment_resampled.get_array_of_samples().tobytes() + audio_segment = audio_segment.set_frame_rate(target_sample_rate).set_channels(1) + return audio_segment.raw_data except Exception as e: - print(f"Error in resample_stream: {e}") + logger.error(f"Error in resample_stream: {e}") return b"" - - def single_channel_gen(self) -> Generator[Optional[bytes], None, None]: - try: - while self.streaming: - stream = self.get_stream(self.device_indices[0]) - if stream == b"" or stream is None: - yield None - - resampled_stream = self.resample_stream(stream) - - if resampled_stream != b"": - yield resampled_stream - else: - yield None - except Exception as e: - print(f"Error in single_channel_gen: {e}")