From 8d42ca5581866276f9c0c430f4dcf4f81039bb35 Mon Sep 17 00:00:00 2001
From: PythonFZ
Date: Fri, 30 Jul 2021 21:51:42 +0200
Subject: [PATCH 1/2] add a kwarg 'benchmark' to get the RDF time together with
 some other info at the end.

---
 mdsuite/calculators/radial_distribution_function.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/mdsuite/calculators/radial_distribution_function.py b/mdsuite/calculators/radial_distribution_function.py
index fd0c8691..91501789 100644
--- a/mdsuite/calculators/radial_distribution_function.py
+++ b/mdsuite/calculators/radial_distribution_function.py
@@ -194,6 +194,7 @@ def __call__(self,
         # kwarg parsing
         self.use_tf_function = kwargs.pop("use_tf_function", False)
         self.override_n_batches = kwargs.get('batches')
+        self.benchmark = kwargs.get('benchmark', False)
         self.tqdm_limit = kwargs.pop('tqdm', 10)
         # if there are more batches than in that limit it will show the batch tqdm, otherwise
         # it will show multiple minibatch tqdms
@@ -476,7 +477,8 @@ def run_minibatch_loop():
 
         execution_time = 0
         batch_tqm = self.tqdm_limit > self.n_batches
-
+        if self.benchmark:
+            full_timer = timer()
         for idx, sample_configuration in tqdm(enumerate(np.array_split(self.sample_configurations, self.n_batches)),
                                               ncols=70, disable=batch_tqm):
             log.debug('Loading Data')
@@ -496,7 +498,13 @@ def run_minibatch_loop():
             execution_time += timer() - start
 
         log.debug('Calculation done')
-
+        if self.benchmark:
+            full_time = timer() - full_timer
+            log.warning(f"RDF execution time: {full_time} s")
+            n_bins = len(self.sample_configurations) * self.experiment.number_of_atoms * (
+                    self.experiment.number_of_atoms - 1) / 2
+            log.warning(f"Sampled {len(self.sample_configurations)} with {self.experiment.number_of_atoms} atoms")
+            log.warning(f"Total number of computed bins is {n_bins} - {n_bins / full_time / 1e9} GHz")
         self.rdf.update({key: np.array(val.numpy(), dtype=np.float) for key, val in self.rdf.items()})
 
         log.debug(f"RDF execution time: {execution_time} s")

From f9d34db025ddc72d9daa00199142723670cfefb4 Mon Sep 17 00:00:00 2001
From: PythonFZ
Date: Mon, 2 Aug 2021 20:01:54 +0200
Subject: [PATCH 2/2] Further optimize the RDF calculator.

---
 .../radial_distribution_function.py           | 333 +++++++++++-------
 mdsuite/utils/linalg.py                       |   1 +
 2 files changed, 199 insertions(+), 135 deletions(-)

diff --git a/mdsuite/calculators/radial_distribution_function.py b/mdsuite/calculators/radial_distribution_function.py
index 91501789..73427aff 100644
--- a/mdsuite/calculators/radial_distribution_function.py
+++ b/mdsuite/calculators/radial_distribution_function.py
@@ -103,8 +103,6 @@ def __init__(self, experiment: Experiment):
         self._dtype = tf.float32
 
         # Arguments set by the user in __call__
-        self.number_of_bins = None
-        self.cutoff = None
         self.start = None
         self.stop = None
         self.number_of_configurations = None
@@ -113,13 +111,20 @@ def __init__(self, experiment: Experiment):
         self.use_tf_function = None
         self.override_n_batches = None
         self.index_list = None
-        self.bin_range = None
         self.sample_configurations = None
         self.key_list = None
         self.rdf = None
         self.correct_minibatch_batching = None
         # split the minibatches into equal sized chunks to use maximum computing and memory resources
+        # For large systems this can cause a ResourceExhaustedError, so it mainly helps with smaller systems.
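+        # The pair workload is triangular: the first atoms of a triu index slice pair with
+        # almost every remaining atom, later ones with far fewer, so equal-sized chunks do
+        # unequal work. The correction in run_minibatch_loop balances the chunks accordingly.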
+
+        # Properties
+        self._box_array = None
+        self._bin_range = None
+        self._number_of_bins = None
+        self._cutoff = None
+        self._species_combinations = None
 
     def __call__(self,
                  plot=True,
@@ -196,6 +201,9 @@ def __call__(self,
         self.override_n_batches = kwargs.get('batches')
         self.benchmark = kwargs.get('benchmark', False)
         self.tqdm_limit = kwargs.pop('tqdm', 10)
+        self.correct_minibatch_batching = kwargs.pop('correct_minibatch_batching', None)
+        # 100 seems to be a good value for most systems
+
         # if there are more batches than in that limit it will show the batch tqdm, otherwise
         # it will show multiple minibatch tqdms
 
@@ -217,7 +225,6 @@ def _initialize_rdf_parameters(self):
         -------
         Updates class attributes.
         """
-        self.bin_range = [0, self.cutoff]
         self.index_list = [i for i in range(len(self.species))]  # Get the indices of the species
 
         self.sample_configurations = np.linspace(self.start,
@@ -260,10 +267,6 @@ def _check_input(self):
         if self.molecules:
             self.species = list(self.experiment.molecules)
 
-        if self.gpu:
-            self.correct_minibatch_batching = 100
-            # 100 seems to be a good value for most systems
-
     def _load_positions(self, indices: list) -> tf.Tensor:
         """
         Load the positions matrix
@@ -386,112 +389,114 @@ def _correct_batch_properties(self):
         if self.override_n_batches is not None:
             self.n_batches = self.override_n_batches
 
-    def mini_calculate_histograms(self):
-        """Do the minibatch calculation"""
+    def run_minibatch_loop(self, per_atoms_ds, idx, batch_tqdm, n_atoms, positions_tensor):
+        """Run a minibatch loop"""
+        minibatch_start = tf.constant(0)
+        stop = tf.constant(0)
+        rdf = {name: tf.zeros(self.number_of_bins, dtype=tf.int32) for name in self.key_list}
 
-        def combine_dictionaries(dict_a, dict_b):
-            """Combine two dictionaries in a tf.function call"""
-            out = dict()
-            for key in dict_a:
-                out[key] = dict_a[key] + dict_b[key]
-            return out
-
-        def run_minibatch_loop():
-            """Run a minibatch loop"""
-            minibatch_start = tf.constant(0)
-            stop = tf.constant(0)
-            rdf = {name: tf.zeros(self.number_of_bins, dtype=tf.int32) for name in self.key_list}
-
+        @tf.function(experimental_relax_shapes=True)
+        def func(atoms, minibatch_start, n_atoms, positions_tensor, box_array):
+            # derive the slice size inside the trace; relying on the closed-over
+            # atoms_per_batch from the loop below would capture a stale tensor
+            atoms_per_batch = tf.shape(atoms)[0]
+            indices = get_partial_triu_indices(n_atoms, atoms_per_batch, minibatch_start)
+            d_ij = self.get_dij(indices, positions_tensor, atoms, box_array)
+            return indices, d_ij
+
+        if self.correct_minibatch_batching is not None:
+            # per_atoms_ds elements have shape (configurations, 3)
+            log.warning("Using experimental batch size correction for a better load balance over batches!")
+
+            points = tf.range(1, 1000, dtype=tf.float32)
+            # I think it is impossible to reach 1000, so this should be more than enough
+            scaling = tf.cast(self.correct_minibatch_batching, dtype=tf.float32) * 0.51 ** points
+            # Be safe and go for 0.51 instead of 0.5 - this uses a geometric series to compute when to
+            # increase the size of the dataset for better load balancing between batches.
+            scaling = tf.cast(tf.math.ceil(scaling[scaling > 1]), dtype=tf.int32)
+            scaling_values = []
+            for scale, fact in zip(scaling, points):
+                scaling_values.append(tf.ones(scale) * fact)
+
+            scaling_values = tf.cast(tf.concat(scaling_values, axis=0), dtype=tf.int64)
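+            # Illustration (assuming correct_minibatch_batching = 100): the batch size multiplier 1
+            # is used for the first ceil(100 * 0.51) = 51 chunks, 2 for the next 27, 3 for the next
+            # 14, and so on - each successive multiplier covers roughly half as many chunks.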
+
+            def new_ds_generator():
+                """Convert the for loop into a generator so that prefetching can be used"""
+                if not batch_tqdm:
+                    log.info("Progress bar speeds up over time. The initial estimate will always decrease!")
                 corrected_ds = per_atoms_ds.batch(int(len(per_atoms_ds) / self.correct_minibatch_batching))
-                # math.stackexchange.com/questions/107269/how-do-you-split-a-90-45-45-triangle-into-equal-area-strips
-                for jdx, corrected_per_atoms_ds in tqdm(enumerate(corrected_ds), ncols=70, disable=not batch_tqm,
-                                                        total=self.correct_minibatch_batching,
-                                                        desc=f"Mini batch {idx + 1}/{self.n_batches}"):
-                    pre_factor = np.sqrt(jdx + 1)
-                    new_ds = tf.data.Dataset.from_tensor_slices(corrected_per_atoms_ds)
-                    new_ds = new_ds.batch(int(pre_factor * self.minibatch))
-                    log.debug(f'batch size: {int(pre_factor * self.minibatch)} ({self.minibatch} * {pre_factor})')
-                    for atoms in new_ds:
-                        atoms_per_batch, batch_size, _ = tf.shape(atoms)
-                        stop += atoms_per_batch
-                        start_time = timer()
-                        indices = get_partial_triu_indices(n_atoms, atoms_per_batch, minibatch_start)
-                        log.debug(f'Calculating indices took {timer() - start_time} s')
-
-                        start_time = timer()
-                        d_ij = self.get_dij(indices, positions_tensor, atoms,
-                                            tf.cast(self.experiment.box_array, dtype=self.dtype))
-                        exec_time = timer() - start_time
-                        atom_pairs_per_second = tf.cast(tf.shape(indices)[1], dtype=self.dtype) / exec_time / 10 ** 6
-                        atom_pairs_per_second *= tf.cast(batch_size, dtype=self.dtype)
-                        log.debug(f'Computing d_ij took {exec_time} s '
-                                  f'({atom_pairs_per_second:.1f} million atom pairs / s)')
-
-                        start_time = timer()
-                        minibatch_rdf = self.compute_species_values(indices, minibatch_start, d_ij)
-                        log.debug(f'Computing species values took {timer() - start_time} s')
-
-                        start_time = timer()
-                        rdf = combine_dictionaries(rdf, minibatch_rdf)
-                        log.debug(f'Updating dictionaries took {timer() - start_time} s')
-
-                        minibatch_start = stop
-
-                return rdf
+                iterator = tqdm(enumerate(corrected_ds), ncols=70, disable=not batch_tqdm,
+                                total=self.correct_minibatch_batching,
+                                desc=f"Mini batch {idx + 1}/{self.n_batches} ({self.minibatch})")
+
+                for jdx, corrected_per_atoms_ds in iterator:
+                    corrected_batchsize = self.minibatch * scaling_values[jdx]
+                    iterator.set_description(f"Mini batch {idx + 1}/{self.n_batches} ({corrected_batchsize})")
+                    ds = tf.data.Dataset.from_tensor_slices(corrected_per_atoms_ds)
+                    ds = ds.batch(corrected_batchsize)
+                    for value in ds:
+                        yield value
+
+            new_ds = tf.data.Dataset.from_generator(
+                new_ds_generator,
+                output_signature=(tf.TensorSpec(shape=(None, None, None), dtype=self.dtype))
+            )
 
-            else:
-                for atoms in tqdm(per_atoms_ds.batch(self.minibatch).prefetch(tf.data.AUTOTUNE), ncols=70,
-                                  disable=not batch_tqm, desc=f"Running mini batch loop {idx + 1} / {self.n_batches}"):
-                    # I assume this code can be removed, because the corrected version is almost always better
-                    # If one still wants to use the uncorrected version, we could make that an option, but
-                    # I don't see why. The only downside of the other method is, that a new dataset is created
-                    # every loop which can be cause a slow down, but I don't think it causes memory issues.
-                    atoms_per_batch, batch_size, _ = tf.shape(atoms)
-                    stop += atoms_per_batch
-                    start_time = timer()
-                    indices = get_partial_triu_indices(n_atoms, atoms_per_batch, minibatch_start)
-                    log.debug(f'Calculating indices took {timer() - start_time} s')
-
-                    start_time = timer()
-                    d_ij = self.get_dij(indices, positions_tensor, atoms,
-                                        tf.cast(self.experiment.box_array, dtype=self.dtype))
-                    exec_time = timer() - start_time
-                    atom_pairs_per_second = tf.cast(tf.shape(indices)[1], dtype=self.dtype) / exec_time / 10 ** 6
-                    atom_pairs_per_second *= tf.cast(batch_size, dtype=self.dtype)
-                    log.debug(f'Computing d_ij took {exec_time} s '
-                              f'({atom_pairs_per_second:.1f} million atom pairs / s)')
-
-                    start_time = timer()
-                    minibatch_rdf = self.compute_species_values(indices, minibatch_start, d_ij)
-                    log.debug(f'Computing species values took {timer() - start_time} s')
-
-                    start_time = timer()
-                    rdf = combine_dictionaries(rdf, minibatch_rdf)
-                    log.debug(f'Updating dictionaries took {timer() - start_time} s')
-
-                    minibatch_start = stop
-                return rdf
+            for atoms in new_ds.prefetch(tf.data.AUTOTUNE):
+                atoms_per_batch, batch_size, _ = tf.shape(atoms)
+                stop += atoms_per_batch
+
+                indices, d_ij = func(atoms, minibatch_start, n_atoms, positions_tensor, self.box_array)
+                minibatch_rdf = self.compute_species_values(indices, minibatch_start, d_ij)
+
+                rdf = self.combine_dictionaries(rdf, minibatch_rdf)
+
+                minibatch_start = stop
+
+            return rdf
+
+        else:
+            for atoms in tqdm(per_atoms_ds.batch(self.minibatch).prefetch(tf.data.AUTOTUNE), ncols=70,
+                              disable=not batch_tqdm, desc=f"Running mini batch loop {idx + 1} / {self.n_batches}"):
+                # I assume this code can be removed, because the corrected version is almost always better.
+                # If one still wants to use the uncorrected version, we could make that an option, but
+                # I don't see why. The only downside of the corrected method is that a new dataset is
+                # created every loop, which can cause a slowdown, but I don't think it causes memory issues.
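+                # The steps below mirror the corrected branch: slice the triu index pairs for this
+                # minibatch, compute the minimum-image distances d_ij and accumulate the per-species
+                # histograms.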
+                atoms_per_batch, batch_size, _ = tf.shape(atoms)
+                stop += atoms_per_batch
+                indices = get_partial_triu_indices(n_atoms, atoms_per_batch, minibatch_start)
+
+                d_ij = self.get_dij(indices, positions_tensor, atoms, self.box_array)
+
+                minibatch_rdf = self.compute_species_values(indices, minibatch_start, d_ij)
+
+                rdf = self.combine_dictionaries(rdf, minibatch_rdf)
+
+                minibatch_start = stop
+            return rdf
+
+    def mini_calculate_histograms(self):
+        """Do the minibatch calculation"""
         execution_time = 0
-        batch_tqm = self.tqdm_limit > self.n_batches
-        if self.benchmark:
-            full_timer = timer()
+        batch_tqdm = self.tqdm_limit > self.n_batches
+        full_timer = timer()
+        data_loading_time = 0
+
         for idx, sample_configuration in tqdm(enumerate(np.array_split(self.sample_configurations, self.n_batches)),
-                                              ncols=70, disable=batch_tqm):
+                                              ncols=70, disable=batch_tqdm):
             log.debug('Loading Data')
+            data_loading_timer = timer()
             positions_tensor = self._load_positions(sample_configuration)
+
             log.debug('Data loaded - creating dataset')
             per_atoms_ds = tf.data.Dataset.from_tensor_slices(positions_tensor)
             # create dataset of atoms from shape (n_atoms, n_timesteps, 3)
             n_atoms = tf.shape(positions_tensor)[0]
             log.debug('Starting calculation')
+            data_loading_time += timer() - data_loading_timer
 
             start = timer()
-            _rdf = run_minibatch_loop()
+            _rdf = self.run_minibatch_loop(per_atoms_ds, idx, batch_tqdm, n_atoms, positions_tensor)
 
             for key in self.rdf:
                 self.rdf[key] += _rdf[key]
@@ -499,12 +504,14 @@ def run_minibatch_loop():
             execution_time += timer() - start
 
         log.debug('Calculation done')
         if self.benchmark:
-            full_time = timer() - full_timer
+            log.warning(f"Full execution time: {timer() - full_timer} s - data loading time {data_loading_time} s")
+            full_time = timer() - full_timer - data_loading_time
             log.warning(f"RDF execution time: {full_time} s")
             n_bins = len(self.sample_configurations) * self.experiment.number_of_atoms * (
-                    self.experiment.number_of_atoms - 1) / 2
-            log.warning(f"Sampled {len(self.sample_configurations)} with {self.experiment.number_of_atoms} atoms")
-            log.warning(f"Total number of computed bins is {n_bins} - {n_bins / full_time / 1e9} GHz")
+                self.experiment.number_of_atoms - 1) / 2
+            log.warning(f"Sampled {len(self.sample_configurations)} configurations "
+                        f"with {self.experiment.number_of_atoms} atoms each")
+            log.warning(f"Total number of computed bins is {int(n_bins)} - {n_bins / full_time / 1e9:.4f} GHz")
         self.rdf.update({key: np.array(val.numpy(), dtype=np.float) for key, val in self.rdf.items()})
 
         log.debug(f"RDF execution time: {execution_time} s")
@@ -535,72 +542,86 @@ def compute_species_values(self, indices: tf.Tensor, start_batch, d_ij: tf.Tenso
         """
         rdf = {name: tf.zeros(self.number_of_bins, dtype=tf.int32) for name in self.key_list}
 
-        indices = tf.transpose(indices)
-        particles_list = self.particles_list
-
-        for tuples in itertools.combinations_with_replacement(self.index_list, 2):
-            names = self._get_species_names(tuples)
-            start_ = tf.concat(
-                [sum(particles_list[:tuples[0]]) - start_batch, sum(particles_list[:tuples[1]])],
-                axis=0
-            )
-            stop_ = start_ + tf.constant([particles_list[tuples[0]], particles_list[tuples[1]]])
+        for start_, stop_, names in self.species_combinations:
             rdf[names] = self.bin_minibatch(start_, stop_, indices, d_ij,
-                                            tf.cast(self.bin_range, dtype=self.dtype),
-                                            tf.cast(self.number_of_bins, dtype=tf.int32),
-                                            tf.cast(self.cutoff, dtype=self.dtype))
+                                            start_batch,
+                                            self.bin_range,
+                                            self.number_of_bins,
+                                            self.cutoff)
 
         return rdf
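+    # The start/stop index windows per species pair depend only on the fixed particle counts,
+    # so the property below computes them once and caches them; zip(*...) returns a fresh
+    # iterator on each access.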
+    @property
+    def species_combinations(self):
+        """Cache the start, stop and name tuples for all species pair combinations"""
+        if self._species_combinations is None:
+            particles_list = self.particles_list
+            start_lst = []
+            stop_lst = []
+            names_lst = []
+            for tuples in itertools.combinations_with_replacement(self.index_list, 2):
+                names_lst.append(self._get_species_names(tuples))
+                start_ = tf.concat(
+                    [sum(particles_list[:tuples[0]]), sum(particles_list[:tuples[1]])],
+                    axis=0
+                )
+                start_lst.append(start_)
+                stop_ = start_ + tf.constant([particles_list[tuples[0]], particles_list[tuples[1]]])
+                stop_lst.append(stop_)
+            self._species_combinations = (start_lst, stop_lst, names_lst)
+        return zip(*self._species_combinations)
 
     @staticmethod
     @tf.function(experimental_relax_shapes=True)
-    def bin_minibatch(start, stop, indices, d_ij, bin_range, number_of_bins, cutoff) -> tf.Tensor:
+    def bin_minibatch(start, stop, indices, d_ij, start_batch, bin_range, number_of_bins, cutoff) -> tf.Tensor:
        """Compute the minibatch histogram"""

        # select the indices that are within the boundaries of the current species / molecule
-        mask_1 = (indices[:, 0] > start[0]) & (indices[:, 0] < stop[0])
-        mask_2 = (indices[:, 1] > start[1]) & (indices[:, 1] < stop[1])
+        mask_1 = (indices[0, :] > start[0] - start_batch) & (indices[0, :] < stop[0] - start_batch)
+        mask_2 = (indices[1, :] > start[1]) & (indices[1, :] < stop[1])
 
-        values_species = tf.boolean_mask(d_ij, mask_1 & mask_2, axis=0)
-        values = apply_system_cutoff(values_species, cutoff)
+        values = tf.boolean_mask(d_ij, mask_1 & mask_2, axis=0)
+        # values = apply_system_cutoff(values, cutoff)
         bin_data = tf.histogram_fixed_width(
             values=values,
             value_range=bin_range,
-            nbins=number_of_bins
-        )
-
+            nbins=number_of_bins + 1
+        )[:-1]
+        # Adding one bin at the end and removing it is almost identical to applying a cutoff, but faster.
+        # We might have to check if it affects some calculations based on the RDF.
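+        # E.g. with cutoff = 5.0 and number_of_bins = 500 (assumed values), histogramming into 501
+        # bins over [0, 5.0] and dropping the last bin keeps the range [0, 5.0 * 500 / 501], i.e. the
+        # effective cutoff shrinks by roughly 0.2 %.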
+        # Maybe we can increase the bin range by the fraction the extra bin takes up so that the
+        # two are truly identical.
         return bin_data
 
     @staticmethod
     @tf.function(experimental_relax_shapes=True)
     def get_dij(indices, positions_tensor, atoms, box_array):
         """Compute the distance matrix for the minibatch"""
-        start_time = timer()
-        log.debug(f'Calculating indices took {timer() - start_time} s')
+        # start_time = timer()
+        # log.debug(f'Calculating indices took {timer() - start_time} s')
 
         # apply the mask to this, to only get the triu values and don't compute anything twice
-        start_time = timer()
+        # start_time = timer()
         _positions = tf.gather(positions_tensor, indices[1], axis=0)
-        log.debug(f'Gathering positions_tensor took {timer() - start_time} s')
+        # log.debug(f'Gathering positions_tensor took {timer() - start_time} s')
 
         # for atoms_per_batch > 1, flatten the array according to the positions
-        start_time = timer()
+        # start_time = timer()
         atoms_position = tf.gather(atoms, indices[0], axis=0)
-        log.debug(f'Gathering atoms took {timer() - start_time} s')
+        # log.debug(f'Gathering atoms took {timer() - start_time} s')
 
-        start_time = timer()
+        # start_time = timer()
         r_ij = _positions - atoms_position
-        log.debug(f'Computing r_ij took {timer() - start_time} s')
+        # log.debug(f'Computing r_ij took {timer() - start_time} s')
 
         # apply minimum image convention
-        start_time = timer()
+        # start_time = timer()
         if box_array is not None:
             r_ij = apply_minimum_image(r_ij, box_array)
-        log.debug(f'Applying minimum image convention took {timer() - start_time} s')
+        # log.debug(f'Applying minimum image convention took {timer() - start_time} s')
 
-        start_time = timer()
+        # start_time = timer()
         d_ij = tf.linalg.norm(r_ij, axis=-1)
-        log.debug(f'Computing d_ij took {timer() - start_time} s')
+        # log.debug(f'Computing d_ij took {timer() - start_time} s')
 
         return d_ij
@@ -707,7 +728,7 @@ def _piecewise(data: np.array) -> np.array:
             _correction_1(split_2[0]),
             _correction_2(split_2[1])))
 
-        bin_width = self.cutoff / self.number_of_bins
+        bin_width = self.cutoff / tf.cast(self.number_of_bins, dtype=self.dtype)
         bin_edges = np.linspace(0.0, self.cutoff, self.number_of_bins)
 
         return _piecewise(np.array(bin_edges)) * bin_width
@@ -754,3 +775,45 @@ def _update_output_signatures(self):
         """
         pass
+
+    @staticmethod
+    def combine_dictionaries(dict_a, dict_b):
+        """Combine two dictionaries in a tf.function call"""
+        out = dict()
+        for key in dict_a:
+            out[key] = dict_a[key] + dict_b[key]
+        return out
+
+    @property
+    def box_array(self):
+        if self._box_array is None:
+            self._box_array = tf.cast(self.experiment.box_array, dtype=self.dtype)
+        return self._box_array
+
+    @property
+    def bin_range(self):
+        if self._bin_range is None:
+            self._bin_range = tf.cast([0, self.cutoff], dtype=self.dtype)
+        return self._bin_range
+
+    @property
+    def number_of_bins(self):
+        return self._number_of_bins
+
+    @number_of_bins.setter
+    def number_of_bins(self, value):
+        try:
+            self._number_of_bins = tf.cast(value, dtype=tf.int32)
+        except ValueError:
+            # value is None before __call__ parses the kwargs; keep the previous value in that case
+            pass
+
+    @property
+    def cutoff(self):
+        return self._cutoff
+
+    @cutoff.setter
+    def cutoff(self, value):
+        try:
+            self._cutoff = tf.cast(value, dtype=self.dtype)
+        except ValueError:
+            # as for number_of_bins: ignore values that cannot be cast, e.g. None
+            pass
diff --git a/mdsuite/utils/linalg.py b/mdsuite/utils/linalg.py
index c22acda4..83b8c502 100644
--- a/mdsuite/utils/linalg.py
+++ b/mdsuite/utils/linalg.py
@@ -77,6 +77,7 @@ def apply_minimum_image(r_ij, box_array):
 
     return r_ij - tf.math.rint(r_ij / box_array) * box_array
 
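+# Graph-compile this helper like get_dij and bin_minibatch in the calculator;
+# experimental_relax_shapes trades exact shape specialization for fewer retraces.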
+@tf.function(experimental_relax_shapes=True)
 def get_partial_triu_indices(n_atoms: int, m_atoms: int, idx: int) -> tf.Tensor:
     """Calculate the indices of a slice of the triu values