diff --git a/examples/exotel-websocket/bot.py b/examples/exotel-websocket/bot.py
new file mode 100644
index 00000000..87b0285c
--- /dev/null
+++ b/examples/exotel-websocket/bot.py
@@ -0,0 +1,130 @@
+import os
+import sys
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import EndFrame, LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services.cartesia import CartesiaTTSService
+from pipecat.services.elevenlabs import ElevenLabsTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.services.deepgram import DeepgramSTTService
+from deepgram.clients.listen.v1.websocket.options import LiveOptions
+from pipecat.transports.network.fastapi_websocket import (
+    FastAPIWebsocketTransport,
+    FastAPIWebsocketParams,
+)
+from pipecat.serializers.twilio import TwilioFrameSerializer
+
+from loguru import logger
+
+from dotenv import load_dotenv
+from custom.transport.ExotelWebsocketTransport import ExotelWebsocketTransport, ExotelWebsocketParams
+from custom.serializers.exotel import ExotelFrameSerializer
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def run_bot(websocket_client, stream_sid):
+    transport = FastAPIWebsocketTransport(
+        # transport = ExotelWebsocketTransport(
+        websocket=websocket_client,
+        input_name="input.pcm",
+        output_name="output.wav",
+        params=FastAPIWebsocketParams(
+            # params=ExotelWebsocketParams(
+            audio_out_enabled=True,
+            add_wav_header=True,
+            audio_in_enabled=True,
+            audio_in_channels=1,
+            audio_out_sample_rate=8000,
+            audio_out_bitrate=128000,
+            camera_out_enabled=False,
+            vad_enabled=True,
+            vad_analyzer=SileroVADAnalyzer(),
+            vad_audio_passthrough=True,
+            serializer=TwilioFrameSerializer(
+                stream_sid=stream_sid,
+                params=TwilioFrameSerializer.InputParams(sample_rate=8000),
+            ),
+            # serializer=ExotelFrameSerializer(
+            #     stream_sid=stream_sid,
+            #     params=ExotelFrameSerializer.InputParams(sample_rate=8000),
+            # ),
+        ),
+    )
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+    stt = DeepgramSTTService(
+        api_key=os.getenv("DEEPGRAM_API_KEY"),
+        live_options=LiveOptions(
+            encoding="linear16",
+            channels=1,
+            model="nova-2-general",
+            punctuate=True,
+            interim_results=True,
+            endpointing=500,
+            utterance_end_ms=1000,
+        ),
+    )
+
+    # Example Exotel "start" event captured during testing:
+    # {'event': 'start', 'stream_sid': '3357459ca698d8c765bac10b0ec418bl', 'sequence_number': '1',
+    #  'start': {'stream_sid': '3357459ca698d8c765bac10b0ec418bl', 'call_sid': 'b0c8aea60aceb0007fc5a1bb75c218bl',
+    #            'account_sid': 'kritibudh1', 'from': '09992750105', 'to': '04446972319',
+    #            'media_format': {'encoding': 'base64', 'sample_rate': '8000', 'bit_rate': '128kbps'}}}
+
+    # tts = CartesiaTTSService(
+    #     api_key=os.getenv("CARTESIA_API_KEY"),
+    #     voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
+    #     sample_rate=8000,
+    #     encoding="pcm_s16le",
+    #     container="raw",
+    # )
+
+    tts = ElevenLabsTTSService(
+        api_key=os.getenv("ELEVENLABS_API_KEY"),
+        voice_id=os.getenv("ELEVENLABS_VOICE_ID"),
+        model="eleven_multilingual_v2",
+        output_format="mp3_44100_64",
+    )
+    # Other ElevenLabs output formats: "mp3_22050_32", "mp3_44100_32", "mp3_44100_64"
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful LLM in an audio call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
+        },
+    ]
+
+    context = OpenAILLMContext(messages)
+    context_aggregator = llm.create_context_aggregator(context)
+
+    pipeline = Pipeline(
+        [
+            transport.input(),  # Websocket input from client
+            stt,  # Speech-To-Text
+            context_aggregator.user(),
+            llm,  # LLM
+            tts,  # Text-To-Speech
+            transport.output(),  # Websocket output to client
+            context_aggregator.assistant(),
+        ]
+    )
+
+    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+        await task.queue_frames([LLMMessagesFrame(messages)])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        await task.queue_frames([EndFrame()])
+
+    runner = PipelineRunner(handle_sigint=False)
+    await runner.run(task)
diff --git a/examples/exotel-websocket/custom/serializers/exotel.py b/examples/exotel-websocket/custom/serializers/exotel.py
new file mode 100644
index 00000000..3c7a04e2
--- /dev/null
+++ b/examples/exotel-websocket/custom/serializers/exotel.py
@@ -0,0 +1,53 @@
+import json
+import base64
+from dataclasses import dataclass
+
+from pydantic import BaseModel
+
+from pipecat.audio.utils import ulaw_to_pcm, pcm_to_ulaw
+from pipecat.frames.frames import AudioRawFrame, Frame, StartInterruptionFrame
+from pipecat.serializers.base_serializer import FrameSerializer
+
+
+@dataclass
+class ExotelFrameSerializer(FrameSerializer):
+    class InputParams(BaseModel):
+        sample_rate: int = 8000
+
+    def __init__(self, stream_sid: str, params: InputParams = InputParams()):
+        self._stream_sid = stream_sid
+        self._params = params
+
+    def serialize(self, frame: Frame) -> str | bytes | None:
+        if isinstance(frame, AudioRawFrame):
+            data = frame.audio
+            # Convert PCM to u-law for the Exotel streaming requirement.
+            serialized_data = pcm_to_ulaw(data, frame.sample_rate, self._params.sample_rate)
+            payload = base64.b64encode(serialized_data).decode("utf-8")
+            answer = {
+                "event": "media",
+                "streamSid": self._stream_sid,
+                "media": {"payload": payload},
+            }
+            return json.dumps(answer)
+        elif isinstance(frame, StartInterruptionFrame):
+            answer = {"event": "clear", "streamSid": self._stream_sid}
+            return json.dumps(answer)
+        return None
+
+    def deserialize(self, data: str | bytes) -> Frame | None:
+        message = json.loads(data)
+        if message["event"] != "media":
+            return None
+        payload_base64 = message["media"]["payload"]
+        payload = base64.b64decode(payload_base64)
+        deserialized_data = ulaw_to_pcm(payload, self._params.sample_rate, self._params.sample_rate)
+        audio_frame = AudioRawFrame(
+            audio=deserialized_data, num_channels=1, sample_rate=self._params.sample_rate
+        )
+        return audio_frame
diff --git a/examples/exotel-websocket/custom/transport/ExotelWebsocketTransport.py b/examples/exotel-websocket/custom/transport/ExotelWebsocketTransport.py
new file mode 100644
index 00000000..6346f1bd
--- /dev/null
+++ b/examples/exotel-websocket/custom/transport/ExotelWebsocketTransport.py
@@ -0,0 +1,221 @@
+import asyncio
+import base64
+import json
+import io
+import time
+import wave
+
+from typing import Awaitable, Callable
+from pydantic import BaseModel
+
+from pipecat.frames.frames import (
+    AudioRawFrame,
+    CancelFrame,
+    EndFrame,
+    Frame,
+    InputAudioRawFrame,
+    StartFrame,
+    StartInterruptionFrame,
+)
+from pipecat.processors.frame_processor import FrameDirection
+from pipecat.serializers.base_serializer import FrameSerializer
+from pipecat.transports.base_input import BaseInputTransport
+from pipecat.transports.base_output import BaseOutputTransport
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+
+from loguru import logger
+
+try:
+    from fastapi import WebSocket
+    from starlette.websockets import WebSocketState
+except ModuleNotFoundError as e:
+    logger.error(f"Exception: {e}")
+    logger.error(
+        "In order to use FastAPI websockets, you need to `pip install pipecat-ai[websocket]`."
+    )
+    raise Exception(f"Missing module: {e}")
+
+
+class ExotelWebsocketParams(TransportParams):
+    add_wav_header: bool = False
+    serializer: FrameSerializer
+
+
+class ExotelWebsocketCallbacks(BaseModel):
+    on_client_connected: Callable[[WebSocket], Awaitable[None]]
+    on_client_disconnected: Callable[[WebSocket], Awaitable[None]]
+
+
+class ExotelWebsocketInputTransport(BaseInputTransport):
+    def __init__(
+        self,
+        websocket: WebSocket,
+        params: ExotelWebsocketParams,
+        callbacks: ExotelWebsocketCallbacks,
+        **kwargs,
+    ):
+        super().__init__(params, **kwargs)
+
+        self._websocket = websocket
+        self._params = params
+        self._callbacks = callbacks
+
+    async def start(self, frame: StartFrame):
+        await super().start(frame)
+        await self._callbacks.on_client_connected(self._websocket)
+        self._receive_task = self.get_event_loop().create_task(self._receive_messages())
+
+    async def stop(self, frame: EndFrame):
+        await super().stop(frame)
+        if self._websocket.client_state != WebSocketState.DISCONNECTED:
+            await self._websocket.close()
+
+    async def cancel(self, frame: CancelFrame):
+        await super().cancel(frame)
+        if self._websocket.client_state != WebSocketState.DISCONNECTED:
+            await self._websocket.close()
+
+    async def _receive_messages(self):
+        async for message in self._websocket.iter_text():
+            message_data = json.loads(message)
+            event = message_data.get("event")
+
+            if event == "connected":
+                logger.debug("Exotel WebSocket connected event received.")
+                # Handle the connected event, e.g. initialize state or log information.
+
+            elif event == "start":
+                logger.debug("Start event received.")
+                # Handle the start event, potentially logging stream information.
+
+            elif event == "media":
+                # logger.debug("Media event received, deserializing.")
+                frame = self._params.serializer.deserialize(message)
+                if isinstance(frame, AudioRawFrame):
+                    await self.push_audio_frame(
+                        InputAudioRawFrame(
+                            audio=frame.audio,
+                            sample_rate=frame.sample_rate,
+                            num_channels=frame.num_channels,
+                        )
+                    )
+
+            elif event == "dtmf":
+                logger.debug("DTMF event received.")
+                # Handle DTMF tones if necessary for the voicebot.
+
+            elif event == "stop":
+                logger.debug("Stop event received.")
+                # The disconnect callback after the loop handles cleanup; just stop receiving.
+                break
+
+        await self._callbacks.on_client_disconnected(self._websocket)
+
+
+class ExotelWebsocketOutputTransport(BaseOutputTransport):
+    def __init__(self, websocket: WebSocket, params: ExotelWebsocketParams, **kwargs):
+        super().__init__(params, **kwargs)
+
+        self._websocket = websocket
+        self._params = params
+        self._send_interval = (self._audio_chunk_size / self._params.audio_out_sample_rate) / 2
+        self._next_send_time = 0
+
+    async def process_frame(self, frame: Frame, direction: FrameDirection):
+        await super().process_frame(frame, direction)
+
+        if isinstance(frame, StartInterruptionFrame):
+            await self._write_frame(frame)
+            self._next_send_time = 0
+
+    async def write_raw_audio_frames(self, frames: bytes):
+        frame = AudioRawFrame(
+            audio=frames,
+            sample_rate=self._params.audio_out_sample_rate,
+            num_channels=self._params.audio_out_channels,
+        )
+
+        if self._params.add_wav_header:
+            content = io.BytesIO()
+            ww = wave.open(content, "wb")
+            ww.setsampwidth(2)
+            ww.setnchannels(frame.num_channels)
+            ww.setframerate(frame.sample_rate)
+            ww.writeframes(frame.audio)
+            ww.close()
+            content.seek(0)
+            wav_frame = AudioRawFrame(
+                content.read(), sample_rate=frame.sample_rate, num_channels=frame.num_channels
+            )
+            frame = wav_frame
+
+        payload = self._params.serializer.serialize(frame)
+        if payload and self._websocket.client_state == WebSocketState.CONNECTED:
+            message = {
+                "event": "media",
+                "stream_sid": "",  # This should be managed based on the session
+                "media": {
+                    # Placeholder timestamp
+                    "timestamp": str(int(time.time() * 1000)),
+                    "payload": base64.b64encode(frame.audio).decode("utf-8"),
+                },
+            }
+            await self._websocket.send_text(json.dumps(message))
+
+        # Simulate a clock.
+        current_time = time.monotonic()
+        sleep_duration = max(0, self._next_send_time - current_time)
+        await asyncio.sleep(sleep_duration)
+        if sleep_duration == 0:
+            self._next_send_time = time.monotonic() + self._send_interval
+        else:
+            self._next_send_time += self._send_interval
+
+        self._websocket_audio_buffer = bytes()
+
+    async def _write_frame(self, frame: Frame):
+        payload = self._params.serializer.serialize(frame)
+        if payload and self._websocket.client_state == WebSocketState.CONNECTED:
+            await self._websocket.send_text(payload)
+
+
+class ExotelWebsocketTransport(BaseTransport):
+    def __init__(
+        self,
+        websocket: WebSocket,
+        params: ExotelWebsocketParams,
+        input_name: str | None = None,
+        output_name: str | None = None,
+        loop: asyncio.AbstractEventLoop | None = None,
+    ):
+        super().__init__(input_name=input_name, output_name=output_name, loop=loop)
+        self._params = params
+
+        self._callbacks = ExotelWebsocketCallbacks(
+            on_client_connected=self._on_client_connected,
+            on_client_disconnected=self._on_client_disconnected,
+        )
+
+        self._input = ExotelWebsocketInputTransport(
+            websocket, self._params, self._callbacks, name=self._input_name
+        )
+        self._output = ExotelWebsocketOutputTransport(
+            websocket, self._params, name=self._output_name
+        )
+
+        # Register supported handlers.
+        self._register_event_handler("on_client_connected")
+        self._register_event_handler("on_client_disconnected")
+
+    def input(self) -> ExotelWebsocketInputTransport:
+        return self._input
+
+    def output(self) -> ExotelWebsocketOutputTransport:
+        return self._output
+
+    async def _on_client_connected(self, websocket):
+        await self._call_event_handler("on_client_connected", websocket)
+
+    async def _on_client_disconnected(self, websocket):
+        await self._call_event_handler("on_client_disconnected", websocket)
diff --git a/examples/exotel-websocket/main.py b/examples/exotel-websocket/main.py
new file mode 100644
index 00000000..924babc5
--- /dev/null
+++ b/examples/exotel-websocket/main.py
@@ -0,0 +1,53 @@
+import json
+
+import uvicorn
+
+from fastapi import FastAPI, WebSocket
+from fastapi.middleware.cors import CORSMiddleware
+from starlette.responses import HTMLResponse
+
+from bot import run_bot
+
+app = FastAPI()
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],  # Allow all origins for testing
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+@app.websocket("/phonebot")
+async def websocket_endpoint(websocket: WebSocket):
+    print(f"Incoming connection from {websocket.client}")
+    try:
+        await websocket.accept()
+        print("Connection accepted")
+
+        # Wait for the "connected" event
+        connected_data = await websocket.receive_json()
+        if connected_data.get("event") != "connected":
+            raise ValueError("Expected 'connected' event")
+
+        # Wait for the "start" event
+        start_data = await websocket.receive_json()
+        if start_data.get("event") != "start":
+            raise ValueError("Expected 'start' event")
+
+        # Extract stream_sid from the start event
+        stream_sid = start_data.get("stream_sid")
+        if not stream_sid:
+            raise ValueError("Missing stream_sid in start event")
+
+        print(f"Starting bot with stream_sid: {stream_sid}")
+        await run_bot(websocket, stream_sid)
+
+    except Exception as e:
+        print(f"Error during websocket handling: {e}")
+        raise
+
+
+if __name__ == "__main__":
+    uvicorn.run(app, host="0.0.0.0", port=8765)
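
Not part of the patch above: a minimal local smoke-test sketch for the handshake that main.py expects. It assumes the example server is running on ws://localhost:8765/phonebot with the API keys from bot.py configured, and that the `websockets` package is installed; the stream_sid value and the 200 ms of silence used as a media payload are made-up placeholders, so this only exercises the connected/start/media path, not real call audio.

# test_client.py -- hypothetical local test script, not part of this diff.
import asyncio
import base64
import json

import websockets  # assumption: installed via `pip install websockets`


async def main():
    async with websockets.connect("ws://localhost:8765/phonebot") as ws:
        # main.py waits for "connected" and then "start" before running the bot.
        await ws.send(json.dumps({"event": "connected"}))
        await ws.send(json.dumps({
            "event": "start",
            "stream_sid": "test-stream-sid",  # placeholder
            "start": {"media_format": {"encoding": "base64", "sample_rate": "8000", "bit_rate": "128kbps"}},
        }))

        # One media frame: 200 ms of 8 kHz, 16-bit mono silence, base64-encoded.
        silence = b"\x00\x00" * 1600
        await ws.send(json.dumps({
            "event": "media",
            "media": {"payload": base64.b64encode(silence).decode("utf-8")},
        }))

        # Print the size of the first message the bot sends back (if any), then stop.
        try:
            reply = await asyncio.wait_for(ws.recv(), timeout=10)
            print(f"Received a reply of length {len(reply)}")
        except asyncio.TimeoutError:
            print("No reply within 10 seconds")

        await ws.send(json.dumps({"event": "stop"}))


if __name__ == "__main__":
    asyncio.run(main())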