Skip to content

Commit

Permalink
fix: movement library execution tracking and passive localization rea…
Browse files Browse the repository at this point in the history
…ltime audio threading
  • Loading branch information
John2360 committed Nov 13, 2024
1 parent 40eb0e5 commit 97a8c79
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 261 deletions.
18 changes: 12 additions & 6 deletions packages/movement_library/movement_library/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

from example_interfaces.msg import Bool
from passive_sound_localization_msgs.msg import LocalizationResult
from movement_library.logger import setup_logger


class MovementNode(Node):
def __init__(self):
super().__init__("movement_node")
self.logger = get_logger(__name__)
setup_logger()
self.logger = logging.getLogger(__name__)
self.logger.info("Starting movement_node")
self.cmd_vel_publisher = self.create_publisher(Twist, "rcm/cmd_vel", 1)
self.enable_publisher = self.create_publisher(Bool, "rcm/enabled", 1)
Expand All @@ -30,7 +32,7 @@ def __init__(self):
self.create_subscription(
LocalizationResult, "localization_results", self.localizer_callback, 10
)
# self.localizationSubscription={"distance": 0.91, "angle": 180}
self.localizationSubscription={"distance": 0, "angle": 0, "executed": False}
# self.create_subscription(...)

self.loop_time_period = 1.0 / 10.0
Expand All @@ -56,12 +58,13 @@ def calculate_time_ang(self, angle, ang_velocity):

def localizer_callback(self, msg):
self.logger.info("Got a message")
angle = 360-msg.angle
# angle = 360-msg.angle
angle = msg.angle
distance = msg.distance
if self.executing:
pass
else:
self.localizationSubscription={"distance": distance, "angle": angle}
self.localizationSubscription={"distance": distance, "angle": angle, "executed": False}
self.time = 0.0
self.executing = True
self.logger.info(f"Got {str(angle)} {str(distance)}")
Expand All @@ -70,7 +73,7 @@ def loop(self):
# something to stop time from incrementing or reset it when we get a new input
self.time=self.time+self.loop_time_period

self.get_logger().info("time: %d"%(self.time))
# self.get_logger().info("time: %d"%(self.time))

enableMsg=Bool()
enableMsg.data=True
Expand All @@ -84,6 +87,9 @@ def loop(self):
velocityMsg.angular.z=0.0
velocityMsg.linear.x=0.0

if self.localizationSubscription["executed"]:
return

spdx = 0.3
spdang = 0.5
time_xyz = self.calculate_time_xyz(self.localizationSubscription["distance"], spdx)
Expand All @@ -110,7 +116,7 @@ def loop(self):
velocityMsg.linear.x=0.0
self.time=0 # not sure if needed
self.executing=False

self.localizationSubscription["executed"]=True
# print(velocityMsg.linear.x)
# print(velocityMsg.angular.z)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ def localize_stream(
Yields:
- List of LocalizationResult objects for each processed audio chunk.
"""
logger.info("Starting real-time sound source localization.")

num_mic_streams = len(multi_channel_stream)

if num_mic_streams == 0:
Expand Down Expand Up @@ -184,8 +182,6 @@ def localize_stream(
logger.debug(f"Localization results for current chunk: {results}")
yield results

logger.info("Real-time sound source localization completed.")

def _compute_cross_spectrum(
self, mic_signals: np.ndarray[np.float32], fft_size: int = 1024
) -> np.ndarray[np.complex128]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,30 @@
from rclpy.node import Node
import logging
import rclpy
import time

from queue import Queue

commands = Queue(maxsize=10)
locations = Queue(maxsize=10)


def send_audio_continuously(client, single_channel_generator, logger):
def send_audio_continuously(client, streamer: RealtimeAudioStreamer, logger: logging.Logger):
logger.info("Sending audio to OpenAI")
for single_channel_audio in single_channel_generator:
client.send_audio(single_channel_audio)
for audio_streams in streamer.audio_generator():
if audio_streams is None:
continue

client.send_audio(streamer.resample_stream(audio_streams[0]))
time.sleep(0.01)

def receive_text_messages(client, logger):

def receive_text_messages(client, logger: logging.Logger):
logger.info("OpanAI: Listening to audio stream")
while True:
try:
command = client.receive_text_response()
logger.info(f"Received command: {command}")
if command and command.strip() == "MOVE_TO":
logger.info(f"Received command: {command}")

Expand All @@ -40,40 +46,49 @@ def receive_text_messages(client, logger):
logger.error(f"Error receiving response: {e}")


def realtime_localization(multi_channel_stream, localizer, logger):
def realtime_localization(streamer: RealtimeAudioStreamer, localizer: SoundLocalizer, logger: logging.Logger):
logger.info("Localization: Listening to audio stream")
try:
did_get = True
for audio_data in multi_channel_stream:
did_get = False
for audio_streams in streamer.audio_generator():
try:
if audio_streams is None:
logger.error("Audio streams are None")
continue

# Stream audio data and pass it to the localizer
localization_stream = localizer.localize_stream(
[audio_data[k] for k in audio_data.keys()]
audio_streams,
)

for localization_results in localization_stream:
if locations.full():
locations.get()
did_get = True

logger.debug(f"Putting localization results: {localization_results}")
locations.put(localization_results)

if did_get:
locations.task_done()
did_get = False

except Exception as e:
print(f"Realtime Localization error: {e}")
except Exception as e:
print(f"Realtime Localization error: {e}")


def command_executor(publisher, logger):
def command_executor(publisher, logger: logging.Logger):
logger.info("Executor: listening for command")
while True:
try:
if commands.qsize() > 0:
logger.info(f"Got command, locations: {locations}")
logger.info(f"Got command, and current location size is: {locations.qsize()}")

commands.get()
commands.task_done()
if locations.qsize() > 0:
publisher(locations.get())
location = locations.get()
logger.info(f"Publishing location: {location}")
publisher(location[0])

locations.task_done()
except Exception as e:
Expand Down Expand Up @@ -129,18 +144,15 @@ def process_audio(self):
self.logger.info("Processing audio...")

with self.streamer as streamer, self.openai_ws_client as client:
multi_channel_stream = streamer.multi_channel_gen()
single_channel_stream = streamer.single_channel_gen()

with ThreadPoolExecutor(max_workers=4) as executor:
self.logger.info("Threading log")
executor.submit(
send_audio_continuously, client, single_channel_stream, self.logger
send_audio_continuously, client, streamer, self.logger
)
executor.submit(receive_text_messages, client, self.logger)
executor.submit(
realtime_localization,
multi_channel_stream,
streamer,
self.localizer,
self.logger,
)
Expand All @@ -150,10 +162,11 @@ def process_audio(self):

def publish_results(self, localization_results):
# Publish results to ROS topic
self.logger.info(f"Publishing results: {localization_results}")
msg = LocalizationResult()
msg.angle = localization_results.angle
msg.distance = localization_results.distance
self.get_logger().info(
msg.angle = float(localization_results.angle)
msg.distance = float(localization_results.distance)
self.logger.info(
f"Publishing: angle={msg.angle}, distance={msg.distance}"
)
self.publisher.publish(msg)
Expand Down

This file was deleted.

Loading

0 comments on commit 97a8c79

Please sign in to comment.