From 9cc9ba49361b7e25696d6732551cb37c0928eb4f Mon Sep 17 00:00:00 2001
From: Patrick Li
Date: Sat, 2 Nov 2024 19:22:02 -0700
Subject: [PATCH 01/14] Diva audio solver

---
 evals/registry/solvers/diva.yaml              |   4 +
 .../providers/diva/diva_local_gpu_solver.py   | 249 ++++++++++++++++++
 2 files changed, 253 insertions(+)
 create mode 100644 evals/registry/solvers/diva.yaml
 create mode 100644 evals/solvers/providers/diva/diva_local_gpu_solver.py

diff --git a/evals/registry/solvers/diva.yaml b/evals/registry/solvers/diva.yaml
new file mode 100644
index 0000000000..441e1c502f
--- /dev/null
+++ b/evals/registry/solvers/diva.yaml
@@ -0,0 +1,4 @@
+generation/gpu/diva-llama-3-v0-8b:
+  class: evals.solvers.providers.diva.diva_local_gpu_solver:DivaLocalGPUSolver
+  args:
+    model_name: WillHeld/DiVA-llama-3-v0-8b
\ No newline at end of file
diff --git a/evals/solvers/providers/diva/diva_local_gpu_solver.py b/evals/solvers/providers/diva/diva_local_gpu_solver.py
new file mode 100644
index 0000000000..65528aedaf
--- /dev/null
+++ b/evals/solvers/providers/diva/diva_local_gpu_solver.py
@@ -0,0 +1,249 @@
+import base64
+import dataclasses
+import io
+import logging
+import queue
+import threading
+import time
+import traceback
+import torch
+import torch.distributed
+import torch.multiprocessing as mp
+import librosa
+import transformers
+from urllib.request import urlopen
+from concurrent import futures
+from concurrent.futures import ProcessPoolExecutor
+from typing import Any, Callable, Dict, List, Optional, TypeVar
+from evals.solvers.solver import Solver, SolverResult
+from evals.task_state import TaskState
+
+SAMPLE_RATE = 16000
+DEFAULT_MAX_BATCH_SIZE = 32
+
+class DivaLocalGPUSolver(Solver):
+    """
+    A solver class for running the DiVA model in parallel across multiple GPUs.
+    Uses BatchedProcessPoolExecutor for efficient batch processing.
+
+    Args:
+        model_name: str - The model name/path (default: "WillHeld/DiVA-llama-3-v0-8b")
+        num_gpus: int - Number of GPUs to use (default: all available)
+        max_batch_size: int - Maximum batch size for inference
+        extra_options: Dict[str, Any] - Additional options for model generation
+        postprocessors: list[str] - List of postprocessors to apply
+        registry: Any - Registry object for the solver
+    """
+
+    def __init__(
+        self,
+        model_name: str = "WillHeld/DiVA-llama-3-v0-8b",
+        num_gpus: int = torch.cuda.device_count(),
+        max_batch_size: int = DEFAULT_MAX_BATCH_SIZE,
+        extra_options: Optional[Dict[str, Any]] = None,
+        postprocessors: list[str] = [],
+        registry: Any = None,
+    ):
+        super().__init__(postprocessors=postprocessors, registry=registry)
+
+        self.model_name = model_name
+        self.extra_options = extra_options or {}
+
+        # Set up multiprocessing
+        mp.set_start_method("spawn", force=True)
+        rank_queue = mp.Queue()
+        rank_queue.put(0)  # Start with primary GPU
+
+        self.executor = BatchedProcessPoolExecutor(
+            max_workers=max(1, num_gpus),
+            max_batch_size=int(max_batch_size),
+            initializer=solver_initializer,
+            initargs=(rank_queue, num_gpus, model_name),
+            batch_worker_fn=solver_worker,
+        )
+
+    def copy(self):
+        return self
+
+    def _process_audio_content(self, content: list) -> tuple[list, list]:
+        """Process audio content from message parts."""
+        audios = []
+        prompts = []
+
+        for part in content:
+            if part["type"] == "audio_url":
+                if isinstance(part["audio_url"], dict) and "url" in part["audio_url"]:
+                    audio_data = part["audio_url"]["url"].split(",")[1]
+                    audio_stream = io.BytesIO(base64.b64decode(audio_data))
+                else:
+                    audio_stream = io.BytesIO(urlopen(part["audio_url"]).read())
+
+                audio = librosa.load(audio_stream, sr=SAMPLE_RATE)[0]
+                audios.append(audio)
+
+            elif part["type"] == "text":
+                prompts.append(part["text"])
+
+        return audios, prompts
+
+    def _solve(self, task_state: TaskState, **kwargs) -> SolverResult:
+        inputs = {"audios": [], "prompts": []}
+
+        # Process the last message if it contains audio
+        if not isinstance(task_state.messages[-1].content, str):
+            audios, prompts = self._process_audio_content(task_state.messages[-1].content)
+            inputs["audios"].extend(audios)
+            inputs["prompts"].extend(prompts) if prompts else inputs["prompts"].extend([None] * len(audios))
+
+        # Submit to executor and get result
+        completion_output = self.executor.submit(inputs).result()
+        return SolverResult(completion_output)
+
+    def __del__(self):
+        if hasattr(self, "executor"):
+            self.executor.shutdown()
+
+
+def solver_initializer(
+    rank_queue: mp.Queue,
+    world_size: int,
+    model_name: str,
+):
+    """Initialize the model on the specified GPU."""
+    rank = rank_queue.get()
+
+    if torch.cuda.is_available():
+        device = torch.device("cuda", rank)
+    else:
+        device = torch.device("cpu")
+
+    global model
+    model = transformers.AutoModel.from_pretrained(
+        model_name,
+        device_map="auto" if device.type == "cuda" else None,
+        trust_remote_code=True
+    )
+
+    if device.type == "cpu":
+        model = model.to(device)
+
+    if rank == 0:
+        # Let other initializers start after model is downloaded
+        for i in range(1, world_size):
+            rank_queue.put(i)
+
+
+def solver_worker(inputs: List[Dict[str, Any]]) -> List[str]:
+    """Process a batch of inputs using the model."""
+    batch_audios = []
+    batch_prompts = []
+
+    # Process each input in the batch
+    for input_item in inputs:
+        batch_audios.extend(input_item["audios"])
+        batch_prompts.extend(input_item["prompts"])
+
+    # Generate responses using DiVA's interface
+    responses = model.generate(
+        batch_audios,
+        batch_prompts if any(batch_prompts) else None
+    )
+
+    return responses
+
+
+
+T_In = TypeVar("T_In")
+T_Out = TypeVar("T_Out")
+
+# Reuse the BatchedProcessPoolExecutor and related classes from the original implementation
+@dataclasses.dataclass
+class BatchableWorkItem:
+    request: T_In
+    future: futures.Future
+
+
+class BatchedProcessPoolExecutor:
+    def __init__(
+        self,
+        *args,
+        batch_worker_fn: Callable[[List[T_In]], List[T_Out]],
+        max_batch_size: int,
+        max_workers: int = 1,
+        **kwargs
+    ):
+        self.max_batch_size = max_batch_size
+        self.batch_worker_fn = batch_worker_fn
+        self._batch_queue = queue.Queue()
+        self.available_workers = threading.Semaphore(value=max_workers + 1)
+        self.process_pool_executor = ProcessPoolExecutor(
+            *args, max_workers=max_workers, **kwargs
+        )
+        self._batch_thread = threading.Thread(target=self.batch_requests)
+        self._batch_thread.start()
+
+    def submit(self, request: T_In) -> futures.Future:
+        item = BatchableWorkItem(request, futures.Future())
+        self._batch_queue.put(item)
+        return item.future
+
+    def shutdown(self):
+        self.process_pool_executor.shutdown()
+        while not self._batch_queue.empty():
+            try:
+                item = self._batch_queue.get(block=False)
+                if item is not None:
+                    item.future.set_exception(Exception("The pool has already shut down."))
+            except queue.Empty:
+                pass
+        self._batch_queue.put(None)
+
+    def batch_requests(self):
+        time.sleep(1)
+        while True:
+            self.available_workers.acquire()
+            work_items: List[BatchableWorkItem] = [self._batch_queue.get()]
+
+            while len(work_items) < self.max_batch_size:
+                try:
+                    item = self._batch_queue.get(block=False)
+                    work_items.append(item)
+                except queue.Empty:
+                    break
+
+            if work_items[-1] is None:
+                if len(work_items) > 1:
+                    logging.warn(
+                        "There remained work items in the queue when shutting down. The items will be ignored."
+ ) + return + + requests = [item.request for item in work_items] + task_futures = [item.future for item in work_items] + + try: + result_future = self.process_pool_executor.submit(self.batch_worker_fn, requests) + except Exception as e: + self._handle_exception(e, task_futures) + return + + result_future.add_done_callback(_set_results_cb(task_futures, self._handle_exception)) + result_future.add_done_callback(lambda _: self.available_workers.release()) + + def _handle_exception(self, e: Exception, task_futures: List[futures.Future]): + print(traceback.format_exc()) + for f in task_futures: + if not f.done(): + f.set_exception(e) + self.shutdown() + + +def _set_results_cb(task_futures: List[futures.Future], handle_exception_cb: Callable): + def cb(batch_future: futures.Future): + try: + for f, r in zip(task_futures, batch_future.result()): + f.set_result(r) + except Exception as e: + handle_exception_cb(e, task_futures) + + return cb \ No newline at end of file From 250d22305ed6c36a2323b48f4a78bc2482022248 Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Sat, 2 Nov 2024 22:40:44 -0700 Subject: [PATCH 02/14] Print out completion --- evals/solvers/providers/diva/diva_local_gpu_solver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/evals/solvers/providers/diva/diva_local_gpu_solver.py b/evals/solvers/providers/diva/diva_local_gpu_solver.py index 65528aedaf..41eba15e5e 100644 --- a/evals/solvers/providers/diva/diva_local_gpu_solver.py +++ b/evals/solvers/providers/diva/diva_local_gpu_solver.py @@ -97,6 +97,7 @@ def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: # Submit to executor and get result completion_output = self.executor.submit(inputs).result() + print("completion_output: \n", completion_output, "\n\n") return SolverResult(completion_output) def __del__(self): From 2c46421d5e2da1ac39c6e995bb6e146dab5ca430 Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Sun, 3 Nov 2024 00:27:47 -0700 Subject: [PATCH 03/14] Update name --- evals/solvers/providers/diva/diva_local_gpu_solver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/evals/solvers/providers/diva/diva_local_gpu_solver.py b/evals/solvers/providers/diva/diva_local_gpu_solver.py index 41eba15e5e..858c704e30 100644 --- a/evals/solvers/providers/diva/diva_local_gpu_solver.py +++ b/evals/solvers/providers/diva/diva_local_gpu_solver.py @@ -37,7 +37,7 @@ class DivaLocalGPUSolver(Solver): def __init__( self, - model_name: str = "WillHeld/DiVA-llama-3-v0-8b", + model_name: str, num_gpus: int = torch.cuda.device_count(), max_batch_size: int = DEFAULT_MAX_BATCH_SIZE, extra_options: Optional[Dict[str, Any]] = None, From bfee2ee08bd720ff6fdb429f9a9be0ae9bcc3b8c Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Mon, 4 Nov 2024 11:08:07 -0800 Subject: [PATCH 04/14] UpdatE --- evals/registry/eval_sets/audio.yaml | 12 ++++++++++++ .../solvers/providers/diva/diva_local_gpu_solver.py | 9 --------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/evals/registry/eval_sets/audio.yaml b/evals/registry/eval_sets/audio.yaml index daba301bad..3e65f327c4 100644 --- a/evals/registry/eval_sets/audio.yaml +++ b/evals/registry/eval_sets/audio.yaml @@ -22,6 +22,18 @@ audio-core: - audio-translate-covost-tr_en - audio-translate-covost-sv-SE_en +audio-core-translate: + evals: + - audio-translate-covost-en_ca + - audio-translate-covost-en_ar + - audio-translate-covost-en_tr + - audio-translate-covost-en_sv-SE + - audio-translate-covost-ru_en + - audio-translate-covost-es_en + - audio-translate-covost-zh-CN_en + - 
audio-translate-covost-en_de + + transcript-core: evals: - transcript-transcribe diff --git a/evals/solvers/providers/diva/diva_local_gpu_solver.py b/evals/solvers/providers/diva/diva_local_gpu_solver.py index 858c704e30..428667f42b 100644 --- a/evals/solvers/providers/diva/diva_local_gpu_solver.py +++ b/evals/solvers/providers/diva/diva_local_gpu_solver.py @@ -113,21 +113,12 @@ def solver_initializer( """Initialize the model on the specified GPU.""" rank = rank_queue.get() - if torch.cuda.is_available(): - device = torch.device("cuda", rank) - else: - device = torch.device("cpu") - global model model = transformers.AutoModel.from_pretrained( model_name, - device_map="auto" if device.type == "cuda" else None, trust_remote_code=True ) - if device.type == "cpu": - model = model.to(device) - if rank == 0: # Let other initializers start after model is downloaded for i in range(1, world_size): From 21e628b14c1db2bf2efecac356fea6db67752a49 Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Mon, 4 Nov 2024 15:32:36 -0800 Subject: [PATCH 05/14] Remove omni stuff --- evals/registry/solvers/llamaomni.yaml | 4 ++++ evals/solvers/providers/llamaomni/llamaomni | 1 + .../solvers/providers/llamaomni/llamaomni_local_gpu_solver.py | 0 pyproject.toml | 1 + 4 files changed, 6 insertions(+) create mode 100644 evals/registry/solvers/llamaomni.yaml create mode 160000 evals/solvers/providers/llamaomni/llamaomni create mode 100644 evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py diff --git a/evals/registry/solvers/llamaomni.yaml b/evals/registry/solvers/llamaomni.yaml new file mode 100644 index 0000000000..b559f903bf --- /dev/null +++ b/evals/registry/solvers/llamaomni.yaml @@ -0,0 +1,4 @@ +generation/gpu/meta-llama-3-8b-instruct: + class: evals.solvers.providers.llamaomni.llamaomni_local_gpu_solver:LlamaOmniLocalGPUSolver + args: + model_path: Llama-3.1-8B-Omni diff --git a/evals/solvers/providers/llamaomni/llamaomni b/evals/solvers/providers/llamaomni/llamaomni new file mode 160000 index 0000000000..7ec4f18868 --- /dev/null +++ b/evals/solvers/providers/llamaomni/llamaomni @@ -0,0 +1 @@ +Subproject commit 7ec4f1886853adff889cca8c29ebdb0f0b6d026d diff --git a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py b/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pyproject.toml b/pyproject.toml index cc183533a6..3441348ff1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ dependencies = [ "transformers[torch]", "peft", "librosa", + "openai-whisper @ git+https://github.com/openai/whisper.git", ] [project.urls] From 7e41b5135b2ecfa672cab79e188ff471d40e763c Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Mon, 4 Nov 2024 17:49:33 -0800 Subject: [PATCH 06/14] As far as i can get --- evals/registry/solvers/llamaomni.yaml | 4 +- .../llamaomni/llamaomni_local_gpu_solver.py | 334 ++++++++++++++++++ 2 files changed, 336 insertions(+), 2 deletions(-) diff --git a/evals/registry/solvers/llamaomni.yaml b/evals/registry/solvers/llamaomni.yaml index b559f903bf..a688f66046 100644 --- a/evals/registry/solvers/llamaomni.yaml +++ b/evals/registry/solvers/llamaomni.yaml @@ -1,4 +1,4 @@ -generation/gpu/meta-llama-3-8b-instruct: +generation/gpu/llamaomni: class: evals.solvers.providers.llamaomni.llamaomni_local_gpu_solver:LlamaOmniLocalGPUSolver args: - model_path: Llama-3.1-8B-Omni + model_name: ICTNLP/Llama-3.1-8B-Omni diff --git a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py 
b/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py index e69de29bb2..6f98bb1d0b 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py @@ -0,0 +1,334 @@ +import base64 +import dataclasses +import io +import logging +import queue +import threading +import time +import traceback +import torch +import torch.distributed +import torch.multiprocessing as mp +import librosa +import transformers +import whisper +from urllib.request import urlopen +from concurrent import futures +from concurrent.futures import ProcessPoolExecutor +from typing import Any, Callable, Dict, List, Optional, TypeVar +from evals.solvers.solver import Solver, SolverResult +from evals.task_state import TaskState +import numpy as np +import tempfile +import os +from .llamaomni.omni_speech.model.builder import load_pretrained_model +from .llamaomni.omni_speech.datasets.preprocess import tokenizer_speech_token +from .llamaomni.omni_speech.conversation import conv_templates + +SAMPLE_RATE = 16000 +DEFAULT_MAX_BATCH_SIZE = 1 + +class LlamaOmniLocalGPUSolver(Solver): + """ + A solver class for running the LlamaOmni model on CPU or multiple GPUs. + Uses BatchedProcessPoolExecutor for efficient batch processing. + + Args: + model_name: str - The model name/path + device: str - Device to run on ('cpu' or 'cuda') + num_gpus: int - Number of GPUs to use (default: all available if device='cuda') + max_batch_size: int - Maximum batch size for inference + extra_options: Dict[str, Any] - Additional options for model generation + postprocessors: list[str] - List of postprocessors to apply + registry: Any - Registry object for the solver + """ + + def __init__( + self, + model_name: str, + device: str = "cuda" if torch.cuda.is_available() else "cpu", + num_gpus: int = None, + max_batch_size: int = DEFAULT_MAX_BATCH_SIZE, + extra_options: Optional[Dict[str, Any]] = None, + postprocessors: list[str] = [], + registry: Any = None, + ): + super().__init__(postprocessors=postprocessors, registry=registry) + + self.model_name = model_name + self.device = device + self.extra_options = extra_options or {} + + # Set number of workers based on device + if device == "cuda": + num_gpus = num_gpus or torch.cuda.device_count() + num_workers = max(1, num_gpus) + else: + num_workers = 1 + + # Set up multiprocessing + mp.set_start_method("spawn", force=True) + rank_queue = mp.Queue() + rank_queue.put(0) # Start with primary worker + + self.executor = BatchedProcessPoolExecutor( + max_workers=num_workers, + max_batch_size=int(max_batch_size), + initializer=solver_initializer, + initargs=(rank_queue, num_workers, model_name, device), + batch_worker_fn=solver_worker, + ) + + def copy(self): + return self + + def _process_audio_content(self, content: list) -> tuple[list, list]: + """Process audio content from message parts.""" + audios = [] + prompts = [] + + for part in content: + if part["type"] == "audio_url": + # Handle base64 encoded audio + audio_data = part["audio_url"]["url"].split(",")[1] + audio_bytes = base64.b64decode(audio_data) + + # Create a temporary file + with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file: + temp_file.write(audio_bytes) + temp_path = temp_file.name + + try: + # Load using whisper's load_audio function + audio_array = whisper.load_audio(temp_path) + audios.append(audio_array) + finally: + # Clean up the temporary file + os.unlink(temp_path) + + elif part["type"] == "text": + 
prompts.append(part["text"]) + + return audios, prompts + + def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: + inputs = {"audios": [], "prompts": []} + + # Process the last message if it contains audio + if not isinstance(task_state.messages[-1].content, str): + audios, prompts = self._process_audio_content(task_state.messages[-1].content) + inputs["audios"].extend(audios) + inputs["prompts"].extend(prompts) if prompts else inputs["prompts"].extend([None] * len(audios)) + + # Submit to executor and get result + completion_output = self.executor.submit(inputs).result() + return SolverResult(completion_output) + + def __del__(self): + if hasattr(self, "executor"): + self.executor.shutdown() + + +def solver_initializer( + rank_queue: mp.Queue, + world_size: int, + model_name: str, + device: str, +): + """Initialize the LlamaOmni model on the specified device.""" + rank = rank_queue.get() + + # Set device based on configuration + if device == "cuda": + device = f"cuda:{rank}" + else: + device = "cpu" + + global model, tokenizer, context_len + + # Initialize model using load_pretrained_model + tokenizer, model, context_len = load_pretrained_model( + model_path=model_name, + model_base=None, # Can be made configurable through extra_options + is_lora=False, # Can be made configurable through extra_options + s2s=False, # Can be made configurable through extra_options + device=device + ) + + # Move model to appropriate device if needed + if device != "cuda:0": # load_pretrained_model typically uses cuda:0 by default + model = model.to(device) + + if rank == 0: + # Let other initializers start after model is downloaded + for i in range(1, world_size): + rank_queue.put(i) + + +def solver_worker(inputs: List[Dict[str, Any]]) -> List[str]: + """Process a batch of inputs using the LlamaOmni model.""" + print("inputs", inputs) + batch_audios = [] + batch_prompts = [] + + # Process each input in the batch + for input_item in inputs: + batch_audios.extend(input_item["audios"]) + batch_prompts.extend(input_item["prompts"]) + + # Create conversation prompts like in the reference code + processed_prompts = [] + for prompt in batch_prompts: + conv = conv_templates["v1"].copy() + conv.append_message(conv.roles[0], prompt) + conv.append_message(conv.roles[1], None) + processed_prompts.append(conv.get_prompt()) + + # Process input_ids with the processed prompts + input_ids_list = [] + for prompt in processed_prompts: + ids = tokenizer_speech_token(prompt, tokenizer, return_tensors='pt') + input_ids_list.append(ids) + + # Process audio similarly to reference code + speech_list = [] + for audio in batch_audios: + speech = whisper.pad_or_trim(audio.astype(np.float32)) # Ensure float32 + mel = whisper.log_mel_spectrogram(speech, n_mels=128) + # Convert to float32 tensor and permute dimensions + speech_tensor = torch.from_numpy(mel.numpy()).float().permute(1, 0) + speech_list.append(speech_tensor) + + speech_lengths = [torch.LongTensor([audio.shape[0]]) for audio in batch_audios] + + input_ids = torch.stack(input_ids_list, dim=0) + speech_tensors = torch.stack(speech_list, dim=0) + speech_lengths = torch.stack(speech_lengths, dim=0) + + device = "cuda" if torch.cuda.is_available() else "cpu" + + input_ids = input_ids.to(device=device, dtype=torch.long, non_blocking=True) + speech_tensors = speech_tensors.to(device=device, dtype=torch.float32, non_blocking=True) + speech_lengths = speech_lengths.to(device=device, dtype=torch.long, non_blocking=True) + + print("input_ids", input_ids) + 
print("speech_tensors", speech_tensors) + print("speech_lengths", speech_lengths) + + print("input ids shape", input_ids.shape) + print("speech tensors shape", speech_tensors.shape) + print("speech lengths shape", speech_lengths.shape) + + # Generate responses using LlamaOmni's interface + outputs = model.generate( + input_ids, + speech=speech_tensors, + speech_lengths=speech_lengths, + do_sample=True, + temperature=0.7, + top_p=0.9, + num_beams=1, + max_new_tokens=256, + use_cache=True, + pad_token_id=128004, + ) + + # Decode responses + decoded_responses = tokenizer.batch_decode(outputs, skip_special_tokens=True) + return decoded_responses + + +# Reuse the BatchedProcessPoolExecutor implementation from DiVA +T_In = TypeVar("T_In") +T_Out = TypeVar("T_Out") + +@dataclasses.dataclass +class BatchableWorkItem: + request: T_In + future: futures.Future + + +class BatchedProcessPoolExecutor: + def __init__( + self, + *args, + batch_worker_fn: Callable[[List[T_In]], List[T_Out]], + max_batch_size: int, + max_workers: int = 1, + **kwargs + ): + self.max_batch_size = max_batch_size + self.batch_worker_fn = batch_worker_fn + self._batch_queue = queue.Queue() + self.available_workers = threading.Semaphore(value=max_workers + 1) + self.process_pool_executor = ProcessPoolExecutor( + *args, max_workers=max_workers, **kwargs + ) + self._batch_thread = threading.Thread(target=self.batch_requests) + self._batch_thread.start() + + def submit(self, request: T_In) -> futures.Future: + item = BatchableWorkItem(request, futures.Future()) + self._batch_queue.put(item) + return item.future + + def shutdown(self): + self.process_pool_executor.shutdown() + while not self._batch_queue.empty(): + try: + item = self._batch_queue.get(block=False) + if item is not None: + item.future.set_exception(Exception("The pool has already shut down.")) + except queue.Empty: + pass + self._batch_queue.put(None) + + def batch_requests(self): + time.sleep(1) + while True: + self.available_workers.acquire() + work_items: List[BatchableWorkItem] = [self._batch_queue.get()] + + while len(work_items) < self.max_batch_size: + try: + item = self._batch_queue.get(block=False) + work_items.append(item) + except queue.Empty: + break + + if work_items[-1] is None: + if len(work_items) > 1: + logging.warn( + "There remained work items in the queue when shutting down. The items will be ignored." 
+ ) + return + + requests = [item.request for item in work_items] + task_futures = [item.future for item in work_items] + + try: + result_future = self.process_pool_executor.submit(self.batch_worker_fn, requests) + except Exception as e: + self._handle_exception(e, task_futures) + return + + result_future.add_done_callback(_set_results_cb(task_futures, self._handle_exception)) + result_future.add_done_callback(lambda _: self.available_workers.release()) + + def _handle_exception(self, e: Exception, task_futures: List[futures.Future]): + print(traceback.format_exc()) + for f in task_futures: + if not f.done(): + f.set_exception(e) + self.shutdown() + + +def _set_results_cb(task_futures: List[futures.Future], handle_exception_cb: Callable): + def cb(batch_future: futures.Future): + try: + for f, r in zip(task_futures, batch_future.result()): + f.set_result(r) + except Exception as e: + handle_exception_cb(e, task_futures) + + return cb From cb7ae64d271a2d6c519ff5f913d8e42dfa209aaa Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Mon, 4 Nov 2024 21:03:00 -0800 Subject: [PATCH 07/14] Update --- evals/registry/eval_sets/audio.yaml | 8 +- evals/registry/solvers/llamaomni.yaml | 9 + .../solvers/providers/google/gemini_solver.py | 11 +- .../llamaomni/llamaomni_replicate_solver.py | 160 ++++++++++++++++++ .../providers/llamaomni/requirements.txt | 1 + pyproject.toml | 1 + 6 files changed, 182 insertions(+), 8 deletions(-) create mode 100644 evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py create mode 100644 evals/solvers/providers/llamaomni/requirements.txt diff --git a/evals/registry/eval_sets/audio.yaml b/evals/registry/eval_sets/audio.yaml index 3e65f327c4..53e4907cf7 100644 --- a/evals/registry/eval_sets/audio.yaml +++ b/evals/registry/eval_sets/audio.yaml @@ -22,16 +22,18 @@ audio-core: - audio-translate-covost-tr_en - audio-translate-covost-sv-SE_en -audio-core-translate: +audio-core-translate: evals: + - audio-translate-covost-en_de - audio-translate-covost-en_ca - audio-translate-covost-en_ar - audio-translate-covost-en_tr - audio-translate-covost-en_sv-SE - audio-translate-covost-ru_en - - audio-translate-covost-es_en + - audio-translate-covost-es_en - audio-translate-covost-zh-CN_en - - audio-translate-covost-en_de + - audio-translate-covost-sv-SE_en + - audio-translate-covost-tr_en transcript-core: diff --git a/evals/registry/solvers/llamaomni.yaml b/evals/registry/solvers/llamaomni.yaml index a688f66046..009e64c4ca 100644 --- a/evals/registry/solvers/llamaomni.yaml +++ b/evals/registry/solvers/llamaomni.yaml @@ -2,3 +2,12 @@ generation/gpu/llamaomni: class: evals.solvers.providers.llamaomni.llamaomni_local_gpu_solver:LlamaOmniLocalGPUSolver args: model_name: ICTNLP/Llama-3.1-8B-Omni + + +generation/replicate/llamaomni: + class: evals.solvers.providers.llamaomni.llamaomni_replicate_solver:LlamaOmniReplicateSolver + args: + model_name: ictnlp/llama-omni + model_version: 36c9bcf70a56f40d9a27445c30c769308b18180296749f86ec9b682baf7ad351 + generation_config: {} + \ No newline at end of file diff --git a/evals/solvers/providers/google/gemini_solver.py b/evals/solvers/providers/google/gemini_solver.py index 33a8a93a25..ed8f9b2a2e 100644 --- a/evals/solvers/providers/google/gemini_solver.py +++ b/evals/solvers/providers/google/gemini_solver.py @@ -147,11 +147,12 @@ def _solve( else: raise e - record_sampling( - prompt=msgs, - sampled=[solver_result.output], - model=self.model, - ) + # record_sampling( + # prompt=msgs, + # sampled=[solver_result.output], + # model=self.model, + # ) 
+ print("completion", solver_result.output) return solver_result @staticmethod diff --git a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py new file mode 100644 index 0000000000..5bad6ec480 --- /dev/null +++ b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py @@ -0,0 +1,160 @@ +import copy +import os +import base64 +import replicate +from typing import Any, Dict, Union +from dataclasses import dataclass + +from evals.record import record_sampling +from evals.solvers.solver import Solver, SolverResult +from evals.task_state import Message, TaskState +from evals.utils.api_utils import create_retrying + +# Load API key from environment variable +REPLICATE_API_TOKEN = os.environ.get("REPLICATE_API_TOKEN") +if REPLICATE_API_TOKEN is None: + raise ValueError("REPLICATE_API_TOKEN environment variable not set") + +MODEL_VERSION = "36c9bcf70a56f40d9a27445c30c769308b18180296749f86ec9b682baf7ad351" + +@dataclass +class ReplicateInput: + input_audio: str # base64 encoded audio data URI + prompt: str = "" # optional text prompt + + def to_dict(self): + return { + "input_audio": self.input_audio, + "prompt": self.prompt if self.prompt else None + } + + +class LlamaOmniReplicateSolver(Solver): + """ + A solver class that uses Replicate's API to run LlamaOmni model. + """ + + def __init__( + self, + model_name: str = "ictnlp/llama-omni", + model_version: str = MODEL_VERSION, + generation_config: Dict[str, Any] = {}, + postprocessors: list[str] = [], + registry: Any = None, + ): + super().__init__(postprocessors=postprocessors) + print("args", model_name, model_version, generation_config) + self._model_name = model_name + self._model_version = model_version + self.gen_config = generation_config + + @property + def model_version(self) -> str: + return self._model_version + + @model_version.setter + def model_version(self, value: str): + self._model_version = value + + @property + def model_name(self) -> str: + return self._model_name + + @model_name.setter + def model_name(self, value: str): + self._model_name = value + + def _process_audio_content(self, content: list) -> tuple[str, str]: + """Process audio content from message parts.""" + print("Processing audio content:", type(content)) + audio_uri = None + prompt = None + + for part in content: + print("Processing part:", type(part)) + if isinstance(part, dict): # Handle dict format + if part.get("type") == "audio_url": + audio_uri = part["audio_url"]["url"] + elif part.get("type") == "text": + prompt = part["text"] + elif hasattr(part, "type"): # Handle Message object format + if part.type == "audio_url": + audio_uri = part.audio_url.url + elif part.type == "text": + prompt = part.text + + return audio_uri, prompt + + def _solve( + self, + task_state: TaskState, + **kwargs, + ) -> SolverResult: + print("\nSolving task with last message:", type(task_state.messages[-1])) + + # Process the last message if it contains audio + last_message = task_state.messages[-1] + if hasattr(last_message, "content") and not isinstance(last_message.content, str): + audio_uri, prompt = self._process_audio_content(last_message.content) + print("Extracted audio_uri:", audio_uri is not None, "prompt:", prompt is not None) + if audio_uri is None: + return SolverResult("No audio content found", error="No audio content") + else: + return SolverResult("No audio content found", error="No audio content") + + # Prepare input for Replicate API + replicate_input = ReplicateInput( + 
input_audio=audio_uri, + prompt=prompt + ).to_dict() + + try: + # Call Replicate API + output = replicate.run( + f"{self.model_name}:{self.model_version}", + input=replicate_input + ) + + # Extract text response + if isinstance(output, dict) and "text" in output: + solver_result = SolverResult(output["text"]) + else: + solver_result = SolverResult(str(output)) + + except Exception as e: + solver_result = SolverResult( + str(e), + error=e, + ) + + print("completion", solver_result.output) + + # # Record the sampling + # record_sampling( + # prompt=task_state.messages, + # sampled=[solver_result.output], + # model=self.model, + # ) + return solver_result + + @property + def name(self) -> str: + return f"{self.model_name}:{self.model_version}" + + @property + def model(self) -> str: + return self.name + + def __deepcopy__(self, memo): + """Create a deep copy of the solver.""" + cls = self.__class__ + result = cls.__new__(cls) + memo[id(self)] = result + + for k, v in self.__dict__.items(): + setattr(result, k, copy.deepcopy(v, memo)) + + return result + + + \ No newline at end of file diff --git a/evals/solvers/providers/llamaomni/requirements.txt b/evals/solvers/providers/llamaomni/requirements.txt new file mode 100644 index 0000000000..3c05820132 --- /dev/null +++ b/evals/solvers/providers/llamaomni/requirements.txt @@ -0,0 +1 @@ +replicate>=0.22.0 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 3441348ff1..115a2ff57a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,6 +53,7 @@ dependencies = [ "peft", "librosa", "openai-whisper @ git+https://github.com/openai/whisper.git", + "replicate", ] [project.urls] From fd49b0769f390fc6075263accea3c1e5486c2c90 Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Mon, 4 Nov 2024 23:21:35 -0800 Subject: [PATCH 08/14] Update --- .../llamaomni/llamaomni_local_gpu_solver.py | 31 +++++++++---------- .../llamaomni/llamaomni_replicate_solver.py | 4 --- pyproject.toml | 2 +- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py b/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py index 6f98bb1d0b..dfd964926a 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py @@ -193,31 +193,30 @@ def solver_worker(inputs: List[Dict[str, Any]]) -> List[str]: # Process audio similarly to reference code speech_list = [] for audio in batch_audios: - speech = whisper.pad_or_trim(audio.astype(np.float32)) # Ensure float32 + # Convert to float32 at the very beginning + audio = audio.astype(np.float32) + speech = whisper.pad_or_trim(audio) mel = whisper.log_mel_spectrogram(speech, n_mels=128) - # Convert to float32 tensor and permute dimensions - speech_tensor = torch.from_numpy(mel.numpy()).float().permute(1, 0) + # Convert to tensor and ensure float32 dtype + speech_tensor = torch.tensor(mel.numpy(), dtype=torch.float32).permute(1, 0) speech_list.append(speech_tensor) - speech_lengths = [torch.LongTensor([audio.shape[0]]) for audio in batch_audios] + speech_lengths = [torch.tensor([audio.shape[0]], dtype=torch.long) for audio in batch_audios] input_ids = torch.stack(input_ids_list, dim=0) speech_tensors = torch.stack(speech_list, dim=0) speech_lengths = torch.stack(speech_lengths, dim=0) - device = "cuda" if torch.cuda.is_available() else "cpu" + device = next(model.parameters()).device # Get device from model - input_ids = input_ids.to(device=device, dtype=torch.long, 
non_blocking=True) - speech_tensors = speech_tensors.to(device=device, dtype=torch.float32, non_blocking=True) - speech_lengths = speech_lengths.to(device=device, dtype=torch.long, non_blocking=True) - - print("input_ids", input_ids) - print("speech_tensors", speech_tensors) - print("speech_lengths", speech_lengths) + # Move tensors to device and ensure correct dtypes + input_ids = input_ids.to(device=device, dtype=torch.long) + speech_tensors = speech_tensors.to(device=device, dtype=torch.float32) + speech_lengths = speech_lengths.to(device=device, dtype=torch.long) - print("input ids shape", input_ids.shape) - print("speech tensors shape", speech_tensors.shape) - print("speech lengths shape", speech_lengths.shape) + # Ensure model parameters are float32 + model.float() # Convert all model parameters to float32 + # Generate responses using LlamaOmni's interface outputs = model.generate( @@ -228,7 +227,7 @@ def solver_worker(inputs: List[Dict[str, Any]]) -> List[str]: temperature=0.7, top_p=0.9, num_beams=1, - max_new_tokens=256, + max_new_tokens=5, use_cache=True, pad_token_id=128004, ) diff --git a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py index 5bad6ec480..ab964881e9 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py @@ -66,12 +66,10 @@ def model_name(self, value: str): def _process_audio_content(self, content: list) -> tuple[str, str]: """Process audio content from message parts.""" - print("Processing audio content:", type(content)) audio_uri = None prompt = None for part in content: - print("Processing part:", type(part)) if isinstance(part, dict): # Handle dict format if part.get("type") == "audio_url": audio_uri = part["audio_url"]["url"] @@ -90,13 +88,11 @@ def _solve( task_state: TaskState, **kwargs, ) -> SolverResult: - print("\nSolving task with last message:", type(task_state.messages[-1])) # Process the last message if it contains audio last_message = task_state.messages[-1] if hasattr(last_message, "content") and not isinstance(last_message.content, str): audio_uri, prompt = self._process_audio_content(last_message.content) - print("Extracted audio_uri:", audio_uri is not None, "prompt:", prompt is not None) if audio_uri is None: return SolverResult("No audio content found", error="No audio content") else: diff --git a/pyproject.toml b/pyproject.toml index 115a2ff57a..7394fc2e61 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,7 @@ dependencies = [ "transformers[torch]", "peft", "librosa", - "openai-whisper @ git+https://github.com/openai/whisper.git", + "openai-whisper", "replicate", ] From b0326172a568625d9cc620ecd2969de68076d0de Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Mon, 4 Nov 2024 23:41:08 -0800 Subject: [PATCH 09/14] Update --- evals/registry/solvers/llamaomni.yaml | 9 +- .../llamaomni/llamaomni_replicate_solver.py | 195 ++++++------------ 2 files changed, 72 insertions(+), 132 deletions(-) diff --git a/evals/registry/solvers/llamaomni.yaml b/evals/registry/solvers/llamaomni.yaml index 009e64c4ca..45fac57e0f 100644 --- a/evals/registry/solvers/llamaomni.yaml +++ b/evals/registry/solvers/llamaomni.yaml @@ -7,7 +7,10 @@ generation/gpu/llamaomni: generation/replicate/llamaomni: class: evals.solvers.providers.llamaomni.llamaomni_replicate_solver:LlamaOmniReplicateSolver args: - model_name: ictnlp/llama-omni - model_version: 
36c9bcf70a56f40d9a27445c30c769308b18180296749f86ec9b682baf7ad351 - generation_config: {} + deployment_owner: "lipatrick" + deployment_name: "llama-omni" + extra_options: + temperature: 0.7 + top_p: 0.9 + max_new_tokens: 256 \ No newline at end of file diff --git a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py index ab964881e9..7ff205ccc3 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py @@ -1,156 +1,93 @@ -import copy -import os import base64 import replicate -from typing import Any, Dict, Union -from dataclasses import dataclass - -from evals.record import record_sampling +import os +from typing import Any, Dict, Optional, List from evals.solvers.solver import Solver, SolverResult -from evals.task_state import Message, TaskState -from evals.utils.api_utils import create_retrying - -# Load API key from environment variable -REPLICATE_API_TOKEN = os.environ.get("REPLICATE_API_TOKEN") -if REPLICATE_API_TOKEN is None: - raise ValueError("REPLICATE_API_TOKEN environment variable not set") - -MODEL_VERSION = "36c9bcf70a56f40d9a27445c30c769308b18180296749f86ec9b682baf7ad351" - -@dataclass -class ReplicateInput: - input_audio: str # base64 encoded audio data URI - prompt: str = "" # optional text prompt - - def to_dict(self): - return { - "input_audio": self.input_audio, - "prompt": self.prompt if self.prompt else None - } - +from evals.task_state import TaskState class LlamaOmniReplicateSolver(Solver): """ - A solver class that uses Replicate's API to run LlamaOmni model. + A solver class for running LlamaOmni model using Replicate deployment. + + Args: + deployment_owner: str - Owner of the deployment (e.g. "lipatrick") + deployment_name: str - Name of the deployment (e.g. "llama-omni") + api_token: Optional[str] - Replicate API token. 
If not provided, will use REPLICATE_API_TOKEN env var + extra_options: Optional[Dict[str, Any]] - Additional options for model generation + postprocessors: list[str] - List of postprocessors to apply + registry: Any - Registry object for the solver """ def __init__( self, - model_name: str = "ictnlp/llama-omni", - model_version: str = MODEL_VERSION, - generation_config: Dict[str, Any] = {}, + deployment_owner: str, + deployment_name: str, + api_token: Optional[str] = None, + extra_options: Optional[Dict[str, Any]] = None, postprocessors: list[str] = [], registry: Any = None, ): - super().__init__(postprocessors=postprocessors) - print("args", model_name, model_version, generation_config) - self._model_name = model_name - self._model_version = model_version - self.gen_config = generation_config - - @property - def model_version(self) -> str: - return self._model_version - - @model_version.setter - def model_version(self, value: str): - self._model_version = value - - @property - def model_name(self) -> str: - return self._model_name - - @model_name.setter - def model_name(self, value: str): - self._model_name = value + super().__init__(postprocessors=postprocessors, registry=registry) + + self.deployment_owner = deployment_owner + self.deployment_name = deployment_name + self.api_token = api_token or os.environ.get("REPLICATE_API_TOKEN") + if not self.api_token: + raise ValueError("Replicate API token must be provided either through api_token parameter or REPLICATE_API_TOKEN environment variable") + + self.extra_options = extra_options or {} + self.client = replicate.Client(api_token=self.api_token) def _process_audio_content(self, content: list) -> tuple[str, str]: """Process audio content from message parts.""" - audio_uri = None + audio_data = None prompt = None for part in content: - if isinstance(part, dict): # Handle dict format - if part.get("type") == "audio_url": - audio_uri = part["audio_url"]["url"] - elif part.get("type") == "text": - prompt = part["text"] - elif hasattr(part, "type"): # Handle Message object format - if part.type == "audio_url": - audio_uri = part.audio_url.url - elif part.type == "text": - prompt = part.text + if part["type"] == "audio_url": + # Get base64 encoded audio + audio_data = part["audio_url"]["url"] + elif part["type"] == "text": + prompt = part["text"] - return audio_uri, prompt + return audio_data, prompt - def _solve( - self, - task_state: TaskState, - **kwargs, - ) -> SolverResult: - + def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: # Process the last message if it contains audio - last_message = task_state.messages[-1] - if hasattr(last_message, "content") and not isinstance(last_message.content, str): - audio_uri, prompt = self._process_audio_content(last_message.content) - if audio_uri is None: - return SolverResult("No audio content found", error="No audio content") - else: - return SolverResult("No audio content found", error="No audio content") - - # Prepare input for Replicate API - replicate_input = ReplicateInput( - input_audio=audio_uri, - prompt=prompt - ).to_dict() - - try: - # Call Replicate API - output = replicate.run( - f"{self.model_name}:{self.model_version}", - input=replicate_input - ) - - # Extract text response - if isinstance(output, dict) and "text" in output: - solver_result = SolverResult(output["text"]) - else: - solver_result = SolverResult(str(output)) - - except Exception as e: - solver_result = SolverResult( - str(e), - error=e, + if not isinstance(task_state.messages[-1].content, str): + 
audio_data, prompt = self._process_audio_content(task_state.messages[-1].content) + + if not audio_data: + raise ValueError("No audio data found in the message") + + # Create prediction using deployment + prediction = self.client.deployments.predictions.create( + self.deployment_owner, + self.deployment_name, + input={ + "input_audio": audio_data, + "prompt": prompt or "", + **self.extra_options + } ) - - print("completion", solver_result.output) - - # # Record the sampling - # record_sampling( - # prompt=task_state.messages, - # sampled=[solver_result.output], - # model=self.model, - # ) - return solver_result - - @property - def name(self) -> str: - return f"{self.model_name}:{self.model_version}" - - @property - def model(self) -> str: - return self.name - - def __deepcopy__(self, memo): - """Create a deep copy of the solver.""" - cls = self.__class__ - result = cls.__new__(cls) - memo[id(self)] = result - - for k, v in self.__dict__.items(): - setattr(result, k, copy.deepcopy(v, memo)) - return result + # Wait for prediction to complete + prediction = self.client.wait(prediction) + print("prediction:", prediction) + + return SolverResult(prediction.output) + + return SolverResult("") + + def copy(self): + return LlamaOmniReplicateSolver( + deployment_owner=self.deployment_owner, + deployment_name=self.deployment_name, + api_token=self.api_token, + extra_options=self.extra_options, + postprocessors=self.postprocessors, + registry=self.registry + ) \ No newline at end of file From 056d2b25f1bfc3f0a1af966e4070cf508f1002d3 Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Tue, 5 Nov 2024 00:00:58 -0800 Subject: [PATCH 10/14] Update --- evals/registry/solvers/llamaomni.yaml | 4 +-- .../llamaomni/llamaomni_replicate_solver.py | 33 ++++++++----------- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/evals/registry/solvers/llamaomni.yaml b/evals/registry/solvers/llamaomni.yaml index 45fac57e0f..80b19d513b 100644 --- a/evals/registry/solvers/llamaomni.yaml +++ b/evals/registry/solvers/llamaomni.yaml @@ -10,7 +10,7 @@ generation/replicate/llamaomni: deployment_owner: "lipatrick" deployment_name: "llama-omni" extra_options: - temperature: 0.7 - top_p: 0.9 + temperature: 0 + top_p: 0 max_new_tokens: 256 \ No newline at end of file diff --git a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py index 7ff205ccc3..a4df5b56e0 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py @@ -36,7 +36,9 @@ def __init__( raise ValueError("Replicate API token must be provided either through api_token parameter or REPLICATE_API_TOKEN environment variable") self.extra_options = extra_options or {} - self.client = replicate.Client(api_token=self.api_token) + + # Get deployment directly + self.deployment = replicate.deployments.get(f"{deployment_owner}/{deployment_name}") def _process_audio_content(self, content: list) -> tuple[str, str]: """Process audio content from message parts.""" @@ -60,34 +62,25 @@ def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: if not audio_data: raise ValueError("No audio data found in the message") + # Create input dictionary with all parameters + input_data = { + "input_audio": audio_data, + "prompt": prompt or "", + **self.extra_options + } + # Create prediction using deployment - prediction = self.client.deployments.predictions.create( - self.deployment_owner, - self.deployment_name, - 
input={ - "input_audio": audio_data, - "prompt": prompt or "", - **self.extra_options - } + prediction = self.deployment.predictions.create( + input=input_data ) # Wait for prediction to complete - prediction = self.client.wait(prediction) + prediction.wait() print("prediction:", prediction) return SolverResult(prediction.output) return SolverResult("") - def copy(self): - return LlamaOmniReplicateSolver( - deployment_owner=self.deployment_owner, - deployment_name=self.deployment_name, - api_token=self.api_token, - extra_options=self.extra_options, - postprocessors=self.postprocessors, - registry=self.registry - ) - \ No newline at end of file From abcf579ea1f86b2726379005ef43aaf7122ae7bb Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Tue, 5 Nov 2024 00:16:09 -0800 Subject: [PATCH 11/14] Update --- .../llamaomni/llamaomni_replicate_solver.py | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py index a4df5b56e0..f6a6045627 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py @@ -1,7 +1,8 @@ import base64 +import copy import replicate import os -from typing import Any, Dict, Optional, List +from typing import Any, Dict, Optional, List, Union from evals.solvers.solver import Solver, SolverResult from evals.task_state import TaskState @@ -76,11 +77,41 @@ def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: # Wait for prediction to complete prediction.wait() - print("prediction:", prediction) - return SolverResult(prediction.output) + # Extract text from output dictionary + if isinstance(prediction.output, dict) and 'text' in prediction.output: + result = prediction.output['text'] + else: + result = str(prediction.output) + + print("output:", result) + return SolverResult(result) return SolverResult("") + @property + def name(self) -> str: + return f"replicate-{self.deployment_owner}-{self.deployment_name}" + + @property + def model_version(self) -> Union[str, dict]: + return f"{self.deployment_owner}/{self.deployment_name}" + + def __deepcopy__(self, memo): + """ + Deepcopy everything except for self.deployment, which is instead shared across all copies + """ + cls = self.__class__ + result = cls.__new__(cls) + memo[id(self)] = result + + for k, v in self.__dict__.items(): + if k != "deployment": + setattr(result, k, copy.deepcopy(v, memo)) + + # Share the deployment instance across copies + result.deployment = self.deployment + return result + \ No newline at end of file From 3ee57a57bb4839e17ec3400a1d02ce628408d1ea Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Fri, 8 Nov 2024 17:16:30 -0800 Subject: [PATCH 12/14] Clean up --- evals/registry/solvers/diva.yaml | 4 - .../providers/diva/diva_local_gpu_solver.py | 241 ------------- .../solvers/providers/google/gemini_solver.py | 6 - .../llamaomni/llamaomni_local_gpu_solver.py | 333 ------------------ 4 files changed, 584 deletions(-) delete mode 100644 evals/registry/solvers/diva.yaml delete mode 100644 evals/solvers/providers/diva/diva_local_gpu_solver.py delete mode 100644 evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py diff --git a/evals/registry/solvers/diva.yaml b/evals/registry/solvers/diva.yaml deleted file mode 100644 index 441e1c502f..0000000000 --- a/evals/registry/solvers/diva.yaml +++ /dev/null @@ -1,4 +0,0 @@ -generation/gpu/diva-llama-3-v0-8b: - 
class: evals.solvers.providers.diva.diva_local_gpu_solver:DivaLocalGPUSolver - args: - model_name: WillHeld/DiVA-llama-3-v0-8b \ No newline at end of file diff --git a/evals/solvers/providers/diva/diva_local_gpu_solver.py b/evals/solvers/providers/diva/diva_local_gpu_solver.py deleted file mode 100644 index 428667f42b..0000000000 --- a/evals/solvers/providers/diva/diva_local_gpu_solver.py +++ /dev/null @@ -1,241 +0,0 @@ -import base64 -import dataclasses -import io -import logging -import queue -import threading -import time -import traceback -import torch -import torch.distributed -import torch.multiprocessing as mp -import librosa -import transformers -from urllib.request import urlopen -from concurrent import futures -from concurrent.futures import ProcessPoolExecutor -from typing import Any, Callable, Dict, List, Optional, TypeVar -from evals.solvers.solver import Solver, SolverResult -from evals.task_state import TaskState - -SAMPLE_RATE = 16000 -DEFAULT_MAX_BATCH_SIZE = 32 - -class DivaLocalGPUSolver(Solver): - """ - A solver class for running the DiVA model in parallel across multiple GPUs. - Uses BatchedProcessPoolExecutor for efficient batch processing. - - Args: - model_name: str - The model name/path (default: "WillHeld/DiVA-llama-3-v0-8b") - num_gpus: int - Number of GPUs to use (default: all available) - max_batch_size: int - Maximum batch size for inference - extra_options: Dict[str, Any] - Additional options for model generation - postprocessors: list[str] - List of postprocessors to apply - registry: Any - Registry object for the solver - """ - - def __init__( - self, - model_name: str, - num_gpus: int = torch.cuda.device_count(), - max_batch_size: int = DEFAULT_MAX_BATCH_SIZE, - extra_options: Optional[Dict[str, Any]] = None, - postprocessors: list[str] = [], - registry: Any = None, - ): - super().__init__(postprocessors=postprocessors, registry=registry) - - self.model_name = model_name - self.extra_options = extra_options or {} - - # Set up multiprocessing - mp.set_start_method("spawn", force=True) - rank_queue = mp.Queue() - rank_queue.put(0) # Start with primary GPU - - self.executor = BatchedProcessPoolExecutor( - max_workers=max(1, num_gpus), - max_batch_size=int(max_batch_size), - initializer=solver_initializer, - initargs=(rank_queue, num_gpus, model_name), - batch_worker_fn=solver_worker, - ) - - def copy(self): - return self - - def _process_audio_content(self, content: list) -> tuple[list, list]: - """Process audio content from message parts.""" - audios = [] - prompts = [] - - for part in content: - if part["type"] == "audio_url": - if isinstance(part["audio_url"], dict) and "url" in part["audio_url"]: - audio_data = part["audio_url"]["url"].split(",")[1] - audio_stream = io.BytesIO(base64.b64decode(audio_data)) - else: - audio_stream = io.BytesIO(urlopen(part["audio_url"]).read()) - - audio = librosa.load(audio_stream, sr=SAMPLE_RATE)[0] - audios.append(audio) - - elif part["type"] == "text": - prompts.append(part["text"]) - - return audios, prompts - - def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: - inputs = {"audios": [], "prompts": []} - - # Process the last message if it contains audio - if not isinstance(task_state.messages[-1].content, str): - audios, prompts = self._process_audio_content(task_state.messages[-1].content) - inputs["audios"].extend(audios) - inputs["prompts"].extend(prompts) if prompts else inputs["prompts"].extend([None] * len(audios)) - - # Submit to executor and get result - completion_output = 
self.executor.submit(inputs).result() - print("completion_output: \n", completion_output, "\n\n") - return SolverResult(completion_output) - - def __del__(self): - if hasattr(self, "executor"): - self.executor.shutdown() - - -def solver_initializer( - rank_queue: mp.Queue, - world_size: int, - model_name: str, -): - """Initialize the model on the specified GPU.""" - rank = rank_queue.get() - - global model - model = transformers.AutoModel.from_pretrained( - model_name, - trust_remote_code=True - ) - - if rank == 0: - # Let other initializers start after model is downloaded - for i in range(1, world_size): - rank_queue.put(i) - - -def solver_worker(inputs: List[Dict[str, Any]]) -> List[str]: - """Process a batch of inputs using the model.""" - batch_audios = [] - batch_prompts = [] - - # Process each input in the batch - for input_item in inputs: - batch_audios.extend(input_item["audios"]) - batch_prompts.extend(input_item["prompts"]) - - # Generate responses using DiVA's interface - responses = model.generate( - batch_audios, - batch_prompts if any(batch_prompts) else None - ) - - return responses - - - -T_In = TypeVar("T_In") -T_Out = TypeVar("T_Out") - -# Reuse the BatchedProcessPoolExecutor and related classes from the original implementation -@dataclasses.dataclass -class BatchableWorkItem: - request: T_In - future: futures.Future - - -class BatchedProcessPoolExecutor: - def __init__( - self, - *args, - batch_worker_fn: Callable[[List[T_In]], List[T_Out]], - max_batch_size: int, - max_workers: int = 1, - **kwargs - ): - self.max_batch_size = max_batch_size - self.batch_worker_fn = batch_worker_fn - self._batch_queue = queue.Queue() - self.available_workers = threading.Semaphore(value=max_workers + 1) - self.process_pool_executor = ProcessPoolExecutor( - *args, max_workers=max_workers, **kwargs - ) - self._batch_thread = threading.Thread(target=self.batch_requests) - self._batch_thread.start() - - def submit(self, request: T_In) -> futures.Future: - item = BatchableWorkItem(request, futures.Future()) - self._batch_queue.put(item) - return item.future - - def shutdown(self): - self.process_pool_executor.shutdown() - while not self._batch_queue.empty(): - try: - item = self._batch_queue.get(block=False) - if item is not None: - item.future.set_exception(Exception("The pool has already shut down.")) - except queue.Empty: - pass - self._batch_queue.put(None) - - def batch_requests(self): - time.sleep(1) - while True: - self.available_workers.acquire() - work_items: List[BatchableWorkItem] = [self._batch_queue.get()] - - while len(work_items) < self.max_batch_size: - try: - item = self._batch_queue.get(block=False) - work_items.append(item) - except queue.Empty: - break - - if work_items[-1] is None: - if len(work_items) > 1: - logging.warn( - "There remained work items in the queue when shutting down. The items will be ignored." 
- ) - return - - requests = [item.request for item in work_items] - task_futures = [item.future for item in work_items] - - try: - result_future = self.process_pool_executor.submit(self.batch_worker_fn, requests) - except Exception as e: - self._handle_exception(e, task_futures) - return - - result_future.add_done_callback(_set_results_cb(task_futures, self._handle_exception)) - result_future.add_done_callback(lambda _: self.available_workers.release()) - - def _handle_exception(self, e: Exception, task_futures: List[futures.Future]): - print(traceback.format_exc()) - for f in task_futures: - if not f.done(): - f.set_exception(e) - self.shutdown() - - -def _set_results_cb(task_futures: List[futures.Future], handle_exception_cb: Callable): - def cb(batch_future: futures.Future): - try: - for f, r in zip(task_futures, batch_future.result()): - f.set_result(r) - except Exception as e: - handle_exception_cb(e, task_futures) - - return cb \ No newline at end of file diff --git a/evals/solvers/providers/google/gemini_solver.py b/evals/solvers/providers/google/gemini_solver.py index ed8f9b2a2e..bf1d8b63f9 100644 --- a/evals/solvers/providers/google/gemini_solver.py +++ b/evals/solvers/providers/google/gemini_solver.py @@ -147,12 +147,6 @@ def _solve( else: raise e - # record_sampling( - # prompt=msgs, - # sampled=[solver_result.output], - # model=self.model, - # ) - print("completion", solver_result.output) return solver_result @staticmethod diff --git a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py b/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py deleted file mode 100644 index dfd964926a..0000000000 --- a/evals/solvers/providers/llamaomni/llamaomni_local_gpu_solver.py +++ /dev/null @@ -1,333 +0,0 @@ -import base64 -import dataclasses -import io -import logging -import queue -import threading -import time -import traceback -import torch -import torch.distributed -import torch.multiprocessing as mp -import librosa -import transformers -import whisper -from urllib.request import urlopen -from concurrent import futures -from concurrent.futures import ProcessPoolExecutor -from typing import Any, Callable, Dict, List, Optional, TypeVar -from evals.solvers.solver import Solver, SolverResult -from evals.task_state import TaskState -import numpy as np -import tempfile -import os -from .llamaomni.omni_speech.model.builder import load_pretrained_model -from .llamaomni.omni_speech.datasets.preprocess import tokenizer_speech_token -from .llamaomni.omni_speech.conversation import conv_templates - -SAMPLE_RATE = 16000 -DEFAULT_MAX_BATCH_SIZE = 1 - -class LlamaOmniLocalGPUSolver(Solver): - """ - A solver class for running the LlamaOmni model on CPU or multiple GPUs. - Uses BatchedProcessPoolExecutor for efficient batch processing. 
- - Args: - model_name: str - The model name/path - device: str - Device to run on ('cpu' or 'cuda') - num_gpus: int - Number of GPUs to use (default: all available if device='cuda') - max_batch_size: int - Maximum batch size for inference - extra_options: Dict[str, Any] - Additional options for model generation - postprocessors: list[str] - List of postprocessors to apply - registry: Any - Registry object for the solver - """ - - def __init__( - self, - model_name: str, - device: str = "cuda" if torch.cuda.is_available() else "cpu", - num_gpus: int = None, - max_batch_size: int = DEFAULT_MAX_BATCH_SIZE, - extra_options: Optional[Dict[str, Any]] = None, - postprocessors: list[str] = [], - registry: Any = None, - ): - super().__init__(postprocessors=postprocessors, registry=registry) - - self.model_name = model_name - self.device = device - self.extra_options = extra_options or {} - - # Set number of workers based on device - if device == "cuda": - num_gpus = num_gpus or torch.cuda.device_count() - num_workers = max(1, num_gpus) - else: - num_workers = 1 - - # Set up multiprocessing - mp.set_start_method("spawn", force=True) - rank_queue = mp.Queue() - rank_queue.put(0) # Start with primary worker - - self.executor = BatchedProcessPoolExecutor( - max_workers=num_workers, - max_batch_size=int(max_batch_size), - initializer=solver_initializer, - initargs=(rank_queue, num_workers, model_name, device), - batch_worker_fn=solver_worker, - ) - - def copy(self): - return self - - def _process_audio_content(self, content: list) -> tuple[list, list]: - """Process audio content from message parts.""" - audios = [] - prompts = [] - - for part in content: - if part["type"] == "audio_url": - # Handle base64 encoded audio - audio_data = part["audio_url"]["url"].split(",")[1] - audio_bytes = base64.b64decode(audio_data) - - # Create a temporary file - with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file: - temp_file.write(audio_bytes) - temp_path = temp_file.name - - try: - # Load using whisper's load_audio function - audio_array = whisper.load_audio(temp_path) - audios.append(audio_array) - finally: - # Clean up the temporary file - os.unlink(temp_path) - - elif part["type"] == "text": - prompts.append(part["text"]) - - return audios, prompts - - def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: - inputs = {"audios": [], "prompts": []} - - # Process the last message if it contains audio - if not isinstance(task_state.messages[-1].content, str): - audios, prompts = self._process_audio_content(task_state.messages[-1].content) - inputs["audios"].extend(audios) - inputs["prompts"].extend(prompts) if prompts else inputs["prompts"].extend([None] * len(audios)) - - # Submit to executor and get result - completion_output = self.executor.submit(inputs).result() - return SolverResult(completion_output) - - def __del__(self): - if hasattr(self, "executor"): - self.executor.shutdown() - - -def solver_initializer( - rank_queue: mp.Queue, - world_size: int, - model_name: str, - device: str, -): - """Initialize the LlamaOmni model on the specified device.""" - rank = rank_queue.get() - - # Set device based on configuration - if device == "cuda": - device = f"cuda:{rank}" - else: - device = "cpu" - - global model, tokenizer, context_len - - # Initialize model using load_pretrained_model - tokenizer, model, context_len = load_pretrained_model( - model_path=model_name, - model_base=None, # Can be made configurable through extra_options - is_lora=False, # Can be made configurable 
through extra_options - s2s=False, # Can be made configurable through extra_options - device=device - ) - - # Move model to appropriate device if needed - if device != "cuda:0": # load_pretrained_model typically uses cuda:0 by default - model = model.to(device) - - if rank == 0: - # Let other initializers start after model is downloaded - for i in range(1, world_size): - rank_queue.put(i) - - -def solver_worker(inputs: List[Dict[str, Any]]) -> List[str]: - """Process a batch of inputs using the LlamaOmni model.""" - print("inputs", inputs) - batch_audios = [] - batch_prompts = [] - - # Process each input in the batch - for input_item in inputs: - batch_audios.extend(input_item["audios"]) - batch_prompts.extend(input_item["prompts"]) - - # Create conversation prompts like in the reference code - processed_prompts = [] - for prompt in batch_prompts: - conv = conv_templates["v1"].copy() - conv.append_message(conv.roles[0], prompt) - conv.append_message(conv.roles[1], None) - processed_prompts.append(conv.get_prompt()) - - # Process input_ids with the processed prompts - input_ids_list = [] - for prompt in processed_prompts: - ids = tokenizer_speech_token(prompt, tokenizer, return_tensors='pt') - input_ids_list.append(ids) - - # Process audio similarly to reference code - speech_list = [] - for audio in batch_audios: - # Convert to float32 at the very beginning - audio = audio.astype(np.float32) - speech = whisper.pad_or_trim(audio) - mel = whisper.log_mel_spectrogram(speech, n_mels=128) - # Convert to tensor and ensure float32 dtype - speech_tensor = torch.tensor(mel.numpy(), dtype=torch.float32).permute(1, 0) - speech_list.append(speech_tensor) - - speech_lengths = [torch.tensor([audio.shape[0]], dtype=torch.long) for audio in batch_audios] - - input_ids = torch.stack(input_ids_list, dim=0) - speech_tensors = torch.stack(speech_list, dim=0) - speech_lengths = torch.stack(speech_lengths, dim=0) - - device = next(model.parameters()).device # Get device from model - - # Move tensors to device and ensure correct dtypes - input_ids = input_ids.to(device=device, dtype=torch.long) - speech_tensors = speech_tensors.to(device=device, dtype=torch.float32) - speech_lengths = speech_lengths.to(device=device, dtype=torch.long) - - # Ensure model parameters are float32 - model.float() # Convert all model parameters to float32 - - - # Generate responses using LlamaOmni's interface - outputs = model.generate( - input_ids, - speech=speech_tensors, - speech_lengths=speech_lengths, - do_sample=True, - temperature=0.7, - top_p=0.9, - num_beams=1, - max_new_tokens=5, - use_cache=True, - pad_token_id=128004, - ) - - # Decode responses - decoded_responses = tokenizer.batch_decode(outputs, skip_special_tokens=True) - return decoded_responses - - -# Reuse the BatchedProcessPoolExecutor implementation from DiVA -T_In = TypeVar("T_In") -T_Out = TypeVar("T_Out") - -@dataclasses.dataclass -class BatchableWorkItem: - request: T_In - future: futures.Future - - -class BatchedProcessPoolExecutor: - def __init__( - self, - *args, - batch_worker_fn: Callable[[List[T_In]], List[T_Out]], - max_batch_size: int, - max_workers: int = 1, - **kwargs - ): - self.max_batch_size = max_batch_size - self.batch_worker_fn = batch_worker_fn - self._batch_queue = queue.Queue() - self.available_workers = threading.Semaphore(value=max_workers + 1) - self.process_pool_executor = ProcessPoolExecutor( - *args, max_workers=max_workers, **kwargs - ) - self._batch_thread = threading.Thread(target=self.batch_requests) - self._batch_thread.start() - - 
def submit(self, request: T_In) -> futures.Future: - item = BatchableWorkItem(request, futures.Future()) - self._batch_queue.put(item) - return item.future - - def shutdown(self): - self.process_pool_executor.shutdown() - while not self._batch_queue.empty(): - try: - item = self._batch_queue.get(block=False) - if item is not None: - item.future.set_exception(Exception("The pool has already shut down.")) - except queue.Empty: - pass - self._batch_queue.put(None) - - def batch_requests(self): - time.sleep(1) - while True: - self.available_workers.acquire() - work_items: List[BatchableWorkItem] = [self._batch_queue.get()] - - while len(work_items) < self.max_batch_size: - try: - item = self._batch_queue.get(block=False) - work_items.append(item) - except queue.Empty: - break - - if work_items[-1] is None: - if len(work_items) > 1: - logging.warn( - "There remained work items in the queue when shutting down. The items will be ignored." - ) - return - - requests = [item.request for item in work_items] - task_futures = [item.future for item in work_items] - - try: - result_future = self.process_pool_executor.submit(self.batch_worker_fn, requests) - except Exception as e: - self._handle_exception(e, task_futures) - return - - result_future.add_done_callback(_set_results_cb(task_futures, self._handle_exception)) - result_future.add_done_callback(lambda _: self.available_workers.release()) - - def _handle_exception(self, e: Exception, task_futures: List[futures.Future]): - print(traceback.format_exc()) - for f in task_futures: - if not f.done(): - f.set_exception(e) - self.shutdown() - - -def _set_results_cb(task_futures: List[futures.Future], handle_exception_cb: Callable): - def cb(batch_future: futures.Future): - try: - for f, r in zip(task_futures, batch_future.result()): - f.set_result(r) - except Exception as e: - handle_exception_cb(e, task_futures) - - return cb From cb61f2211d9373eaad7ccfe1ccb1f4d5eefedae6 Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Fri, 8 Nov 2024 17:17:26 -0800 Subject: [PATCH 13/14] More cleanup --- evals/solvers/providers/google/gemini_solver.py | 5 +++++ pyproject.toml | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/evals/solvers/providers/google/gemini_solver.py b/evals/solvers/providers/google/gemini_solver.py index bf1d8b63f9..33a8a93a25 100644 --- a/evals/solvers/providers/google/gemini_solver.py +++ b/evals/solvers/providers/google/gemini_solver.py @@ -147,6 +147,11 @@ def _solve( else: raise e + record_sampling( + prompt=msgs, + sampled=[solver_result.output], + model=self.model, + ) return solver_result @staticmethod diff --git a/pyproject.toml b/pyproject.toml index 7394fc2e61..ec3aa11059 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,6 @@ dependencies = [ "transformers[torch]", "peft", "librosa", - "openai-whisper", "replicate", ] From 9844fd5f0fe47ee8e2ff7559b6f9ee81353a636e Mon Sep 17 00:00:00 2001 From: Patrick Li Date: Fri, 8 Nov 2024 17:22:39 -0800 Subject: [PATCH 14/14] More cleanup --- evals/registry/solvers/llamaomni.yaml | 7 +------ .../providers/llamaomni/llamaomni_replicate_solver.py | 1 - 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/evals/registry/solvers/llamaomni.yaml b/evals/registry/solvers/llamaomni.yaml index 80b19d513b..ff191c789d 100644 --- a/evals/registry/solvers/llamaomni.yaml +++ b/evals/registry/solvers/llamaomni.yaml @@ -1,9 +1,4 @@ -generation/gpu/llamaomni: - class: evals.solvers.providers.llamaomni.llamaomni_local_gpu_solver:LlamaOmniLocalGPUSolver - args: - model_name: 
ICTNLP/Llama-3.1-8B-Omni - - +# You should use your own Replicate deployment for this solver. generation/replicate/llamaomni: class: evals.solvers.providers.llamaomni.llamaomni_replicate_solver:LlamaOmniReplicateSolver args: diff --git a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py index f6a6045627..3a7e6555a2 100644 --- a/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py +++ b/evals/solvers/providers/llamaomni/llamaomni_replicate_solver.py @@ -84,7 +84,6 @@ def _solve(self, task_state: TaskState, **kwargs) -> SolverResult: else: result = str(prediction.output) - print("output:", result) return SolverResult(result) return SolverResult("")
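
For reference, the audio inputs consumed by the solvers above arrive as message content parts rather than plain strings. The snippet below is a minimal sketch (not part of any patch in this series) of how a base64 data-URL audio part plus a text part can be assembled with the standard library, mirroring the shape parsed by `_process_audio_content` in the removed local GPU solvers; the WAV path, MIME prefix, and prompt text are illustrative placeholders.

import base64
from pathlib import Path


def audio_message_content(wav_path: str, prompt: str) -> list[dict]:
    """Build a message content list: one base64 data-URL audio part plus one text part."""
    # Encode the raw WAV bytes as base64 and wrap them in a data URL; the removed
    # solvers split the URL on "," and base64-decode the second half before
    # loading the audio at 16 kHz.
    b64 = base64.b64encode(Path(wav_path).read_bytes()).decode("utf-8")
    return [
        {"type": "audio_url", "audio_url": {"url": f"data:audio/wav;base64,{b64}"}},
        {"type": "text", "text": prompt},
    ]


if __name__ == "__main__":
    # Placeholder file and prompt, for illustration only.
    content = audio_message_content("sample.wav", "Transcribe the audio clip.")
    print(content[1]["text"], "--", content[0]["audio_url"]["url"][:48], "...")

The data-URL branch matches the base64 path exercised by both removed solvers; the DiVA solver additionally accepted a plain URL string and fetched it with urlopen.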