Source code for mlc_func.ml.force_network

""" Module that implements Force_Network, the machine learned correcting functional (MLCF)
for forces
"""

import mlc_func.elf as elf
import numpy as np
import pandas as pd
from keras.models import Sequential
from keras.layers import Dense
from keras import regularizers
import keras
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
import pickle
import h5py
import json
from ase.io import read

[docs]class Force_Network(): """ MLCF for force perdiction Parameters ---------- species: str chemical element symbol scaler: sklearn Scaler basis: dict basis that was used to create electronic descriptors datasets: dict datasets provided as {'X_train': np.ndarray, 'X_test': etc...} mask: list of bool used to mask the features and filter out features with low variance n_layers: int number of hidden layers, default = 3 nodes_per_layer: int nodes for each hidden layer, default = 8 b: float l2-regularization strenght, default = 0 """ def __init__(self, species, scaler, basis, datasets = {}, mask = [], n_layers = 3, nodes_per_layer = 8, b = 0): self.species = species self.n_layers = n_layers self.nodes_per_layer = nodes_per_layer self.datasets = datasets self.compiled = False self.learning_rate = 0.001 self.model = None self.optimizer = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.0, amsgrad=False) self.b = b if not len(datasets) == 0: self.X_train = datasets['X_train'] self.X_test = datasets['X_test'] self.X_valid = datasets['X_valid'] self.y_train = datasets['y_train'] self.y_test = datasets['y_test'] self.y_valid = datasets['y_valid'] else: self.X_train = None self.scaler = scaler self.basis = basis if len(mask) == 0: mask = [True]*self.X_train.shape[1] else: self.mask = mask if not (scaler == None or basis == None): self.__build_model() def __build_model(self, override = False): if self.compiled and not override: raise Exception('Model already compiled! Set override = True to proceed.') s = self.nodes_per_layer self.model = Sequential() self.model.add(Dense(units=s, activation='sigmoid', kernel_regularizer=regularizers.l2(self.b), input_dim=self.X_train.shape[1])) for _ in range(self.n_layers-1): self.model.add(Dense(units=s, activation='sigmoid', kernel_regularizer=regularizers.l2(self.b))) self.model.add(Dense(units=3, activation='linear')) def __compile_model(self, override = False): if self.compiled and not override: raise Exception('Model already compiled! Set override = True to proceed.') else: self.model.compile(loss='mean_squared_error', optimizer=self.optimizer, metrics=['accuracy']) self.compiled = True
[docs] def train(self, step_size=0.001, max_epochs=50001, b=0, early_stopping = False, batch_size = 500, epochs_per_output = 500, restart = False, tol_train = 0, tol_valid = 0): """ Train the model Parameters ----------- step_size: float step size to take during gradient descent, default=0.001 max_epochs: int max. number of epochs to train, default=50001 b: float l2-regularization early_stopping: bool use early stopping (interrupt training once valid loss increases), default=False batch_size: int number of samples per batch, default=500 epochs_per_output: int only print overview every epochs_per_output steps, default=500 restart: bool restart training from beginning (reset network), default=False tol_train: float stop training if relative value of training loss decreases by less than this value tol_valid: float stop training if relative value of validation loss decreases by less than this value Returns ---- None """ if not self.compiled or restart: self.learning_rate = step_size self.b = b self.__build_model(override = True) self.__compile_model(override=True) elif step_size != self.learning_rate or b != self.b: if not restart: raise Exception('Step size and regularization can not be changed after training was started, please set restart = True') last_train = 1e8 last_valid = 1e8 for i in range(int(max_epochs/epochs_per_output)): train_loss = np.sqrt(self.model.evaluate(self.X_train, self.y_train, verbose = 0)[0]) valid_loss = np.sqrt(self.model.evaluate(self.X_valid, self.y_valid, verbose = 0)[0]) if train_loss < last_train and valid_loss > last_valid and early_stopping: break #Early stopping elif ((last_train - train_loss)/last_train) < tol_train: return 0 elif ((last_valid - valid_loss)/last_valid) < tol_valid: return 0 else: last_train = train_loss last_valid = valid_loss print('--------Epoch = {}----------'.format(i*epochs_per_output)) print('Training loss || Validation loss') print( '{:13.6f} || {:13.6f}'.format(train_loss, valid_loss)) self.model.fit(self.X_train, self.y_train, epochs=epochs_per_output, batch_size=batch_size, verbose=0)
[docs] def predict(self, feat, processed = False): """ Get predicted forces Parameters ---------- feat: np.ndarray input features processed: bool are features processed (scaled, masked)? Returns ------- np.ndarray predicted forces """ if not processed: return self.model.predict(self.scaler.transform(feat[:,self.mask])) else: return self.model.predict(feat)
[docs] def evaluate(self, plot = False, on = 'test'): """ Evaluate model performance Parameters -------- plot: bool plot correlation plots on: str {'test','train','valid'} which set to evaluat on Returns -------- dict containing rmse, mae and max. abs. error """ X, y = {'train':[self.X_train, self.y_train], 'valid':[self.X_valid, self.y_valid], 'test':[self.X_test, self.y_test]}[on] prediction = self.predict(X, True) error = y - prediction rmse = np.sqrt(np.mean(error**2)) mae = np.mean(np.abs(error)) max = np.max(np.abs(error)) print('======== Evaluation on ' + on +' set =============\n\ RMSE = {:5.4f}\n\ MAE = {:5.4f}\n\ Max. abs. error = {:5.4f}'.format(rmse, mae, max)) if plot: plt.figure() for i, l in enumerate(['x','y','z']): plt.plot(y[:,i], prediction[:,i], ls = '', marker = '.', label = l) plt.xlabel('Expected') plt.ylabel('Predicted') plt.legend() plt.show() return {'rmse' : rmse, 'mae': mae, 'max': max}
[docs] def save_all(self, net_dir, override = False): """ Save force MLCF Parameters ----------- net_dir: str directory to save mlcf to override: bool if net_dir already contains model, allow to override? default = False Returns ------- None """ if net_dir[-1] != '/': net_dir += '/' to_save = {'mask': self.mask, 'scaler': self.scaler, 'basis': self.basis} if not os.path.exists(net_dir): os.mkdir(net_dir) if os.path.exists(net_dir + 'force_' + self.species) and not override: raise Exception('Already exists, to proceed set override = True') else: pickle.dump(to_save, open(net_dir + 'supp_' + self.species, 'wb')) self.model.save(net_dir + 'force_' + self.species)
[docs] def load_all(self, net_dir): """ Load force MLCF from net_dir Parameters ---------- net_dir: str path to directory containing MLCF """ supp = pickle.load(open(net_dir + 'supp_' + self.species, 'rb')) self.model = keras.models.load_model(net_dir + 'force_' + self.species) self.mask = supp['mask'] self.scaler = supp['scaler'] self.basis = supp['basis'] self.compiled = True
[docs] def learning_curve(self, steps = 5): """Create a learning curve by varying the training set size Parameters ----------- steps: int how many different training set sizes to use Returns --------- dict, {'N': training set size,'train': training loss, 'valid': validation loss} """ tot_len = len(self.X_train) save_X, save_y = np.array(self.X_train), np.array(self.y_train) chunk_size = np.floor(tot_len/steps).astype(int) N = [] train_rmse = [] valid_rmse = [] for s in range(steps+1): train_size = int(tot_len/(2**steps)*2**s) N.append(train_size) self.X_train = save_X[:train_size] self.y_train = save_y[:train_size] print('Size = {}'.format(len(self.X_train))) self.train(batch_size = np.ceil((len(self.X_train)/100)).astype(int), epochs_per_output = 100, tol_train = 1e-2, tol_valid = 1e-2) train_rmse.append(self.evaluate(on='train')['rmse']) valid_rmse.append(self.evaluate(on='valid')['rmse']) self.X_train = save_X self.y_train = save_y return {'N': N, 'train': train_rmse, 'valid': valid_rmse}
[docs] def predict_from_hdf5(self, path): """ Get force prediction but instead of providing features, give source path where features are found Parameters ---------- path: str path to .hdf5 file containing features Returns: ------- np.ndarray force prediction """ elfs, angles = elf.utils.hdf5_to_elfs_fast(path) if self.species in elfs: species = self.species n_samples = len(elfs[species]) elfs[species] = elfs[species].reshape(-1,elfs[species].shape[-1]) angles[species] = angles[species].reshape(-1,3) predictions = self.predict(elfs[species], False) for i, (value, a) in enumerate(zip(predictions, angles[species])): predictions[i] = elf.geom.rotate_vector(np.array([value]), a, False) return predictions.reshape(n_samples,-1,3)
[docs]def load_force_model(net_dir, species): """ Load force MLCF from net_dir for a given element Parameters ---------- net_dir: str path to directory containing MLCF species: str specifies which chemical element to load model for """ model = Force_Network(species, None, None, mask = [True]) model.load_all(net_dir) return model
[docs]def build_force_mlcf(feature_src, target_src, traj_src, species, mask = [], filters = [], automask_std = 0, autofilt_percent = 0, test_size = 0.2, random_state = 42): ''' Return a trainable force MLCF (neural network) Parameters ---------- feature_src: list list of paths to the hdf5 containing the features target_src: list list of paths to the csv files containing the target forces entries in target_scr and feature_src correspond to each other traj_src: list list of paths to the .traj/.xyz files (needed to determine species of each atom) species: string containing the species that model should be fitted for mask: list containing booleans; can be used to select which features to use. default: use all features filters: list containing list of booleans; can be used to exclude datapoints in sets (e.g. outliers) automask_std: float if mask not set exclude all features whose stdev across dataset is smaller than this value autofilt_percent: float exclude this percentile of extreme datapoints from set (only if filters not set) test_size: float relative size of hold_out (test) set random_state: int state used to perform shuffle before spliting dataset Returns ------- Force_Network ''' species = species.lower() if not len(species) == 1: raise Exception('Please specify only one species.') all_targets = [] all_features = [] if len(filters) != len(feature_src): filters = [0]*len(feature_src) basis = {} for fsrc, tsrc, trsrc, filter in zip(feature_src, target_src, traj_src, filters): # elfs = np.array(elf.utils.hdf5_to_elfs(fsrc, # grouped = True, values_only = True)[species]) # angles = np.array(elf.utils.hdf5_to_elfs(fsrc, # grouped = True, angles_only = True)[species]) elfs, angles = elf.utils.hdf5_to_elfs_fast(fsrc, species) elfs = elfs[species] angles = angles[species] with h5py.File(fsrc) as file: this_basis = json.loads(file.attrs['basis']) # Filter for species species_basis = {} for entry in this_basis: if entry[-1] == species or\ entry == 'alignment': species_basis[entry] = this_basis[entry] if len(basis) > 0 and species_basis != basis: raise Exception('Basis used across datasets not consistent') else: basis = species_basis angles = angles.reshape(-1,3) elfs = elfs.reshape(-1,elfs.shape[-1]) targets = np.genfromtxt(tsrc, delimiter = ',') if not trsrc.split('.')[-1] in ['xyz', 'traj']: raise Exception('Invalid file format for trajectory file stored at {}'.format(trsrc)) traj = read(trsrc, ':') all_symbols = np.array([t.get_chemical_symbols() for t in traj]).flatten() targets = targets[all_symbols == species.upper()] print(elfs.shape) for idx, (t, ang) in enumerate(zip(targets, angles)): targets[idx] = elf.geom.rotate_vector(np.array([t]),ang.tolist(), inverse=True) if not len(elfs) == len(targets): raise Exception('Sample sizes inconsistent.') if not isinstance(filter, list) and not isinstance(filter, np.ndarray): percentile_cutoff = autofilt_percent selection = [] for t in targets.T: lim1 = np.percentile(t, percentile_cutoff*100) lim2 = np.percentile(t, (1 - percentile_cutoff)*100) min_lim, max_lim = min(lim1,lim2), max(lim1,lim2) selection.append((t > min_lim) & (t < max_lim)) filter = [s1 & s2 & s3 for s1,s2,s3 in zip(*selection)] if len(mask) != elfs.shape[-1]: feat = np.array(elfs) mask = (np.std(feat.reshape(-1,feat.shape[-1]), axis = 0) > automask_std) all_features.append(elfs[:,mask][filter]) all_targets.append(targets[filter]) feat = np.concatenate(all_features) targets = np.concatenate(all_targets) scaler = StandardScaler() scaler.fit(feat) feat = scaler.transform(feat) X_train, X_test, y_train, y_test = train_test_split(feat, targets, shuffle =True, random_state = random_state, test_size = test_size) X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, shuffle =True, random_state = random_state, test_size = 0.2) datasets = { 'X_train': X_train, 'X_test': X_test, 'X_valid': X_valid, 'y_train': y_train, 'y_test': y_test, 'y_valid': y_valid } return Force_Network(species, scaler, basis, datasets, mask)