diff --git a/.github/workflows/build_wheel.yml b/.github/workflows/build_wheel.yml index 6611e35..af13f4e 100644 --- a/.github/workflows/build_wheel.yml +++ b/.github/workflows/build_wheel.yml @@ -81,4 +81,4 @@ jobs: repo_token: "${{ secrets.GITHUB_TOKEN }}" automatic_release_tag: "${{ env.RELEASE_TAG }}" prerelease: false - files: dist/osstorchconnector* + files: dist/oss*.whl diff --git a/oss-model-connector/ossmodelconnector/__init__.py b/oss-model-connector/ossmodelconnector/__init__.py new file mode 100644 index 0000000..1e19653 --- /dev/null +++ b/oss-model-connector/ossmodelconnector/__init__.py @@ -0,0 +1,9 @@ +from ._oss_connector import ( + DataObject, + DataObjectInfo, + Connector, + new_oss_connector, +) +from .oss_model_connector import OssModelConnector + +__all__ = ["DataObject", "DataObjectInfo", "Connector", "new_oss_connector", "OssModelConnector"] diff --git a/oss-model-connector/ossmodelconnector/_oss_connector/__init__.py b/oss-model-connector/ossmodelconnector/_oss_connector/__init__.py new file mode 100644 index 0000000..859c4d8 --- /dev/null +++ b/oss-model-connector/ossmodelconnector/_oss_connector/__init__.py @@ -0,0 +1,8 @@ +from .oss_model_connector import ( + DataObject, + DataObjectInfo, + Connector, + new_oss_connector +) + +__all__ = ["DataObject", "DataObjectInfo", "Connector", "new_oss_connector"] diff --git a/oss-model-connector/ossmodelconnector/_oss_connector/oss_model_connector.pyi b/oss-model-connector/ossmodelconnector/_oss_connector/oss_model_connector.pyi new file mode 100644 index 0000000..97c2a80 --- /dev/null +++ b/oss-model-connector/ossmodelconnector/_oss_connector/oss_model_connector.pyi @@ -0,0 +1,29 @@ +from typing import List, Union, Any + +class DataObject: + key: str + + def __enter__(self) -> DataObject: ... + def __exit__(self, exc_type, exc_val, exc_tb): ... + def tell(self) -> int: ... + def seek(self, offset: int, whence: int) -> int: ... + def read(self, count: int) -> bytes: ... + def readinto(self, buf) -> int: ... + def mmap(self) -> int: ... + def close(self) -> int: ... + def size(self) -> int: ... + + +class DataObjectInfo: + key: str + size: int + + +class Connector: + def open(uri: str, prefetch: bool, userfault: bool, binary: bool) -> DataObject: ... + def prepare_directory(uri: str, dir: str, libc: bool) -> int: ... + def list(bucket: str, prefix: str, fast: bool) -> List[DataObjectInfo]: ... + + +def new_oss_connector(endpoint: str, cred: Union[str, Any], config_path: str) -> Connector: + ... diff --git a/oss-model-connector/ossmodelconnector/oss_model_connector.py b/oss-model-connector/ossmodelconnector/oss_model_connector.py new file mode 100644 index 0000000..066bd20 --- /dev/null +++ b/oss-model-connector/ossmodelconnector/oss_model_connector.py @@ -0,0 +1,161 @@ +from ._oss_connector import new_oss_connector, Connector +import ctypes +import torch +import builtins +import pathlib +from typing import Any + + +class UntypedStorageEx: + def __init__(self, file, size): + self.file = file + self.addr = memoryview((ctypes.c_ubyte * size).from_address(self.file.mmap())) + + def untyped(self): + return self + + def __getitem__(self, idx): + return self.addr[idx] + +class OssModelConnector: + """ + A connector class for interfacing with OSS for model loading, + providing high-performance methods to load models/objects/files for AI inference. + """ + + def __init__( + self, + endpoint: str, + cred_path: str = "", + config_path: str = "", + cred_provider: Any = None, + ): + """ + Initializes the connector with endpoint and optional credential information. + + Args: + endpoint(str): The OSS endpoint to connect to. + cred_path(str, optional): Path to the credential file. Defaults to "". + config_path(str, optional): Path to the configuration file. Defaults to "". + cred_provider(Any, optional): Credential provider. Defaults to None. + + Raises: + ValueError: If endpoint or credential is not provided. + """ + if not endpoint: + raise ValueError("endpoint must be non-empty") + if cred_provider is None and not cred_path: + raise ValueError("Either cred_path or cred_provider must be provided") + + self._endpoint = endpoint + if not cred_path: + self._cred_path = "" + else: + self._cred_path = cred_path + if not config_path: + self._config_path = "" + else: + self._config_path = config_path + self._cred_provider = cred_provider + + self._real_connector = None + self._hook_dir = '' + self._origin_from_file = torch.UntypedStorage.from_file + self._origin_open = builtins.open + + def __del__(self): + self.close() + @property + def _connector(self): + if self._real_connector is None: + if self._cred_provider is not None: + self._real_connector = new_oss_connector(self._endpoint, self._cred_provider, self._config_path) + else: + self._real_connector = new_oss_connector(self._endpoint, self._cred_path, self._config_path) + + return self._real_connector + + def close(self): + """ + Close the connector and release resources. + """ + if self._hook_dir: + self._hook_dir = '' + + if builtins.open == self._connector_open: + builtins.open = self._origin_open + torch.UntypedStorage.from_file = self._origin_from_file + + if torch.UntypedStorage.from_file == self._from_file_helper: + torch.UntypedStorage.from_file = self._origin_from_file + + if self._real_connector is not None: + del self._real_connector + self._real_connector = None + + def open(self, uri, binary = True): + """ + Opens an object from OSS storage. + + Args: + uri(str): The uri (oss://{bucket}/{object_name}) of the object to open. + binary(bool): Flag indicating whether to open in binary mode or not. + + Returns: + Stream-like object of the opened OSS object. + """ + return self._connector.open(uri, True, True, binary) + + def _from_file_helper(self, filename, shared, nbytes): + file = self._connector.open(filename, True, True) + return UntypedStorageEx(file, nbytes) + + def _connector_open(self, *args, **kwargs): + filename = args[0] + if isinstance(filename, pathlib.Path): + filename = str(filename) + open_mode = 'r' if len(args) == 1 else args[1] + if self._hook_dir and filename.startswith(self._hook_dir): + binary = False + if open_mode == "rb": + binary = True + try: + return self.open(filename, binary) + except: + return self._origin_open(*args, **kwargs) + else: + return self._origin_open(*args, **kwargs) + + def prepare_directory(self, uri: str, dir: str, libc_hook: bool = False): + """ + Prepare the directory from OSS storage, which can be used as directory 'dir' in vllm/transformers or other frameworks. + + Args: + uri(str): The URI (oss://{bucket}/{directory}) of the OSS directory. + dir(str): The local directory used for vllm/transformers or other frameworks. + libc_hook (bool): Flag to enable libc hooking. + + Raises: + RuntimeError: If prepare directory failed. + """ + if not dir.endswith('/'): + dir += '/' + self._connector.prepare_directory(uri, dir, libc_hook) + if not libc_hook: + builtins.open = self._connector_open + torch.UntypedStorage.from_file = self._from_file_helper + self._hook_dir = dir + + def list(self, bucket: str, prefix: str, fast: bool = False): + """ + Lists objects in a specified OSS bucket with a given prefix. + + Args: + bucket(str): The OSS bucket name. + prefix(str): The prefix filter for object listing. + fast (bool): If true, enables fast list mode. + + Returns: + List: A list of objects matching the bucket and prefix criteria. + """ + return self._connector.list(bucket, prefix, fast) diff --git a/oss-model-connector/pyproject.toml b/oss-model-connector/pyproject.toml new file mode 100644 index 0000000..8e3334c --- /dev/null +++ b/oss-model-connector/pyproject.toml @@ -0,0 +1,34 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "ossmodelconnector" +version = "1.0.0rc1" +description = "OSS model connector for AI/ML" +requires-python = ">=3.8,<3.13" +readme = "README.md" +dependencies = [ + "torch >= 2.0", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Topic :: Utilities", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["ossmodelconnector", "ossmodelconnector._oss_connector"] + +[tool.setuptools.package-data] +osstorchconnector = ["_oss_connector/*.so"] diff --git a/oss-model-connector/setup.py b/oss-model-connector/setup.py new file mode 100644 index 0000000..9cf8426 --- /dev/null +++ b/oss-model-connector/setup.py @@ -0,0 +1,56 @@ +from setuptools import setup, Extension +from setuptools.command.build_ext import build_ext +import os +import subprocess +import shutil + + +class BuildExtension(Extension): + def __init__(self, name, source_dir=''): + Extension.__init__(self, name, sources=[source_dir]) + self.source_dir = os.path.abspath(source_dir) + +class LibraryBuild(build_ext): + user_options = build_ext.user_options + [ + ('library-path=', None, 'oss_connector library path'), + ] + def initialize_options(self): + super().initialize_options() + self.library_path = None + def run(self): + if not self.library_path: + raise RuntimeError("library path is not specified by '--library-path'") + self.library_path = os.path.abspath(self.library_path) + if os.path.exists(self.library_path): + print('library path:', self.library_path) + else: + raise RuntimeError("invalid library path: " + self.library_path) + for ext in self.extensions: + self.build_extension(ext) + + def run_command(self, command, cwd): + try: + subprocess.run(command, capture_output=True, text=True, check=True, cwd=cwd) + except subprocess.CalledProcessError as e: + print(f"Command '{' '.join(command)}' failed with exit code {e.returncode}") + print(f"Stdout: {e.stdout}") + print(f"Stderr: {e.stderr}") + raise RuntimeError("Subprocess execution failed") from e + + def build_extension(self, ext): + print('name:', ext.name) + print('source path:', ext.source_dir) + print('current dir:', os.getcwd()) + + # copy .so + library_file_name = os.path.basename(self.library_path) + dest_so_path = os.path.abspath( + os.path.join(self.build_lib, 'ossmodelconnector', '_oss_connector', library_file_name)) + print('copy %s to %s' % (self.library_path, dest_so_path)) + shutil.copy(self.library_path, dest_so_path) + + +setup( + ext_modules=[BuildExtension('oss_model_connector', '.')], + cmdclass=dict(build_ext=LibraryBuild), +)