From e2384e2484b1e0d4b676e1ce90c04374aad158cf Mon Sep 17 00:00:00 2001 From: James Hush Date: Tue, 26 Nov 2024 11:22:58 +0800 Subject: [PATCH 1/7] fix: add logging and error handling for issue #721 --- src/pipecat/processors/frame_processor.py | 26 +++++++++++++++++------ 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index cc3b2705..93829c8f 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -6,10 +6,11 @@ import asyncio import inspect - from enum import Enum from typing import Awaitable, Callable, Optional +from loguru import logger + from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( EndFrame, @@ -24,8 +25,6 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics from pipecat.utils.utils import obj_count, obj_id -from loguru import logger - class FrameDirection(Enum): DOWNSTREAM = 1 @@ -220,11 +219,16 @@ def _register_event_handler(self, event_name: str): # async def _start_interruption(self): - # Cancel the push frame task. This will stop pushing frames downstream. - await self.__cancel_push_task() + try: + # Cancel the push frame task. This will stop pushing frames downstream. + await self.__cancel_push_task() - # Cancel the input task. This will stop processing queued frames. - await self.__cancel_input_task() + # Cancel the input task. This will stop processing queued frames. + await self.__cancel_input_task() + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) + raise # Create a new input queue and task. 
self.__create_input_task() @@ -281,7 +285,11 @@ async def __input_frame_task_handler(self): self.__input_queue.task_done() except asyncio.CancelledError: + logger.trace(f"Cancelled input task in {self}") break + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) def __create_push_task(self): self.__push_queue = asyncio.Queue() @@ -300,7 +308,11 @@ async def __push_frame_task_handler(self): running = not isinstance(frame, EndFrame) self.__push_queue.task_done() except asyncio.CancelledError: + logger.trace(f"Cancelled push task in {self}") break + except Exception as e: + logger.exception(f"Uncaught exception in {self}: {e}") + await self.push_error(ErrorFrame(str(e))) async def _call_event_handler(self, event_name: str, *args, **kwargs): try: From 1893784b898b6744b414792d1c3e02d30bc4344d Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 27 Nov 2024 11:36:28 +0800 Subject: [PATCH 2/7] Save race bot --- examples/foundational/race_bot.py | 87 +++++++++++++++++++++++ src/pipecat/processors/frame_processor.py | 26 ++----- 2 files changed, 94 insertions(+), 19 deletions(-) create mode 100644 examples/foundational/race_bot.py diff --git a/examples/foundational/race_bot.py b/examples/foundational/race_bot.py new file mode 100644 index 00000000..af8e2834 --- /dev/null +++ b/examples/foundational/race_bot.py @@ -0,0 +1,87 @@ +# +# Copyright (c) 2024, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import os +import sys +import time + +import aiohttp +from loguru import logger +from runner import configure + +from pipecat.frames.frames import ( + TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from 
pipecat.services.cartesia import CartesiaTTSService +from pipecat.services.openai import OpenAILLMService +from pipecat.transports.services.daily import DailyParams, DailyTransport + +logger.add(sys.stderr, level="DEBUG") + + +async def main(): + async with aiohttp.ClientSession() as session: + (room_url, _) = await configure(session) + + transport = DailyTransport( + room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True) + ) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady + ) + + llm = OpenAILLMService(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4o") + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.", + }, + ] + + context = OpenAILLMContext(messages) + context_aggregator = llm.create_context_aggregator(context) + + runner = PipelineRunner() + + task = PipelineTask( + Pipeline( + [ + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] + ) + ) + + # Register an event handler so we can play the audio when the + # participant joins. 
+ @transport.event_handler("on_first_participant_joined") + async def on_first_participant_joined(transport, participant): + frames = [ + UserStartedSpeakingFrame(), + TranscriptionFrame("Tell a joke about dogs.", "user_id", time.time()), + UserStoppedSpeakingFrame(), + ] + + await task.queue_frames(frames) + + await runner.run(task) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 93829c8f..cc3b2705 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -6,11 +6,10 @@ import asyncio import inspect + from enum import Enum from typing import Awaitable, Callable, Optional -from loguru import logger - from pipecat.clocks.base_clock import BaseClock from pipecat.frames.frames import ( EndFrame, @@ -25,6 +24,8 @@ from pipecat.processors.metrics.frame_processor_metrics import FrameProcessorMetrics from pipecat.utils.utils import obj_count, obj_id +from loguru import logger + class FrameDirection(Enum): DOWNSTREAM = 1 @@ -219,16 +220,11 @@ def _register_event_handler(self, event_name: str): # async def _start_interruption(self): - try: - # Cancel the push frame task. This will stop pushing frames downstream. - await self.__cancel_push_task() + # Cancel the push frame task. This will stop pushing frames downstream. + await self.__cancel_push_task() - # Cancel the input task. This will stop processing queued frames. - await self.__cancel_input_task() - except Exception as e: - logger.exception(f"Uncaught exception in {self}: {e}") - await self.push_error(ErrorFrame(str(e))) - raise + # Cancel the input task. This will stop processing queued frames. + await self.__cancel_input_task() # Create a new input queue and task. 
self.__create_input_task() @@ -285,11 +281,7 @@ async def __input_frame_task_handler(self): self.__input_queue.task_done() except asyncio.CancelledError: - logger.trace(f"Cancelled input task in {self}") break - except Exception as e: - logger.exception(f"Uncaught exception in {self}: {e}") - await self.push_error(ErrorFrame(str(e))) def __create_push_task(self): self.__push_queue = asyncio.Queue() @@ -308,11 +300,7 @@ async def __push_frame_task_handler(self): running = not isinstance(frame, EndFrame) self.__push_queue.task_done() except asyncio.CancelledError: - logger.trace(f"Cancelled push task in {self}") break - except Exception as e: - logger.exception(f"Uncaught exception in {self}: {e}") - await self.push_error(ErrorFrame(str(e))) async def _call_event_handler(self, event_name: str, *args, **kwargs): try: From cedccdcbc0df7d955e87461280c86ba60f4564e5 Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 27 Nov 2024 11:50:28 +0800 Subject: [PATCH 3/7] Add interruptions --- examples/foundational/race_bot.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/examples/foundational/race_bot.py b/examples/foundational/race_bot.py index af8e2834..3b24cc7d 100644 --- a/examples/foundational/race_bot.py +++ b/examples/foundational/race_bot.py @@ -14,6 +14,8 @@ from runner import configure from pipecat.frames.frames import ( + StartInterruptionFrame, + StopInterruptionFrame, TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, @@ -72,13 +74,23 @@ async def main(): # participant joins. 
@transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): - frames = [ - UserStartedSpeakingFrame(), - TranscriptionFrame("Tell a joke about dogs.", "user_id", time.time()), - UserStoppedSpeakingFrame(), - ] - - await task.queue_frames(frames) + # Create frames for 3 seconds + start_time = time.time() + while time.time() - start_time < 300: + timestamp = time.time() + frames = [ + UserStartedSpeakingFrame(), + TranscriptionFrame("Tell a joke about dogs.", "user_id", timestamp), + UserStoppedSpeakingFrame(), + ] + await task.queue_frames(frames) + await asyncio.sleep(5) # Small delay between frame sets + next_frames = [ + StartInterruptionFrame(), + TranscriptionFrame("Tell a joke about cats.", "user_id", timestamp), + StopInterruptionFrame(), + ] + await task.queue_frames(next_frames) await runner.run(task) From 632bae7eee590027acd94473d5d3cb0dda6ec434 Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 27 Nov 2024 12:21:45 +0800 Subject: [PATCH 4/7] Interrupted? 
--- examples/foundational/race_bot.py | 64 ++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/examples/foundational/race_bot.py b/examples/foundational/race_bot.py index 3b24cc7d..62c26abd 100644 --- a/examples/foundational/race_bot.py +++ b/examples/foundational/race_bot.py @@ -14,6 +14,9 @@ from runner import configure from pipecat.frames.frames import ( + BotSpeakingFrame, + EndFrame, + Frame, StartInterruptionFrame, StopInterruptionFrame, TranscriptionFrame, @@ -24,13 +27,29 @@ from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.cartesia import CartesiaTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport +logger.remove(0) logger.add(sys.stderr, level="DEBUG") +class DebugProcessor(FrameProcessor): + def __init__(self, name, **kwargs): + self._name = name + super().__init__(**kwargs) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if not ( + isinstance(frame, BotSpeakingFrame) + ): + logger.debug(f"--- {self._name}: {frame} {direction}") + await self.push_frame(frame, direction) + + async def main(): async with aiohttp.ClientSession() as session: (room_url, _) = await configure(session) @@ -53,6 +72,8 @@ async def main(): }, ] + dp = DebugProcessor("dp") + context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) @@ -61,6 +82,7 @@ async def main(): task = PipelineTask( Pipeline( [ + dp, context_aggregator.user(), llm, tts, @@ -74,23 +96,39 @@ async def main(): # participant joins. 
@transport.event_handler("on_first_participant_joined") async def on_first_participant_joined(transport, participant): - # Create frames for 3 seconds - start_time = time.time() - while time.time() - start_time < 300: - timestamp = time.time() - frames = [ + participant_id = participant.get("info", {}).get("participantId", "") + + await task.queue_frames( + [ UserStartedSpeakingFrame(), - TranscriptionFrame("Tell a joke about dogs.", "user_id", timestamp), + TranscriptionFrame("Tell a joke about dogs.", participant_id, time.time()), UserStoppedSpeakingFrame(), ] - await task.queue_frames(frames) + ) + # await asyncio.sleep(5) # Small delay between frame sets + + # Create frames for 60 seconds + start_time = time.time() + while time.time() - start_time < 30: + elapsed_time = round(time.time() - start_time) + logger.info(f"Running for {elapsed_time} seconds") await asyncio.sleep(5) # Small delay between frame sets - next_frames = [ - StartInterruptionFrame(), - TranscriptionFrame("Tell a joke about cats.", "user_id", timestamp), - StopInterruptionFrame(), - ] - await task.queue_frames(next_frames) + await task.queue_frames( + [ + StartInterruptionFrame(), + TranscriptionFrame("Tell a joke about cats.", participant_id, time.time()), + StopInterruptionFrame(), + ] + ) + await asyncio.sleep(5) # Small delay between frame sets + await task.queue_frames( + [ + StartInterruptionFrame(), + TranscriptionFrame("Tell a joke about dogs.", participant_id, time.time()), + StopInterruptionFrame(), + ] + ) + await task.queue_frame(EndFrame()) await runner.run(task) From 909bb305177027c08e87ebfde048b55b45dc76c0 Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 27 Nov 2024 14:08:01 +0800 Subject: [PATCH 5/7] Better recreation --- examples/foundational/race_bot.py | 125 ++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 39 deletions(-) diff --git a/examples/foundational/race_bot.py b/examples/foundational/race_bot.py index 62c26abd..1577f0ea 100644 --- 
a/examples/foundational/race_bot.py +++ b/examples/foundational/race_bot.py @@ -13,19 +13,16 @@ from loguru import logger from runner import configure -from pipecat.frames.frames import ( - BotSpeakingFrame, - EndFrame, - Frame, - StartInterruptionFrame, - StopInterruptionFrame, - TranscriptionFrame, - UserStartedSpeakingFrame, - UserStoppedSpeakingFrame, -) +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.frames.frames import (BotSpeakingFrame, EndFrame, Frame, + InputAudioRawFrame, StartInterruptionFrame, + StopInterruptionFrame, TextFrame, + TranscriptionFrame, TTSAudioRawFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineTask +from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.cartesia import CartesiaTTSService @@ -44,7 +41,11 @@ def __init__(self, name, **kwargs): async def process_frame(self, frame: Frame, direction: FrameDirection): await super().process_frame(frame, direction) if not ( - isinstance(frame, BotSpeakingFrame) + isinstance(frame, InputAudioRawFrame) + or isinstance(frame, BotSpeakingFrame) + or isinstance(frame, UserStoppedSpeakingFrame) + or isinstance(frame, TTSAudioRawFrame) + or isinstance(frame, TextFrame) ): logger.debug(f"--- {self._name}: {frame} {direction}") await self.push_frame(frame, direction) @@ -55,9 +56,12 @@ async def main(): (room_url, _) = await configure(session) transport = DailyTransport( - room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True) + room_url, None, "AI Bot", DailyParams( + audio_out_enabled=True, + transcription_enabled=True, + vad_enabled=True, + vad_analyzer=SileroVADAnalyzer(),) ) - tts = CartesiaTTSService( 
api_key=os.getenv("CARTESIA_API_KEY"), voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady @@ -82,14 +86,18 @@ async def main(): task = PipelineTask( Pipeline( [ - dp, + # transport.input(), context_aggregator.user(), + dp, llm, tts, transport.output(), context_aggregator.assistant(), ] - ) + ), + PipelineParams( + allow_interruptions=True, + ), ) # Register an event handler so we can play the audio when the @@ -98,37 +106,76 @@ async def main(): async def on_first_participant_joined(transport, participant): participant_id = participant.get("info", {}).get("participantId", "") - await task.queue_frames( - [ - UserStartedSpeakingFrame(), - TranscriptionFrame("Tell a joke about dogs.", participant_id, time.time()), - UserStoppedSpeakingFrame(), - ] - ) - # await asyncio.sleep(5) # Small delay between frame sets - # Create frames for 60 seconds start_time = time.time() - while time.time() - start_time < 30: + while time.time() - start_time < 300: elapsed_time = round(time.time() - start_time) logger.info(f"Running for {elapsed_time} seconds") - await asyncio.sleep(5) # Small delay between frame sets - await task.queue_frames( - [ - StartInterruptionFrame(), - TranscriptionFrame("Tell a joke about cats.", participant_id, time.time()), - StopInterruptionFrame(), - ] + await task.queue_frame( + StartInterruptionFrame(), + ) + await asyncio.sleep(1) + + await task.queue_frame( + UserStartedSpeakingFrame(), + ) + + await asyncio.sleep(1) + + await task.queue_frame( + TranscriptionFrame("Tell a joke about dogs.", participant_id, time.time()), ) - await asyncio.sleep(5) # Small delay between frame sets + + await asyncio.sleep(1) + + await task.queue_frame( + StopInterruptionFrame(), + ) + + await asyncio.sleep(1) + + + await task.queue_frame( + UserStoppedSpeakingFrame(), + ) + + await asyncio.sleep(5) + + await task.queue_frame( + StartInterruptionFrame() + ) + await asyncio.sleep(1) + + await task.queue_frame( + UserStartedSpeakingFrame(), + ) + + await 
asyncio.sleep(1) + + await task.queue_frame( + TranscriptionFrame("Tell a joke about cats.", participant_id, time.time()), + ) + + await asyncio.sleep(1) + await task.queue_frames( - [ - StartInterruptionFrame(), - TranscriptionFrame("Tell a joke about dogs.", participant_id, time.time()), - StopInterruptionFrame(), - ] + StopInterruptionFrame(), + ) + + await asyncio.sleep(1) + await task.queue_frame( + UserStoppedSpeakingFrame(), ) + await asyncio.sleep(5) await task.queue_frame(EndFrame()) + + + # @transport.event_handler("on_first_participant_joined") + # async def on_first_participant_joined(transport, participant): + # await transport.capture_participant_transcription(participant["id"]) + # # Kick off the conversation. + # messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + # await task.queue_frames([LLMMessagesFrame(messages)]) await runner.run(task) From f34e6bce94e5e51f9f1b9206c6a3930a6fa19319 Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 27 Nov 2024 15:10:50 +0800 Subject: [PATCH 6/7] Switch questions --- examples/foundational/07-interruptible.py | 22 +++++- examples/foundational/race_bot.py | 83 ++++++++++++----------- 2 files changed, 66 insertions(+), 39 deletions(-) diff --git a/examples/foundational/07-interruptible.py b/examples/foundational/07-interruptible.py index 3148986a..e4577dc1 100644 --- a/examples/foundational/07-interruptible.py +++ b/examples/foundational/07-interruptible.py @@ -10,11 +10,12 @@ import sys from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import LLMMessagesFrame +from pipecat.frames.frames import BotSpeakingFrame, Frame, InputAudioRawFrame, LLMMessagesFrame, TTSAudioRawFrame, TextFrame, UserStoppedSpeakingFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask from pipecat.processors.aggregators.openai_llm_context import 
OpenAILLMContext +from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.services.cartesia import CartesiaTTSService from pipecat.services.openai import OpenAILLMService from pipecat.transports.services.daily import DailyParams, DailyTransport @@ -30,6 +31,22 @@ logger.remove(0) logger.add(sys.stderr, level="DEBUG") +class DebugProcessor(FrameProcessor): + def __init__(self, name, **kwargs): + self._name = name + super().__init__(**kwargs) + + async def process_frame(self, frame: Frame, direction: FrameDirection): + await super().process_frame(frame, direction) + if not ( + isinstance(frame, InputAudioRawFrame) + or isinstance(frame, BotSpeakingFrame) + or isinstance(frame, TTSAudioRawFrame) + or isinstance(frame, TextFrame) + ): + logger.debug(f"--- {self._name}: {frame} {direction}") + await self.push_frame(frame, direction) + async def main(): async with aiohttp.ClientSession() as session: @@ -63,11 +80,14 @@ async def main(): context = OpenAILLMContext(messages) context_aggregator = llm.create_context_aggregator(context) + + dp = DebugProcessor("dp") pipeline = Pipeline( [ transport.input(), # Transport user input context_aggregator.user(), # User responses + dp, llm, # LLM tts, # TTS transport.output(), # Transport bot output diff --git a/examples/foundational/race_bot.py b/examples/foundational/race_bot.py index 1577f0ea..9ee3f351 100644 --- a/examples/foundational/race_bot.py +++ b/examples/foundational/race_bot.py @@ -14,12 +14,19 @@ from runner import configure from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import (BotSpeakingFrame, EndFrame, Frame, - InputAudioRawFrame, StartInterruptionFrame, - StopInterruptionFrame, TextFrame, - TranscriptionFrame, TTSAudioRawFrame, - UserStartedSpeakingFrame, - UserStoppedSpeakingFrame) +from pipecat.frames.frames import ( + BotSpeakingFrame, + EndFrame, + Frame, + InputAudioRawFrame, + StartInterruptionFrame, + StopInterruptionFrame, + TextFrame, 
+ TranscriptionFrame, + TTSAudioRawFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -56,11 +63,15 @@ async def main(): (room_url, _) = await configure(session) transport = DailyTransport( - room_url, None, "AI Bot", DailyParams( - audio_out_enabled=True, + room_url, + None, + "AI Bot", + DailyParams( + audio_out_enabled=True, transcription_enabled=True, vad_enabled=True, - vad_analyzer=SileroVADAnalyzer(),) + vad_analyzer=SileroVADAnalyzer(), + ), ) tts = CartesiaTTSService( api_key=os.getenv("CARTESIA_API_KEY"), @@ -88,8 +99,8 @@ async def main(): [ # transport.input(), context_aggregator.user(), - dp, llm, + dp, tts, transport.output(), context_aggregator.assistant(), @@ -106,7 +117,7 @@ async def main(): async def on_first_participant_joined(transport, participant): participant_id = participant.get("info", {}).get("participantId", "") - # Create frames for 60 seconds + # Create frames for 300 seconds start_time = time.time() while time.time() - start_time < 300: elapsed_time = round(time.time() - start_time) @@ -114,62 +125,58 @@ async def on_first_participant_joined(transport, participant): await task.queue_frame( StartInterruptionFrame(), ) - await asyncio.sleep(1) - + await asyncio.sleep(1) + await task.queue_frame( UserStartedSpeakingFrame(), ) - - await asyncio.sleep(1) - + + await asyncio.sleep(1) + await task.queue_frame( - TranscriptionFrame("Tell a joke about dogs.", participant_id, time.time()), + TranscriptionFrame("Tell me more about your company.", participant_id, time.time()), ) - await asyncio.sleep(1) - + await asyncio.sleep(1) + await task.queue_frame( StopInterruptionFrame(), ) - await asyncio.sleep(1) + await asyncio.sleep(1) - await task.queue_frame( UserStoppedSpeakingFrame(), ) - - await asyncio.sleep(5) - - await task.queue_frame( - StartInterruptionFrame() - )
- await asyncio.sleep(1) - + + await asyncio.sleep(5) + + await task.queue_frame(StartInterruptionFrame()) + await asyncio.sleep(1) + await task.queue_frame( UserStartedSpeakingFrame(), ) - - await asyncio.sleep(1) - + + await asyncio.sleep(1) + await task.queue_frame( - TranscriptionFrame("Tell a joke about cats.", participant_id, time.time()), + TranscriptionFrame("Give me a list of appointment dates.", participant_id, time.time()), ) - await asyncio.sleep(1) - + await asyncio.sleep(1) + await task.queue_frames( StopInterruptionFrame(), ) - await asyncio.sleep(1) + await asyncio.sleep(1) await task.queue_frame( UserStoppedSpeakingFrame(), ) await asyncio.sleep(5) await task.queue_frame(EndFrame()) - - + # @transport.event_handler("on_first_participant_joined") # async def on_first_participant_joined(transport, participant): # await transport.capture_participant_transcription(participant["id"]) From 1884ff3f093d35013a5104bd70d49921b4c7cd8c Mon Sep 17 00:00:00 2001 From: James Hush Date: Wed, 27 Nov 2024 19:38:37 +0800 Subject: [PATCH 7/7] logging --- src/pipecat/services/cartesia.py | 6 ++++++ src/pipecat/transports/base_input.py | 2 ++ src/pipecat/transports/network/websocket_server.py | 1 + 3 files changed, 9 insertions(+) diff --git a/src/pipecat/services/cartesia.py b/src/pipecat/services/cartesia.py index ad4636a7..e4d0f00e 100644 --- a/src/pipecat/services/cartesia.py +++ b/src/pipecat/services/cartesia.py @@ -7,6 +7,7 @@ import asyncio import base64 import json +import random import uuid from typing import AsyncGenerator, List, Optional, Union @@ -222,6 +223,10 @@ async def flush_audio(self): async def _receive_task_handler(self): try: async for message in self._get_websocket(): + # Randomly cancel the asyncio task 1% of the time + if random.random() < 0.01: + logger.info(f"Cancelling task for {self} due to random chance") + asyncio.current_task().cancel() msg = json.loads(message) if not msg or msg["context_id"] != self._context_id: continue @@ -256,6 
+261,7 @@ async def _receive_task_handler(self): logger.error(f"Cartesia error, unknown message type: {msg}") except asyncio.CancelledError: pass + # await self.push_error(ErrorFrame(f"{self} cancelled", True)) except Exception as e: logger.error(f"{self} exception: {e}") diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 025a5bed..6fc24fde 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -71,6 +71,7 @@ def vad_analyzer(self) -> VADAnalyzer | None: return self._params.vad_analyzer async def push_audio_frame(self, frame: InputAudioRawFrame): + logger.info(f"Pushing audio qsize: {self._audio_in_queue.qsize()}") if self._params.audio_in_enabled or self._params.vad_enabled: await self._audio_in_queue.put(frame) @@ -167,6 +168,7 @@ async def _handle_vad(self, audio_frames: bytes, vad_state: VADState): return vad_state async def _audio_task_handler(self): + logger.info("_audio_task_handler started") vad_state: VADState = VADState.QUIET while True: try: diff --git a/src/pipecat/transports/network/websocket_server.py b/src/pipecat/transports/network/websocket_server.py index c0c8595e..b330f61f 100644 --- a/src/pipecat/transports/network/websocket_server.py +++ b/src/pipecat/transports/network/websocket_server.py @@ -106,6 +106,7 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p continue if isinstance(frame, AudioRawFrame): + logger.info("websocket_server") await self.push_audio_frame( InputAudioRawFrame( audio=frame.audio,