Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add logging and error handling for issue #721 #750

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion examples/foundational/07-interruptible.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import sys

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import BotSpeakingFrame, Frame, InputAudioRawFrame, LLMMessagesFrame, TTSAudioRawFrame, TextFrame, UserStoppedSpeakingFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
Expand All @@ -30,6 +31,22 @@
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

class DebugProcessor(FrameProcessor):
    """Pass-through processor that logs every frame flowing past it.

    High-frequency frame types (raw audio chunks, bot-speaking heartbeats,
    TTS audio, streamed text tokens) are suppressed so the log stays
    readable. All frames — logged or not — are pushed downstream unchanged.
    """

    # Frame types too chatty to log: these arrive many times per second.
    _QUIET_FRAME_TYPES = (
        InputAudioRawFrame,
        BotSpeakingFrame,
        TTSAudioRawFrame,
        TextFrame,
    )

    def __init__(self, name, **kwargs):
        # Label used to tell multiple DebugProcessor instances apart in logs.
        self._name = name
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        # isinstance() accepts a tuple of types — one call instead of a
        # chain of `or`ed isinstance checks.
        if not isinstance(frame, self._QUIET_FRAME_TYPES):
            logger.debug(f"--- {self._name}: {frame} {direction}")
        await self.push_frame(frame, direction)

async def main():
async with aiohttp.ClientSession() as session:
Expand Down Expand Up @@ -63,11 +80,14 @@ async def main():

context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)

dp = DebugProcessor("dp")

pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
dp,
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
Expand Down
191 changes: 191 additions & 0 deletions examples/foundational/race_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

import asyncio
import os
import sys
import time

import aiohttp
from loguru import logger
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
BotSpeakingFrame,
EndFrame,
Frame,
InputAudioRawFrame,
StartInterruptionFrame,
StopInterruptionFrame,
TextFrame,
TranscriptionFrame,
TTSAudioRawFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


class DebugProcessor(FrameProcessor):
    """Pass-through processor that logs every frame flowing past it.

    High-frequency frame types (raw audio chunks, bot-speaking heartbeats,
    user-stopped-speaking events, TTS audio, streamed text tokens) are
    suppressed so the log stays readable. All frames — logged or not — are
    pushed downstream unchanged.
    """

    # Frame types too chatty to log: these arrive many times per second,
    # or (UserStoppedSpeakingFrame) once per simulated utterance loop.
    _QUIET_FRAME_TYPES = (
        InputAudioRawFrame,
        BotSpeakingFrame,
        UserStoppedSpeakingFrame,
        TTSAudioRawFrame,
        TextFrame,
    )

    def __init__(self, name, **kwargs):
        # Label used to tell multiple DebugProcessor instances apart in logs.
        self._name = name
        super().__init__(**kwargs)

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        # isinstance() accepts a tuple of types — one call instead of a
        # chain of `or`ed isinstance checks.
        if not isinstance(frame, self._QUIET_FRAME_TYPES):
            logger.debug(f"--- {self._name}: {frame} {direction}")
        await self.push_frame(frame, direction)


async def _simulate_utterance(task: PipelineTask, participant_id: str, text: str):
    """Drive the pipeline as if a user spoke one utterance.

    Queues the same frame sequence a real transport would emit —
    interruption start, speaking start, a transcription, interruption
    stop, speaking stop — with one second between frames, then waits
    five seconds so the bot has time to respond.

    Args:
        task: the running pipeline task to queue frames onto.
        participant_id: Daily participant id attached to the transcription.
        text: the simulated user utterance.
    """
    await task.queue_frame(StartInterruptionFrame())
    await asyncio.sleep(1)

    await task.queue_frame(UserStartedSpeakingFrame())
    await asyncio.sleep(1)

    await task.queue_frame(TranscriptionFrame(text, participant_id, time.time()))
    await asyncio.sleep(1)

    # NOTE(review): the original called task.queue_frames() (plural, which
    # expects an iterable) with a single frame — fixed to queue_frame().
    await task.queue_frame(StopInterruptionFrame())
    await asyncio.sleep(1)

    await task.queue_frame(UserStoppedSpeakingFrame())

    # Give the bot time to generate and speak its response before the
    # next simulated interruption.
    await asyncio.sleep(5)


async def main():
    """Stress bot that repeatedly simulates user utterances/interruptions
    to try to reproduce the interruption race from issue #721.

    Real transport input is deliberately left out of the pipeline; frames
    are injected directly via the event handler below instead.
    """
    async with aiohttp.ClientSession() as session:
        (room_url, _) = await configure(session)

        transport = DailyTransport(
            room_url,
            None,
            "AI Bot",
            DailyParams(
                audio_out_enabled=True,
                transcription_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )
        tts = CartesiaTTSService(
            api_key=os.getenv("CARTESIA_API_KEY"),
            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
        )

        llm = OpenAILLMService(api_key=os.environ["OPENAI_API_KEY"], model="gpt-4o")

        messages = [
            {
                "role": "system",
                "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
            },
        ]

        dp = DebugProcessor("dp")

        context = OpenAILLMContext(messages)
        context_aggregator = llm.create_context_aggregator(context)

        runner = PipelineRunner()

        task = PipelineTask(
            Pipeline(
                [
                    # transport.input() intentionally omitted: this bot
                    # injects synthetic frames via task.queue_frame below.
                    context_aggregator.user(),  # User responses
                    llm,  # LLM
                    dp,  # Frame logging for debugging
                    tts,  # TTS
                    transport.output(),  # Transport bot output
                    context_aggregator.assistant(),  # Assistant responses
                ]
            ),
            PipelineParams(
                allow_interruptions=True,
            ),
        )

        # Once the first participant joins, hammer the pipeline with
        # simulated utterances for 300 seconds, then end the pipeline.
        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            participant_id = participant.get("info", {}).get("participantId", "")

            start_time = time.time()
            while time.time() - start_time < 300:
                elapsed_time = round(time.time() - start_time)
                logger.info(f"Running for {elapsed_time} seconds")

                await _simulate_utterance(
                    task, participant_id, "Tell me more about your company."
                )
                await _simulate_utterance(
                    task, participant_id, "Give me a list of appointment dates."
                )

            await task.queue_frame(EndFrame())

        await runner.run(task)


# Script entry point: run the async bot until the pipeline finishes.
if __name__ == "__main__":
    asyncio.run(main())
6 changes: 6 additions & 0 deletions src/pipecat/services/cartesia.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import asyncio
import base64
import json
import random
import uuid
from typing import AsyncGenerator, List, Optional, Union

Expand Down Expand Up @@ -222,6 +223,10 @@ async def flush_audio(self):
async def _receive_task_handler(self):
try:
async for message in self._get_websocket():
# Randomly cancel the asyncio task 1% of the time
if random.random() < 0.01:
logger.info(f"Cancelling task for {self} due to random chance")
asyncio.current_task().cancel()
msg = json.loads(message)
if not msg or msg["context_id"] != self._context_id:
continue
Expand Down Expand Up @@ -256,6 +261,7 @@ async def _receive_task_handler(self):
logger.error(f"Cartesia error, unknown message type: {msg}")
except asyncio.CancelledError:
pass
# await self.push_error(ErrorFrame(f"{self} cancelled", True))
except Exception as e:
logger.error(f"{self} exception: {e}")

Expand Down
2 changes: 2 additions & 0 deletions src/pipecat/transports/base_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def vad_analyzer(self) -> VADAnalyzer | None:
return self._params.vad_analyzer

async def push_audio_frame(self, frame: InputAudioRawFrame):
logger.info(f"Pushing audio qsize: {self._audio_in_queue.qsize()}")
if self._params.audio_in_enabled or self._params.vad_enabled:
await self._audio_in_queue.put(frame)

Expand Down Expand Up @@ -167,6 +168,7 @@ async def _handle_vad(self, audio_frames: bytes, vad_state: VADState):
return vad_state

async def _audio_task_handler(self):
logger.info("_audio_task_handler started")
vad_state: VADState = VADState.QUIET
while True:
try:
Expand Down
1 change: 1 addition & 0 deletions src/pipecat/transports/network/websocket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ async def _client_handler(self, websocket: websockets.WebSocketServerProtocol, p
continue

if isinstance(frame, AudioRawFrame):
logger.info("websocket_server")
await self.push_audio_frame(
InputAudioRawFrame(
audio=frame.audio,
Expand Down
Loading