Skip to content

Commit

Permalink
Add python api for oss-model-connector
Browse files Browse the repository at this point in the history
Signed-off-by: yuchen.cc <[email protected]>
  • Loading branch information
yuchen0cc committed Dec 12, 2024
1 parent b5c3458 commit 89565f9
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/build_wheel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,4 @@ jobs:
repo_token: "${{ secrets.GITHUB_TOKEN }}"
automatic_release_tag: "${{ env.RELEASE_TAG }}"
prerelease: false
files: dist/osstorchconnector*
files: dist/oss*.whl
9 changes: 9 additions & 0 deletions oss-model-connector/ossmodelconnector/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from ._oss_connector import (
DataObject,
DataObjectInfo,
Connector,
new_oss_connector,
)
from .oss_model_connector import OssModelConnector

__all__ = ["DataObject", "DataObjectInfo", "Connector", "new_oss_connector", "OssModelConnector"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .oss_model_connector import (
DataObject,
DataObjectInfo,
Connector,
new_oss_connector
)

__all__ = ["DataObject", "DataObjectInfo", "Connector", "new_oss_connector"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import List, Union, Any

class DataObject:
key: str

def __enter__(self) -> DataObject: ...
def __exit__(self, exc_type, exc_val, exc_tb): ...
def tell(self) -> int: ...
def seek(self, offset: int, whence: int) -> int: ...
def read(self, count: int) -> bytes: ...
def readinto(self, buf) -> int: ...
def mmap(self) -> int: ...
def close(self) -> int: ...
def size(self) -> int: ...


class DataObjectInfo:
key: str
size: int


class Connector:
def open(uri: str, prefetch: bool, userfault: bool, binary: bool) -> DataObject: ...
def prepare_directory(uri: str, dir: str, libc: bool) -> int: ...
def list(bucket: str, prefix: str, fast: bool) -> List[DataObjectInfo]: ...


def new_oss_connector(endpoint: str, cred: Union[str, Any], config_path: str) -> Connector:
...
161 changes: 161 additions & 0 deletions oss-model-connector/ossmodelconnector/oss_model_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from ._oss_connector import new_oss_connector, Connector
import ctypes
import torch
import builtins
import pathlib
from typing import Any


class UntypedStorageEx:
def __init__(self, file, size):
self.file = file
self.addr = memoryview((ctypes.c_ubyte * size).from_address(self.file.mmap()))

def untyped(self):
return self

def __getitem__(self, idx):
return self.addr[idx]

class OssModelConnector:
"""
A connector class for interfacing with OSS for model loading,
providing high-performance methods to load models/objects/files for AI inference.
"""

def __init__(
self,
endpoint: str,
cred_path: str = "",
config_path: str = "",
cred_provider: Any = None,
):
"""
Initializes the connector with endpoint and optional credential information.
Args:
endpoint(str): The OSS endpoint to connect to.
cred_path(str, optional): Path to the credential file. Defaults to "".
config_path(str, optional): Path to the configuration file. Defaults to "".
cred_provider(Any, optional): Credential provider. Defaults to None.
Raises:
ValueError: If endpoint or credential is not provided.
"""
if not endpoint:
raise ValueError("endpoint must be non-empty")
if cred_provider is None and not cred_path:
raise ValueError("Either cred_path or cred_provider must be provided")

self._endpoint = endpoint
if not cred_path:
self._cred_path = ""
else:
self._cred_path = cred_path
if not config_path:
self._config_path = ""
else:
self._config_path = config_path
self._cred_provider = cred_provider

self._real_connector = None
self._hook_dir = ''
self._origin_from_file = torch.UntypedStorage.from_file
self._origin_open = builtins.open

def __del__(self):
self.close()
@property
def _connector(self):
if self._real_connector is None:
if self._cred_provider is not None:
self._real_connector = new_oss_connector(self._endpoint, self._cred_provider, self._config_path)
else:
self._real_connector = new_oss_connector(self._endpoint, self._cred_path, self._config_path)

return self._real_connector

def close(self):
"""
Close the connector and release resources.
"""
if self._hook_dir:
self._hook_dir = ''

if builtins.open == self._connector_open:
builtins.open = self._origin_open
torch.UntypedStorage.from_file = self._origin_from_file

if torch.UntypedStorage.from_file == self._from_file_helper:
torch.UntypedStorage.from_file = self._origin_from_file

if self._real_connector is not None:
del self._real_connector
self._real_connector = None

def open(self, uri, binary = True):
"""
Opens an object from OSS storage.
Args:
uri(str): The uri (oss://{bucket}/{object_name}) of the object to open.
binary(bool): Flag indicating whether to open in binary mode or not.
Returns:
Stream-like object of the opened OSS object.
"""
return self._connector.open(uri, True, True, binary)

def _from_file_helper(self, filename, shared, nbytes):
file = self._connector.open(filename, True, True)
return UntypedStorageEx(file, nbytes)

def _connector_open(self, *args, **kwargs):
filename = args[0]
if isinstance(filename, pathlib.Path):
filename = str(filename)
open_mode = 'r' if len(args) == 1 else args[1]
if self._hook_dir and filename.startswith(self._hook_dir):
binary = False
if open_mode == "rb":
binary = True
try:
return self.open(filename, binary)
except:
return self._origin_open(*args, **kwargs)
else:
return self._origin_open(*args, **kwargs)

def prepare_directory(self, uri: str, dir: str, libc_hook: bool = False):
"""
Prepare the directory from OSS storage, which can be used as directory 'dir' in vllm/transformers or other frameworks.
Args:
uri(str): The URI (oss://{bucket}/{directory}) of the OSS directory.
dir(str): The local directory used for vllm/transformers or other frameworks.
libc_hook (bool): Flag to enable libc hooking.
Raises:
RuntimeError: If prepare directory failed.
"""
if not dir.endswith('/'):
dir += '/'
self._connector.prepare_directory(uri, dir, libc_hook)
if not libc_hook:
builtins.open = self._connector_open
torch.UntypedStorage.from_file = self._from_file_helper
self._hook_dir = dir

def list(self, bucket: str, prefix: str, fast: bool = False):
"""
Lists objects in a specified OSS bucket with a given prefix.
Args:
bucket(str): The OSS bucket name.
prefix(str): The prefix filter for object listing.
fast (bool): If true, enables fast list mode.
Returns:
List: A list of objects matching the bucket and prefix criteria.
"""
return self._connector.list(bucket, prefix, fast)
34 changes: 34 additions & 0 deletions oss-model-connector/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "ossmodelconnector"
version = "1.0.0rc1"
description = "OSS model connector for AI/ML"
requires-python = ">=3.8,<3.13"
readme = "README.md"
dependencies = [
"torch >= 2.0",
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Topic :: Utilities",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",

"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]

[tool.setuptools.packages.find]
where = ["."]
include = ["ossmodelconnector", "ossmodelconnector._oss_connector"]

[tool.setuptools.package-data]
osstorchconnector = ["_oss_connector/*.so"]
56 changes: 56 additions & 0 deletions oss-model-connector/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from setuptools import setup, Extension
from setuptools.command.build_ext import build_ext
import os
import subprocess
import shutil


class BuildExtension(Extension):
def __init__(self, name, source_dir=''):
Extension.__init__(self, name, sources=[source_dir])
self.source_dir = os.path.abspath(source_dir)

class LibraryBuild(build_ext):
user_options = build_ext.user_options + [
('library-path=', None, 'oss_connector library path'),
]
def initialize_options(self):
super().initialize_options()
self.library_path = None
def run(self):
if not self.library_path:
raise RuntimeError("library path is not specified by '--library-path'")
self.library_path = os.path.abspath(self.library_path)
if os.path.exists(self.library_path):
print('library path:', self.library_path)
else:
raise RuntimeError("invalid library path: " + self.library_path)
for ext in self.extensions:
self.build_extension(ext)

def run_command(self, command, cwd):
try:
subprocess.run(command, capture_output=True, text=True, check=True, cwd=cwd)
except subprocess.CalledProcessError as e:
print(f"Command '{' '.join(command)}' failed with exit code {e.returncode}")
print(f"Stdout: {e.stdout}")
print(f"Stderr: {e.stderr}")
raise RuntimeError("Subprocess execution failed") from e

def build_extension(self, ext):
print('name:', ext.name)
print('source path:', ext.source_dir)
print('current dir:', os.getcwd())

# copy .so
library_file_name = os.path.basename(self.library_path)
dest_so_path = os.path.abspath(
os.path.join(self.build_lib, 'ossmodelconnector', '_oss_connector', library_file_name))
print('copy %s to %s' % (self.library_path, dest_so_path))
shutil.copy(self.library_path, dest_so_path)


setup(
ext_modules=[BuildExtension('oss_model_connector', '.')],
cmdclass=dict(build_ext=LibraryBuild),
)

0 comments on commit 89565f9

Please sign in to comment.