import sys
import os
from ase.calculators.siesta.siesta import SiestaTrunk462 as Siesta
from mlc_func.timer import Timer
from .feature_io import FeatureGetter, DescriptorGetter
import keras
try:
from ase.calculators.siesta.parameters import Species
except ImportError:
from ase.calculators.siesta.parameters import Specie as Species #fix
try:
from mbpol_calculator import MbpolCalculator
MBPOL_AVAIL = True
except ImportError:
MBPOL_AVAIL = False
from ase.calculators.siesta.parameters import PAOBasisBlock
from ase.units import Ry
from ase.io import Trajectory
from ase import Atoms
import numpy as np
import pickle
import ipyparallel as ipp
import mlc_func.elf as elf
import json
import subprocess
from .read_input import read_input
from mlc_func.ml import load_force_model
from .mixer import Mixer
from .siesta_basis import basis_sets
[docs]def log_all(energy_siesta = None, energy_corrected = None,
forces_siesta=None, forces_corrected=None, features=None):
""" Used to log energies, forces and features during MD simultation
"""
if energy_siesta == None: #Initialize
with open('energies.dat', 'w') as file:
file.write('Siesta \t Corrected \n')
with open('forces_siesta.dat', 'w') as file:
pass
with open('forces_corrected.dat', 'w') as file:
pass
else:
with open('energies.dat', 'a') as file:
file.write('{:.4f}\t{:.4f}\n'.format(energy_siesta,
energy_corrected))
with open('forces_siesta.dat', 'a') as file:
np.savetxt(file, forces_siesta, fmt = '%.4f')
with open('forces_corrected.dat', 'a') as file:
np.savetxt(file, forces_corrected, fmt = '%.4f')
for key in features:
with open('features_{}.dat'.format(key), 'a') as file:
np.savetxt(file, features[key], fmt = '%.4f')
[docs]class Siesta_Calculator(Siesta):
""" Provides default options for the Siesta calculator, such as pre-defined
custom basis sets and functionals.
Parameters
----
basis, str
{'dz_custom','dz','qz_custom','sz','uf','szp'}, basis set to be used
xc: str
{'PBE','BH','PW92','revPBE', ...}, exchange-correlation functional
"""
def __init__(self, basis = 'qz', xc='BH'):
label = 'H2O' #TODO: for now, change later
if xc =='REVPBE': xc = 'revPBE'
fdf_arguments = {'DM.MixingWeight': 0.3,
'DM.NumberPulay': 3,
'ElectronicTemperature': 5e-3,
'WriteMullikenPop': 0,
'MaxSCFIterations': 40}
if basis == 'uf':
super().__init__(label=label,
xc='PBE',
mesh_cutoff=100 * Ry,
energy_shift=0.02 * Ry,
basis_set = 'SZ')
dmtol = 5e-4
elif not 'custom' in basis.lower():
super().__init__(label=label,
xc=xc,
mesh_cutoff=200 * Ry,
energy_shift=0.02 * Ry,
basis_set = basis.upper())
dmtol = 5e-4
else:
species_o = Species(symbol= 'O',
basis_set = PAOBasisBlock(basis_sets['o_basis_{}'.format(basis)]))
species_h = Species(symbol= 'H',
basis_set = PAOBasisBlock(basis_sets['h_basis_{}'.format(basis)]))
super().__init__(label='H2O',
xc=xc,
mesh_cutoff=200 * Ry,
basis_set = 'DZP',
species=[species_o, species_h],
energy_shift=0.02 * Ry)
dmtol = 5e-4
fdf_arguments['DM.UseSaveDM'] = 'True'
fdf_arguments['SCF.MustConverge'] = 'False'
fdf_arguments['DM.Tolerance'] = dmtol
fdf_arguments['MLCF.Use'] = False
fdf_arguments['SaveDeltaRho'] = True
allowed_keys = self.allowed_fdf_keywords
allowed_keys['SaveDeltaRho'] = False
allowed_keys['SaveRhoXC'] = False
allowed_keys['MLCF.Use'] = False
allowed_keys['SCF.MustConverge'] = False
self.allowed_keywords = allowed_keys
self.set_fdf_arguments(fdf_arguments)
[docs] def set_solution_method(self, method):
""" Whether diagonalization or orbital minimization should be used.
Parameters
---
method: str,
{'diagon','OMM'}
Returns
----
None
"""
if not method.lower() == 'diagon' and not method.upper() == 'OMM':
raise Exception('Invalid solution method: choose "diagon" or "OMM"')
else:
fdf_arguments = self.parameters['fdf_arguments']
fdf_arguments['SolutionMethod'] = method
self.set_fdf_arguments(fdf_arguments)
[docs] def read_eigenvalues(self):
pass
[docs] def read_results(self):
""" Overrides read_results in base class to skip reading
the charge density etc. for speed-up"""
self.read_energy()
self.read_forces_stress()
[docs]class MLCF_Calculator:
"""MLCF_Calculator that consists of a baseline calculator (base_calculator), e.g. Siesta,
and a Machine learned correcting functional (MLCF).
Parameters
---
base_calculator: ase.Calculator
baseline method to get first approximation to forces
feature_getter: FeatureGetter
read the electron density and transforms it into features (electronic fingerprints)
log_accuracy: bool
whether to log energies, forces and features during MD simulation, default: True
"""
def __init__(self, base_calculator = None, feature_getter = None,
log_accuracy = True):
self.force_models = {}
self.last_positions = None
self.Epot = 0
self.forces = 0
self.feature_getter = feature_getter
self.log_accuracy = log_accuracy
self.base_calculator = base_calculator
self.cm_corrected = False
if self.log_accuracy:
log_all()
[docs] def set_force_model(self, models):
""" Sets the force MLCF
Parameters
---
models: dict
dictionary containing the force models, dict keys should correspond to
element symbol
"""
self.force_models = models
[docs] def set_feature_getter(self,feature_getter):
""" Sets the feature_getter
Parameters
---
feature_getter: FeatureGetter
"""
self.feature_getter = feature_getter
def calculation_required(self, atoms, quantities = None):
if isinstance(self.last_positions,np.ndarray):
return not np.allclose(atoms.get_positions(), self.last_positions)
else:
return True
[docs] def set_base_calculator(self, base_calculator):
""" Sets the baseline calculator
Parameters
---
base_calculator: ase.Calculator
any ASE calculator can be used (only tested for SiestaCalculator)
"""
self.base_calculator = base_calculator
def get_potential_energy(self, atoms, force_consistent = False):
if self.calculation_required(atoms):
time_step = Timer("TIME_FULL_STEP")
time_siesta = Timer("TIME_SIESTA_BARE")
# Use the base calculator for a first approximation to energy
# and forces
pot_energy = self.base_calculator.get_potential_energy(atoms)
forces = self.base_calculator.get_forces(atoms)
correction_force = np.zeros_like(forces)
time_siesta.stop()
self.last_positions = atoms.get_positions()
if len(self.force_models) > 0:
print('Using force correction')
time_ML = Timer("TIME_ML")
if self.feature_getter == None:
raise Exception("Feature getter not defined, cannot proceed...")
time_feat = Timer('TIME_FEAT')
elfs_dict, angles_dict =\
self.feature_getter.get_features(atoms)
time_feat.stop()
prediction = {}
time_predict = Timer("TIME_PREDICT")
for species in elfs_dict:
prediction[species] = self.force_models[species.lower()].predict(elfs_dict[species],
processed = True)
for i, (pred, e, a) in enumerate(zip(prediction[species],
elfs_dict[species], angles_dict[species])):
prediction[species][i] = elf.geom.rotate_vector(np.array([pred]),
a, False)
time_predict.stop()
time_ML.stop()
for key in prediction:
prediction[key] = prediction[key].tolist()
masses = []
species_counter = {}
for i, chem_sym in enumerate(atoms.get_chemical_symbols()):
if chem_sym in prediction:
correction_force[i] = np.array(prediction[chem_sym].pop(0))
masses.append(atoms.get_masses()[i])
else:
masses.append(0)
masses = np.array(masses).reshape(-1,1)
for key in prediction:
assert len(prediction[key]) == 0
if self.cm_corrected:
# Subtract mean force correction to fix the center of mass
mean_correction = np.mean(correction_force, axis = 0)*len(correction_force)/np.sum(masses)
correction_force -= mean_correction * masses
features = {}
for key in elfs_dict:
features[key] = np.concatenate([np.array(elfs_dict[key]),
np.array(angles_dict[key])],
axis = -1)
else:
features = {}
forces = forces + correction_force.reshape(-1,3)
if self.log_accuracy:
forces_uncorrected = np.array(forces)
forces_uncorrected -= correction_force.reshape(-1,3)
log_all(pot_energy, pot_energy,
forces_uncorrected, forces,
features)
self.Epot = pot_energy
self.forces = forces
time_step.stop()
subprocess.call('rm fdf*', shell = True)
subprocess.call('rm INPUT*', shell = True)
return self.Epot
def get_forces(self, atoms):
self.get_potential_energy(atoms)
return self.forces
[docs]def load_from_file(input_file):
""" Read input_file that defines baseline calculator and MLCF model
and return MLCF_Calculator
"""
settings, mixing_settings = read_input(input_file)
if settings['mixing']:
iterate_over = ['1','2']
settings_choice = mixing_settings
else:
iterate_over = ['']
settings_choice = settings
calculators = []
for i in iterate_over:
model_path = settings_choice['model' + i]
if not model_path == None and 'mbpol' in model_path.lower():
if MBPOL_AVAIL:
dummy_atoms = Atoms('OHH',positions = [[0,0,0],[-0.76,0.59,0],[0.76,0.59,0]])
base_calculator = MbpolCalculator(dummy_atoms)
calculators.append(base_calculator)
continue
else:
raise Exception('module for MbpolCalculator not available')
base_calculator = Siesta_Calculator(basis= settings_choice['basis' + i],
xc = settings_choice['xcfunctional' + i].upper())
base_calculator.set_solution_method(settings_choice['solutionmethod' + i])
if model_path != None:
if settings['ipp_client'] != None:
client = ipp.Client(profile=settings['ipp_client'])
else:
client = None
mlcf_calc = load_mlcf(model_path, client)
mlcf_calc.cm_corrected = settings_choice['cmcorrection' + i]
mlcf_calc.set_base_calculator(base_calculator)
calculators.append(mlcf_calc)
else:
calculators.append(base_calculator)
if settings['mixing']:
calc = Mixer(calculators[0], calculators[1], mixing_settings['n'],
mixing_settings['which'])
else:
calc = calculators[0]
return calc
[docs]def load_mlcf(model_path, client = None):
""" Given a directory model_path, load and return a calculator that uses
the MLCF contained in that directory. For parallel computing an ipyparallel
client can be provided. The baseline calculator of the returned instance
still has to be set before usage.
"""
if not model_path[-1] == '/': model_path += '/'
all_files = os.listdir(model_path)
force_models = [f for f in all_files if 'force_' in f]
species = [f[-1] for f in force_models]
force_models = {s.lower(): load_force_model(model_path, s.lower())\
for s in species}
# Out of the element specific basis sets construct the full basis
full_basis = {}
for species in force_models:
for entry in force_models[species].basis:
if entry in full_basis and\
full_basis[entry] != force_models[species].basis[entry]:
raise Exception('Conflicting basis sets across force models')
else:
full_basis[entry] = force_models[species].basis[entry]
descr_getter = DescriptorGetter(full_basis, client)
masks = {}
for s in force_models:
masks[s.lower()] = force_models[s.lower()].mask
scalers = {}
for s in force_models:
scalers[s.lower()] = force_models[s.lower()].scaler
descr_getter.set_masks(masks)
descr_getter.set_scalers(scalers)
calc = MLCF_Calculator(feature_getter = descr_getter)
calc.set_force_model(force_models)
return calc