From 602fb7c2109339153e9e42f35b0e5dc85bd4605b Mon Sep 17 00:00:00 2001 From: "Radheya D. Kale" Date: Tue, 21 Jun 2022 13:07:17 +0530 Subject: [PATCH 1/6] WIP: Training and testing of Resnet50 Keras models added * TODO: Need a robust way to find if the MLHandler expects image data for training/testing * TODO: Separate class is required for training and testing CV models * TODO: Code has to be made generic to dynamically select Resnet50/VGG, etc. models * TODO: Model training parameters viz. epochs, batch size have to be taken from the user with defaults set * TODO: setup and get_model functions have to be made more scalable * TODO: ModelStore __init__ has to be made scalable --- gramex/handlers/mlhandler.py | 169 +++++++++++++++++++++++++++++------ 1 file changed, 142 insertions(+), 27 deletions(-) diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py index b1a9e662b..8279ad07f 100644 --- a/gramex/handlers/mlhandler.py +++ b/gramex/handlers/mlhandler.py @@ -58,6 +58,9 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, ** config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler', slugify(cls.name)) cls.store = ml.ModelStore(config_dir) + cls.is_cv_request = False + if 'cv_model' in config_dir: + cls.is_cv_request = True cls.template = template super(MLHandler, cls).setup(**kwargs) @@ -95,7 +98,10 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, ** model_params = model.get('params', {}) cls.store.dump('class', mclass) cls.store.dump('params', model_params) - if op.exists(cls.store.model_path): # If the pkl exists, load it + if cls.is_cv_request: + pass + elif hasattr(cls.store, 'model_path') and op.exists(cls.store.model_path): + # If the pkl exists, load it if op.isdir(cls.store.model_path): mclass, wrapper = ml.search_modelclass(mclass) cls.model = locate(wrapper).from_disk(mclass, cls.store.model_path) @@ -184,7 +190,38 @@ def _transform(self, data, **kwargs): return data def _predict(self, data=None, score_col=''): - self._check_model_path() + if self.is_cv_request: + from tensorflow.keras.applications.resnet50 import ResNet50 + from tensorflow.keras.preprocessing import image + from tensorflow.keras.models import load_model + from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions + + config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler', + slugify(self.name)) + if op.exists(config_dir) and 'keras_metadata.pb' in os.listdir(config_dir): + model = load_model(config_dir) + else: + model = ResNet50(include_top=True, + weights="imagenet", + input_tensor=None, + input_shape=None, + pooling=None, + classes=1000) + x = image.img_to_array(data) + x = np.expand_dims(x, axis=0) + x = preprocess_input(x) + + preds = model.predict(x) + # decode the results into a list of tuples (class, description, probability) + # (one such list for each sample in the batch) + try: + results = decode_predictions(preds) + except Exception: + class_names = [] + class_names = json.load(open(op.join(config_dir, 'class_names.json'))) + results = dict(zip(class_names, preds[0])) + return results + metric = self.get_argument('_metric', False) if metric: scorer = get_scorer(metric) @@ -208,7 +245,8 @@ def _predict(self, data=None, score_col=''): def _check_model_path(self): try: klass, wrapper = ml.search_modelclass(self.store.load('class')) - self.model = locate(wrapper).from_disk(self.store.model_path, klass=klass) + if hasattr(self.store, 'model_path'): + self.model = locate(wrapper).from_disk(self.store.model_path, klass=klass) except FileNotFoundError: raise HTTPError(NOT_FOUND, f'No model found at {self.store.model_path}') @@ -239,15 +277,29 @@ def get(self, *path_args, **path_kwargs): elif '_cache' in self.args: self.write(self.store.load_data().to_json(orient='records')) else: - self._check_model_path() + if not self.is_cv_request: + self._check_model_path() if '_download' in self.args: - self.set_header('Content-Type', 'application/octet-strem') + self.set_header('Content-Type', 'application/octet-stream') self.set_header('Content-Disposition', f'attachment; filename={op.basename(self.store.model_path)}') with open(self.store.model_path, 'rb') as fout: self.write(fout.read()) elif '_model' in self.args: self.write(json.dumps(self.model.get_params(), indent=2)) + elif len(self.request.files.keys()) and \ + self.request.files['image'][0].content_type in \ + ['image/jpeg', 'image/jpg', 'image/png']: + if '_action' in self.args and self.args['_action'] == 'predict': + import cv2 + import imutils + data = imutils.resize(cv2.imdecode(np.fromstring( + self.request.files['image'][0].body, np.uint8), cv2.IMREAD_UNCHANGED), + width=224) + data = cv2.resize(data, (224, 224)) + prediction = yield gramex.service.threadpool.submit( + self._predict, data) + self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) else: try: data_args = {k: v for k, v in self.args.items() if not k.startswith('_')} @@ -259,10 +311,15 @@ def get(self, *path_args, **path_kwargs): app_log.debug(err.msg) data = [] if len(data) > 0: - data = data.drop([self.store.load('target_col')], axis=1, errors='ignore') - prediction = yield gramex.service.threadpool.submit( - self._predict, data) - self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) + if 'training_data' in data.keys(): + training_results = yield gramex.service.threadpool.submit( + self._train, data=data['training_data'].iloc[0]) + self.write(json.dumps(training_results, indent=2, cls=CustomJSONEncoder)) + else: + data = data.drop([self.store.load('target_col')], axis=1, errors='ignore') + prediction = yield gramex.service.threadpool.submit( + self._predict, data) + self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) else: self.set_header('Content-Type', 'text/html') self.render(self.template, handler=self, data=self.store.load_data()) @@ -271,26 +328,84 @@ def get(self, *path_args, **path_kwargs): def _append(self): self._parse_data(_cache=True, append=True) - def _train(self, data=None): - target_col = self.get_argument('target_col', self.store.load('target_col')) - index_col = self.get_argument('index_col', self.store.load('index_col')) - self.store.dump('target_col', target_col) - data = self._parse_data(False) if data is None else data - data = self._filtercols(data) - data = self._filterrows(data) - self.model = get_model( - self.store.load('class'), self.store.load('params'), - data=data, target_col=target_col, - nums=self.store.load('nums'), cats=self.store.load('cats') + def _train_keras(self, data): + import tensorflow as tf + from tensorflow.keras.optimizers import Adam + from tensorflow.keras.models import Sequential + from tensorflow.python.keras.layers import Dense, Flatten + import pathlib + data_dir = pathlib.Path(data) + + config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler', + slugify(self.name)) + + img_height, img_width = 224, 224 + batch_size = 32 + train_ds = tf.keras.preprocessing.image_dataset_from_directory( + data_dir, + validation_split=0.2, + subset="training", + seed=123, + image_size=(img_height, img_width), + batch_size=batch_size) + + val_ds = tf.keras.preprocessing.image_dataset_from_directory( + data_dir, + validation_split=0.2, + subset="validation", + seed=123, + image_size=(img_height, img_width), + batch_size=batch_size) + + class_names = train_ds.class_names + keras_model = Sequential() + pretrained_model = tf.keras.applications.ResNet50(include_top=False, + input_shape=(224, 224, 3), + pooling='avg', classes=5, + weights='imagenet') + for layer in pretrained_model.layers: + layer.trainable = False + + keras_model.add(pretrained_model) + keras_model.add(Flatten()) + keras_model.add(Dense(512, activation='relu')) + keras_model.add(Dense(5, activation='softmax')) + keras_model.compile(optimizer=Adam(lr=0.001), + loss='sparse_categorical_crossentropy', metrics=['accuracy']) + epochs = 1 + keras_model.fit( + train_ds, + validation_data=val_ds, + epochs=epochs ) - if not isinstance(self.model, ml.SklearnTransformer): - target = data[target_col] - train = data[[c for c in data if c not in (target_col, index_col)]] - self.model.fit(train, target, self.store.model_path) - result = {'score': self.model.score(train, target)} + with open(op.join(config_dir, 'class_names.json'), 'w') as fout: + json.dump(class_names, fout) + keras_model.save(config_dir) + return class_names + + def _train(self, data=None): + if self.is_cv_request: + result = self._train_keras(data) else: - self.model.fit(data, None, self.store.model_path) - result = self.model.get_attributes() + target_col = self.get_argument('target_col', self.store.load('target_col')) + index_col = self.get_argument('index_col', self.store.load('index_col')) + self.store.dump('target_col', target_col) + data = self._parse_data(False) if data is None else data + data = self._filtercols(data) + data = self._filterrows(data) + self.model = get_model( + self.store.load('class'), self.store.load('params'), + data=data, target_col=target_col, + nums=self.store.load('nums'), cats=self.store.load('cats') + ) + if not isinstance(self.model, ml.SklearnTransformer): + target = data[target_col] + train = data[[c for c in data if c not in (target_col, index_col)]] + self.model.fit(train, target, self.store.model_path) + result = {'score': self.model.score(train, target)} + else: + self.model.fit(data, None, self.store.model_path) + result = self.model.get_attributes() return result def _retrain(self): From fe2c85e905dc81c8d593da7e4b3b3949424572a8 Mon Sep 17 00:00:00 2001 From: "Radheya D. Kale" Date: Fri, 24 Jun 2022 17:25:36 +0530 Subject: [PATCH 2/6] WIP: Created a wrapper for Keras. * Predict is working with default VGG16 and Resnet models. * TODO: Test and add all other models supported by Keras * TODO: Implement training functionality * TODO: Implement functionality to predict from trained models provided by user/trained in gramex --- gramex/handlers/mlhandler.py | 94 ++++++++++-------------------------- gramex/ml_api.py | 71 ++++++++++++++++++++++++--- 2 files changed, 89 insertions(+), 76 deletions(-) diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py index 8279ad07f..bb808653d 100644 --- a/gramex/handlers/mlhandler.py +++ b/gramex/handlers/mlhandler.py @@ -58,9 +58,6 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, ** config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler', slugify(cls.name)) cls.store = ml.ModelStore(config_dir) - cls.is_cv_request = False - if 'cv_model' in config_dir: - cls.is_cv_request = True cls.template = template super(MLHandler, cls).setup(**kwargs) @@ -98,15 +95,9 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, ** model_params = model.get('params', {}) cls.store.dump('class', mclass) cls.store.dump('params', model_params) - if cls.is_cv_request: - pass - elif hasattr(cls.store, 'model_path') and op.exists(cls.store.model_path): - # If the pkl exists, load it - if op.isdir(cls.store.model_path): - mclass, wrapper = ml.search_modelclass(mclass) - cls.model = locate(wrapper).from_disk(mclass, cls.store.model_path) - else: - cls.model = get_model(cls.store.model_path, {}) + # If the pkl exists, load it + if op.isdir(cls.store.model_path): + cls.model = get_model(mclass, model_params) elif data is not None: data = cls._filtercols(data) data = cls._filterrows(data) @@ -190,38 +181,9 @@ def _transform(self, data, **kwargs): return data def _predict(self, data=None, score_col=''): - if self.is_cv_request: - from tensorflow.keras.applications.resnet50 import ResNet50 - from tensorflow.keras.preprocessing import image - from tensorflow.keras.models import load_model - from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions - - config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler', - slugify(self.name)) - if op.exists(config_dir) and 'keras_metadata.pb' in os.listdir(config_dir): - model = load_model(config_dir) - else: - model = ResNet50(include_top=True, - weights="imagenet", - input_tensor=None, - input_shape=None, - pooling=None, - classes=1000) - x = image.img_to_array(data) - x = np.expand_dims(x, axis=0) - x = preprocess_input(x) - - preds = model.predict(x) - # decode the results into a list of tuples (class, description, probability) - # (one such list for each sample in the batch) - try: - results = decode_predictions(preds) - except Exception: - class_names = [] - class_names = json.load(open(op.join(config_dir, 'class_names.json'))) - results = dict(zip(class_names, preds[0])) - return results - + if type(data) == np.ndarray: + data = self.model.predict(data=data, mclass=self.store.load('class')) + return data metric = self.get_argument('_metric', False) if metric: scorer = get_scorer(metric) @@ -245,7 +207,7 @@ def _predict(self, data=None, score_col=''): def _check_model_path(self): try: klass, wrapper = ml.search_modelclass(self.store.load('class')) - if hasattr(self.store, 'model_path'): + if hasattr(self.store, 'model_path') and not op.isdir(self.store.model_path): self.model = locate(wrapper).from_disk(self.store.model_path, klass=klass) except FileNotFoundError: raise HTTPError(NOT_FOUND, f'No model found at {self.store.model_path}') @@ -277,8 +239,7 @@ def get(self, *path_args, **path_kwargs): elif '_cache' in self.args: self.write(self.store.load_data().to_json(orient='records')) else: - if not self.is_cv_request: - self._check_model_path() + self._check_model_path() if '_download' in self.args: self.set_header('Content-Type', 'application/octet-stream') self.set_header('Content-Disposition', @@ -384,28 +345,25 @@ def _train_keras(self, data): return class_names def _train(self, data=None): - if self.is_cv_request: - result = self._train_keras(data) + target_col = self.get_argument('target_col', self.store.load('target_col')) + index_col = self.get_argument('index_col', self.store.load('index_col')) + self.store.dump('target_col', target_col) + data = self._parse_data(False) if data is None else data + data = self._filtercols(data) + data = self._filterrows(data) + self.model = get_model( + self.store.load('class'), self.store.load('params'), + data=data, target_col=target_col, + nums=self.store.load('nums'), cats=self.store.load('cats') + ) + if not isinstance(self.model, ml.SklearnTransformer): + target = data[target_col] + train = data[[c for c in data if c not in (target_col, index_col)]] + self.model.fit(train, target, self.store.model_path) + result = {'score': self.model.score(train, target)} else: - target_col = self.get_argument('target_col', self.store.load('target_col')) - index_col = self.get_argument('index_col', self.store.load('index_col')) - self.store.dump('target_col', target_col) - data = self._parse_data(False) if data is None else data - data = self._filtercols(data) - data = self._filterrows(data) - self.model = get_model( - self.store.load('class'), self.store.load('params'), - data=data, target_col=target_col, - nums=self.store.load('nums'), cats=self.store.load('cats') - ) - if not isinstance(self.model, ml.SklearnTransformer): - target = data[target_col] - train = data[[c for c in data if c not in (target_col, index_col)]] - self.model.fit(train, target, self.store.model_path) - result = {'score': self.model.score(train, target)} - else: - self.model.fit(data, None, self.store.model_path) - result = self.model.get_attributes() + self.model.fit(data, None, self.store.model_path) + result = self.model.get_attributes() return result def _retrain(self): diff --git a/gramex/ml_api.py b/gramex/ml_api.py index 98d3380b0..a45d93d20 100644 --- a/gramex/ml_api.py +++ b/gramex/ml_api.py @@ -46,6 +46,10 @@ "statsmodels.tsa.statespace.sarimax", ], "gramex.ml_api.HFTransformer": ["gramex.transformers"], + "gramex.ml_api.KerasApplications": [ + "tensorflow.keras.applications.vgg16", + "tensorflow.keras.applications.resnet50" + ] } @@ -203,8 +207,12 @@ class ModelStore(cache.JSONStore): def __init__(self, path, *args, **kwargs): _mkdir(path) - self.data_store = op.join(path, "data.h5") - self.model_path = op.join(path, op.basename(path) + ".pkl") + if op.exists(op.join(path, "data.h5")): + self.data_store = op.join(path, "data.h5") + self.model_path = op.join(path, op.basename(path) + ".pkl") + else: + self.data_store = path + self.model_path = path self.path = path super(ModelStore, self).__init__(op.join(path, "config.json"), *args, **kwargs) @@ -397,6 +405,13 @@ def _predict(self, X, **kwargs): class HFTransformer(SklearnModel): + @classmethod + def from_disk(cls, path, klass): + # Load model from disk + model = op.join(path, "model") + tokenizer = op.join(path, "tokenizer") + return cls(klass(model, tokenizer)) + def __init__(self, model, params=None, data=None, **kwargs): self.model = model if params is None: @@ -404,12 +419,6 @@ def __init__(self, model, params=None, data=None, **kwargs): self.params = params self.kwargs = kwargs - @classmethod - def from_disk(cls, path, klass): - model = op.join(path, "model") - tokenizer = op.join(path, "tokenizer") - return cls(klass(model, tokenizer)) - def fit( self, X: Union[pd.DataFrame, np.ndarray], @@ -426,3 +435,49 @@ def _predict( ): text = X["text"] return self.model.predict(text) + + +class KerasApplications(AbstractModel): + def __init__(self, model, params=None, data=None, **kwargs): + self.model = model + if params is None: + params = {} + self.params = params + self.kwargs = kwargs + + @classmethod + def from_disk(cls, path, klass): + # Load model from disk + return cls + + def predict(self, data=None, **kwargs): + from tensorflow.keras.preprocessing import image + + mclass, wrapper = search_modelclass(kwargs['mclass']) + module_imp = __import__(mclass.__module__, fromlist=SEARCH_MODULES[wrapper]) + model = mclass(include_top=True, + weights="imagenet", + input_tensor=None, + input_shape=None, + pooling=None, + classes=1000) + x = image.img_to_array(data) + x = np.expand_dims(x, axis=0) + x = module_imp.preprocess_input(x) + + preds = model.predict(x) + # decode the results into a list of tuples (class, description, probability) + results = module_imp.decode_predictions(preds) + return results + + def fit(self, *args, **kwargs): + super().fit(*args, **kwargs) + + def get_params(self, **kwargs): + super().get_params(**kwargs) + + def score(self, X, y_true, **kwargs): + super().score(X, y_true, **kwargs) + + def get_attributes(self): + super().get_attributes() From fcc597a82caeeb17afdbb05f6ba11d13fa14217e Mon Sep 17 00:00:00 2001 From: "Radheya D. Kale" Date: Tue, 28 Jun 2022 12:03:27 +0530 Subject: [PATCH 3/6] WIP: Code refactoring * Training keras models would be done in ml_api in KerasApplication wrapper * _parse_multipart_form_data used for parsing images --- gramex/handlers/mlhandler.py | 98 +++++++++--------------------------- gramex/ml_api.py | 84 +++++++++++++++++++++++++------ 2 files changed, 94 insertions(+), 88 deletions(-) diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py index bb808653d..d1defd44c 100644 --- a/gramex/handlers/mlhandler.py +++ b/gramex/handlers/mlhandler.py @@ -122,6 +122,8 @@ def _parse_multipart_form_data(self): for f in files: buff = BytesIO(f['body']) try: + if f['content_type'] in ['image/jpeg', 'image/jpg', 'image/png']: + return buff ext = re.sub(r'^.', '', op.splitext(f['filename'])[-1]) xdf = cache.open_callback['jsondata' if ext == 'json' else ext](buff) dfs.append(xdf) @@ -181,7 +183,8 @@ def _transform(self, data, **kwargs): return data def _predict(self, data=None, score_col=''): - if type(data) == np.ndarray: + import io + if type(data) == io.BytesIO: data = self.model.predict(data=data, mclass=self.store.load('class')) return data metric = self.get_argument('_metric', False) @@ -248,16 +251,14 @@ def get(self, *path_args, **path_kwargs): self.write(fout.read()) elif '_model' in self.args: self.write(json.dumps(self.model.get_params(), indent=2)) - elif len(self.request.files.keys()) and \ - self.request.files['image'][0].content_type in \ - ['image/jpeg', 'image/jpg', 'image/png']: - if '_action' in self.args and self.args['_action'] == 'predict': - import cv2 - import imutils - data = imutils.resize(cv2.imdecode(np.fromstring( - self.request.files['image'][0].body, np.uint8), cv2.IMREAD_UNCHANGED), - width=224) - data = cv2.resize(data, (224, 224)) + elif isinstance(self.model, ml.KerasApplication): + if 'training_data' in self.args: + data = self.args['training_data'] + training_results = yield gramex.service.threadpool.submit( + self._train, data=data) + self.write(json.dumps(training_results, indent=2, cls=CustomJSONEncoder)) + else: + data = self._parse_multipart_form_data() prediction = yield gramex.service.threadpool.submit( self._predict, data) self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) @@ -289,73 +290,22 @@ def get(self, *path_args, **path_kwargs): def _append(self): self._parse_data(_cache=True, append=True) - def _train_keras(self, data): - import tensorflow as tf - from tensorflow.keras.optimizers import Adam - from tensorflow.keras.models import Sequential - from tensorflow.python.keras.layers import Dense, Flatten - import pathlib - data_dir = pathlib.Path(data) - - config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler', - slugify(self.name)) - - img_height, img_width = 224, 224 - batch_size = 32 - train_ds = tf.keras.preprocessing.image_dataset_from_directory( - data_dir, - validation_split=0.2, - subset="training", - seed=123, - image_size=(img_height, img_width), - batch_size=batch_size) - - val_ds = tf.keras.preprocessing.image_dataset_from_directory( - data_dir, - validation_split=0.2, - subset="validation", - seed=123, - image_size=(img_height, img_width), - batch_size=batch_size) - - class_names = train_ds.class_names - keras_model = Sequential() - pretrained_model = tf.keras.applications.ResNet50(include_top=False, - input_shape=(224, 224, 3), - pooling='avg', classes=5, - weights='imagenet') - for layer in pretrained_model.layers: - layer.trainable = False - - keras_model.add(pretrained_model) - keras_model.add(Flatten()) - keras_model.add(Dense(512, activation='relu')) - keras_model.add(Dense(5, activation='softmax')) - keras_model.compile(optimizer=Adam(lr=0.001), - loss='sparse_categorical_crossentropy', metrics=['accuracy']) - epochs = 1 - keras_model.fit( - train_ds, - validation_data=val_ds, - epochs=epochs - ) - with open(op.join(config_dir, 'class_names.json'), 'w') as fout: - json.dump(class_names, fout) - keras_model.save(config_dir) - return class_names - def _train(self, data=None): target_col = self.get_argument('target_col', self.store.load('target_col')) index_col = self.get_argument('index_col', self.store.load('index_col')) self.store.dump('target_col', target_col) - data = self._parse_data(False) if data is None else data - data = self._filtercols(data) - data = self._filterrows(data) - self.model = get_model( - self.store.load('class'), self.store.load('params'), - data=data, target_col=target_col, - nums=self.store.load('nums'), cats=self.store.load('cats') - ) + if isinstance(self.model, ml.KerasApplication): + result = self.model.fit(data, self.store.model_path) + return result + else: + data = self._parse_data(False) if data is None else data + data = self._filtercols(data) + data = self._filterrows(data) + self.model = get_model( + self.store.load('class'), self.store.load('params'), + data=data, target_col=target_col, + nums=self.store.load('nums'), cats=self.store.load('cats') + ) if not isinstance(self.model, ml.SklearnTransformer): target = data[target_col] train = data[[c for c in data if c not in (target_col, index_col)]] diff --git a/gramex/ml_api.py b/gramex/ml_api.py index a45d93d20..8044da5a3 100644 --- a/gramex/ml_api.py +++ b/gramex/ml_api.py @@ -46,9 +46,8 @@ "statsmodels.tsa.statespace.sarimax", ], "gramex.ml_api.HFTransformer": ["gramex.transformers"], - "gramex.ml_api.KerasApplications": [ - "tensorflow.keras.applications.vgg16", - "tensorflow.keras.applications.resnet50" + "gramex.ml_api.KerasApplication": [ + "tensorflow.keras.applications" ] } @@ -405,13 +404,6 @@ def _predict(self, X, **kwargs): class HFTransformer(SklearnModel): - @classmethod - def from_disk(cls, path, klass): - # Load model from disk - model = op.join(path, "model") - tokenizer = op.join(path, "tokenizer") - return cls(klass(model, tokenizer)) - def __init__(self, model, params=None, data=None, **kwargs): self.model = model if params is None: @@ -436,8 +428,15 @@ def _predict( text = X["text"] return self.model.predict(text) + @classmethod + def from_disk(cls, path, klass): + # Load model from disk + model = op.join(path, "model") + tokenizer = op.join(path, "tokenizer") + return cls(klass(model, tokenizer)) + -class KerasApplications(AbstractModel): +class KerasApplication(AbstractModel): def __init__(self, model, params=None, data=None, **kwargs): self.model = model if params is None: @@ -452,7 +451,14 @@ def from_disk(cls, path, klass): def predict(self, data=None, **kwargs): from tensorflow.keras.preprocessing import image - + import cv2 + import imutils + + data = imutils.resize(cv2.imdecode(np.fromstring(data.getvalue(), + np.uint8), + cv2.IMREAD_UNCHANGED), + width=224) + data = cv2.resize(data, (224, 224)) mclass, wrapper = search_modelclass(kwargs['mclass']) module_imp = __import__(mclass.__module__, fromlist=SEARCH_MODULES[wrapper]) model = mclass(include_top=True, @@ -470,8 +476,58 @@ def predict(self, data=None, **kwargs): results = module_imp.decode_predictions(preds) return results - def fit(self, *args, **kwargs): - super().fit(*args, **kwargs) + def fit(self, data, model_path, *args, **kwargs): + import tensorflow as tf + from tensorflow.keras.optimizers import Adam + from tensorflow.keras.models import Sequential + from tensorflow.python.keras.layers import Dense, Flatten + import pathlib + import json + + data_dir = pathlib.Path(data) + img_height, img_width = 224, 224 + batch_size = 32 + train_ds = tf.keras.preprocessing.image_dataset_from_directory( + data_dir, + validation_split=0.2, + subset="training", + seed=123, + image_size=(img_height, img_width), + batch_size=batch_size) + + val_ds = tf.keras.preprocessing.image_dataset_from_directory( + data_dir, + validation_split=0.2, + subset="validation", + seed=123, + image_size=(img_height, img_width), + batch_size=batch_size) + + class_names = train_ds.class_names + keras_model = Sequential() + pretrained_model = tf.keras.applications.ResNet50(include_top=False, + input_shape=(224, 224, 3), + pooling='avg', classes=5, + weights='imagenet') + for layer in pretrained_model.layers: + layer.trainable = False + + keras_model.add(pretrained_model) + keras_model.add(Flatten()) + keras_model.add(Dense(512, activation='relu')) + keras_model.add(Dense(5, activation='softmax')) + keras_model.compile(optimizer=Adam(lr=0.001), + loss='sparse_categorical_crossentropy', metrics=['accuracy']) + epochs = 1 + keras_model.fit( + train_ds, + validation_data=val_ds, + epochs=epochs + ) + with open(op.join(model_path, 'class_names.json'), 'w') as fout: + json.dump(class_names, fout) + keras_model.save(model_path) + return class_names def get_params(self, **kwargs): super().get_params(**kwargs) From b35a446110ac67ca1ad1ebd9af5f56546eaa47da Mon Sep 17 00:00:00 2001 From: "Radheya D. Kale" Date: Wed, 6 Jul 2022 13:30:50 +0530 Subject: [PATCH 4/6] Code optimisation * Removed unnecessary code from ModelStore --- gramex/handlers/mlhandler.py | 5 ++--- gramex/ml_api.py | 8 ++------ 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py index 17f910796..1e9dcd701 100644 --- a/gramex/handlers/mlhandler.py +++ b/gramex/handlers/mlhandler.py @@ -213,8 +213,7 @@ def _predict(self, data=None, score_col=''): def _check_model_path(self): try: klass, wrapper = ml.search_modelclass(self.store.load('class')) - if op.isdir(getattr(self.store, 'model_path', '')): - self.model = locate(wrapper).from_disk(self.store.model_path, klass=klass) + self.model = locate(wrapper).from_disk(self.store.model_path, klass=klass) except FileNotFoundError: raise HTTPError(NOT_FOUND, f'No model found at {self.store.model_path}') @@ -293,7 +292,7 @@ def _train(self, data=None): index_col = self.get_argument('index_col', self.store.load('index_col')) self.store.dump('target_col', target_col) if isinstance(self.model, ml.KerasApplication): - result = self.model.fit(data, self.store.model_path) + result = self.model.fit(data, self.kwargs['config_dir']) return result else: data = self._parse_data(False) if data is None else data diff --git a/gramex/ml_api.py b/gramex/ml_api.py index af5f93041..653ff0b44 100644 --- a/gramex/ml_api.py +++ b/gramex/ml_api.py @@ -206,12 +206,8 @@ class ModelStore(cache.JSONStore): def __init__(self, path, *args, **kwargs): _mkdir(path) - if op.exists(op.join(path, "data.h5")): - self.data_store = op.join(path, "data.h5") - self.model_path = op.join(path, op.basename(path) + ".pkl") - else: - self.data_store = path - self.model_path = path + self.data_store = op.join(path, "data.h5") + self.model_path = op.join(path, op.basename(path) + ".pkl") self.path = path super(ModelStore, self).__init__(op.join(path, "config.json"), *args, **kwargs) From f23b98a132b1577f2d92fc61965861ba2701d2fb Mon Sep 17 00:00:00 2001 From: "Radheya D. Kale" Date: Mon, 1 Aug 2022 15:24:03 +0530 Subject: [PATCH 5/6] WIP: Code refactoring * Model initialisation code moved to __init__ * Training added to POST request --- gramex/handlers/mlhandler.py | 34 +++++++++++++++++----------------- gramex/ml_api.py | 26 ++++++++++++++------------ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py index 1e9dcd701..67a6dea23 100644 --- a/gramex/handlers/mlhandler.py +++ b/gramex/handlers/mlhandler.py @@ -98,7 +98,7 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, ** if op.exists(cls.store.model_path): # If the pkl exists, load it if op.isdir(cls.store.model_path): mclass, wrapper = ml.search_modelclass(mclass) - cls.model = locate(wrapper).from_disk(mclass, cls.store.model_path) + cls.model = locate(wrapper).from_disk(path=cls.store.model_path, klass=mclass) else: cls.model = get_model(cls.store.model_path, {}) elif data is not None: @@ -187,7 +187,7 @@ def _transform(self, data, **kwargs): def _predict(self, data=None, score_col=''): import io - if type(data) == io.BytesIO: + if isinstance(data, io.BytesIO): data = self.model.predict(data=data, mclass=self.store.load('class')) return data metric = self.get_argument('_metric', False) @@ -254,16 +254,10 @@ def get(self, *path_args, **path_kwargs): elif '_model' in self.args: self.write(json.dumps(self.model.get_params(), indent=2)) elif isinstance(self.model, ml.KerasApplication): - if 'training_data' in self.args: - data = self.args['training_data'] - training_results = yield gramex.service.threadpool.submit( - self._train, data=data) - self.write(json.dumps(training_results, indent=2, cls=CustomJSONEncoder)) - else: - data = self._parse_multipart_form_data() - prediction = yield gramex.service.threadpool.submit( - self._predict, data) - self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) + data = self._parse_multipart_form_data() + prediction = yield gramex.service.threadpool.submit( + self._predict, data) + self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) else: try: data_args = {k: v for k, v in self.args.items() if not k.startswith('_')} @@ -326,11 +320,17 @@ def _score(self): @coroutine def post(self, *path_args, **path_kwargs): action = self.args.pop('_action', 'predict') - if action not in ACTIONS: - raise HTTPError(BAD_REQUEST, f'Action {action} not supported.') - res = yield gramex.service.threadpool.submit(getattr(self, f"_{action}")) - self.set_header('Content-Type', 'application/json') - self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder)) + if 'training_data' in self.args and action == 'train': + data = self.args['training_data'] + training_results = yield gramex.service.threadpool.submit( + self._train, data=data) + self.write(json.dumps(training_results, indent=2, cls=CustomJSONEncoder)) + else: + if action not in ACTIONS: + raise HTTPError(BAD_REQUEST, f'Action {action} not supported.') + res = yield gramex.service.threadpool.submit(getattr(self, f"_{action}")) + self.set_header('Content-Type', 'application/json') + self.write(json.dumps(res, indent=2, cls=CustomJSONEncoder)) super(MLHandler, self).post(*path_args, **path_kwargs) @coroutine diff --git a/gramex/ml_api.py b/gramex/ml_api.py index 653ff0b44..c9ff5d229 100644 --- a/gramex/ml_api.py +++ b/gramex/ml_api.py @@ -207,7 +207,10 @@ class ModelStore(cache.JSONStore): def __init__(self, path, *args, **kwargs): _mkdir(path) self.data_store = op.join(path, "data.h5") - self.model_path = op.join(path, op.basename(path) + ".pkl") + if op.exists(op.join(path, op.basename(path) + ".pkl")): + self.model_path = op.join(path, op.basename(path) + ".pkl") + else: + self.model_path = path self.path = path super(ModelStore, self).__init__(op.join(path, "config.json"), *args, **kwargs) @@ -434,37 +437,36 @@ def from_disk(cls, path, klass): class KerasApplication(AbstractModel): def __init__(self, model, params=None, data=None, **kwargs): - self.model = model if params is None: params = {} self.params = params self.kwargs = kwargs + self.model = model(include_top=True, + weights="imagenet", + input_tensor=None, + input_shape=None, + pooling=None, + classes=1000) @classmethod def from_disk(cls, path, klass): # Load model from disk - return cls(path) + return cls(klass) def predict(self, data=None, **kwargs): from tensorflow.keras.preprocessing import image import PIL import io - mclass, wrapper = search_modelclass(kwargs['mclass']) - model = mclass(include_top=True, - weights="imagenet", - input_tensor=None, - input_shape=None, - pooling=None, - classes=1000) + mclass, _ = search_modelclass(kwargs['mclass']) preprocess_input = locate('preprocess_input', [mclass.__module__]) decode_predictions = locate('decode_predictions', [mclass.__module__]) data = PIL.Image.open(io.BytesIO(data.getvalue()))\ - .resize((model.input_shape[1], model.input_shape[2])) + .resize((self.model.input_shape[1], self.model.input_shape[2])) x = image.img_to_array(data) x = np.expand_dims(x, axis=0) x = preprocess_input(x) - preds = model.predict(x) + preds = self.model.predict(x) # decode the results into a list of tuples (class, description, probability) results = decode_predictions(preds) return results From 856be6b00c4ce687547a732c33939e77c85137f7 Mon Sep 17 00:00:00 2001 From: "Radheya D. Kale" Date: Tue, 2 Aug 2022 14:28:21 +0530 Subject: [PATCH 6/6] WIP --- gramex/handlers/mlhandler.py | 23 ++++++++++---- gramex/ml_api.py | 58 +++++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 29 deletions(-) diff --git a/gramex/handlers/mlhandler.py b/gramex/handlers/mlhandler.py index 67a6dea23..364e8c5ac 100644 --- a/gramex/handlers/mlhandler.py +++ b/gramex/handlers/mlhandler.py @@ -138,6 +138,13 @@ def _parse_multipart_form_data(self): def _parse_application_json(self): return pd.read_json(self.request.body.decode('utf8')) + def _parse_image_jpeg(self): + from PIL import Image + buff = BytesIO(self.request.body) + return Image.open(buff) + + _parse_image_jpg = _parse_image_png = _parse_image_jpeg + def _parse_data(self, _cache=True, append=False): header = self.request.headers.get('Content-Type', '').split(';')[0] header = slugify(header).replace('-', '_') @@ -178,6 +185,9 @@ def _filterrows(cls, data, **kwargs): return data def _transform(self, data, **kwargs): + if not isinstance(data, (pd.DataFrame, pd.Series)): + return data + orgdata = self.store.load_data() for col in np.intersect1d(data.columns, orgdata.columns): data[col] = data[col].astype(orgdata[col].dtype) @@ -209,6 +219,8 @@ def _predict(self, data=None, score_col=''): except Exception as exc: app_log.exception(exc) return data + except AttributeError: + return self.model.predict(data) def _check_model_path(self): try: @@ -253,11 +265,11 @@ def get(self, *path_args, **path_kwargs): self.write(fout.read()) elif '_model' in self.args: self.write(json.dumps(self.model.get_params(), indent=2)) - elif isinstance(self.model, ml.KerasApplication): - data = self._parse_multipart_form_data() - prediction = yield gramex.service.threadpool.submit( - self._predict, data) - self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) + # elif isinstance(self.model, ml.KerasApplication): + # data = self._parse_multipart_form_data() + # prediction = yield gramex.service.threadpool.submit( + # self._predict, data) + # self.write(json.dumps(prediction, indent=2, cls=CustomJSONEncoder)) else: try: data_args = {k: v for k, v in self.args.items() if not k.startswith('_')} @@ -324,6 +336,7 @@ def post(self, *path_args, **path_kwargs): data = self.args['training_data'] training_results = yield gramex.service.threadpool.submit( self._train, data=data) + self.set_header('Content-Type', 'application/json') self.write(json.dumps(training_results, indent=2, cls=CustomJSONEncoder)) else: if action not in ACTIONS: diff --git a/gramex/ml_api.py b/gramex/ml_api.py index c9ff5d229..79aa6b6a9 100644 --- a/gramex/ml_api.py +++ b/gramex/ml_api.py @@ -14,7 +14,7 @@ from sklearn.base import BaseEstimator from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler +from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder TRANSFORMS = { "include": [], @@ -437,38 +437,48 @@ def from_disk(cls, path, klass): class KerasApplication(AbstractModel): def __init__(self, model, params=None, data=None, **kwargs): + from tensorflow.keras.models import load_model if params is None: params = {} self.params = params self.kwargs = kwargs - self.model = model(include_top=True, - weights="imagenet", - input_tensor=None, - input_shape=None, - pooling=None, - classes=1000) + self.preprocess_input = locate('preprocess_input', [model.__module__]) + self.decode_predictions = locate('decode_predictions', [model.__module__]) + self.model_path = kwargs['path'] + + try: + self.model = load_model(self.model_path) + self.custom_model = True + except OSError: + self.custom_model = False + self.model = model(include_top=True, + weights="imagenet", + input_tensor=None, + input_shape=None, + pooling=None, + classes=1000) @classmethod def from_disk(cls, path, klass): # Load model from disk - return cls(klass) + return cls(klass, path=path) def predict(self, data=None, **kwargs): from tensorflow.keras.preprocessing import image - import PIL - import io - - mclass, _ = search_modelclass(kwargs['mclass']) - preprocess_input = locate('preprocess_input', [mclass.__module__]) - decode_predictions = locate('decode_predictions', [mclass.__module__]) - data = PIL.Image.open(io.BytesIO(data.getvalue()))\ - .resize((self.model.input_shape[1], self.model.input_shape[2])) + + _, height, width, _ = self.model.input_shape + data = data.resize((height, width)) x = image.img_to_array(data) x = np.expand_dims(x, axis=0) - x = preprocess_input(x) + x = self.preprocess_input(x) preds = self.model.predict(x) - # decode the results into a list of tuples (class, description, probability) - results = decode_predictions(preds) + if not self.custom_model: + # decode the results into a list of tuples (class, description, probability) + results = self.decode_predictions(preds)[0][0][1] + else: + # If the model is trained, provide it with the relevant class names + class_names = joblib.load(op.join(self.model_path, 'class_names.pkl')) + results = class_names.inverse_transform([np.argmax(preds)])[0] return results def fit(self, data, model_path, *args, **kwargs): @@ -477,7 +487,6 @@ def fit(self, data, model_path, *args, **kwargs): from tensorflow.keras.models import Sequential from tensorflow.python.keras.layers import Dense, Flatten import pathlib - import json data_dir = pathlib.Path(data) img_height, img_width = 224, 224 @@ -502,7 +511,7 @@ def fit(self, data, model_path, *args, **kwargs): keras_model = Sequential() pretrained_model = tf.keras.applications.ResNet50(include_top=False, input_shape=(224, 224, 3), - pooling='avg', classes=5, + pooling='avg', classes=len(class_names), weights='imagenet') for layer in pretrained_model.layers: layer.trainable = False @@ -510,7 +519,7 @@ def fit(self, data, model_path, *args, **kwargs): keras_model.add(pretrained_model) keras_model.add(Flatten()) keras_model.add(Dense(512, activation='relu')) - keras_model.add(Dense(5, activation='softmax')) + keras_model.add(Dense(len(class_names), activation='softmax')) keras_model.compile(optimizer=Adam(lr=0.001), loss='sparse_categorical_crossentropy', metrics=['accuracy']) epochs = 1 @@ -519,9 +528,10 @@ def fit(self, data, model_path, *args, **kwargs): validation_data=val_ds, epochs=epochs ) - with open(op.join(model_path, 'class_names.json'), 'w') as fout: - json.dump(class_names, fout) + le = LabelEncoder() + le.fit(class_names) keras_model.save(model_path) + joblib.dump(le, op.join(self.model_path, 'class_names.pkl')) return class_names def get_params(self, **kwargs):