Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ build/*

# AI Agent files
AGENTS.md

spec.md
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ DEVTOOLS_DIR := devtools

.PHONY: all help clean test test-unittests test-functional test-all \
install-all install-ci install-pyrdl install-rmg install-rmgdb install-autotst install-gcn \
install-gcn-cpu install-kinbot install-sella install-xtb install-torchani install-ob \
install-gcn-cpu install-kinbot install-sella install-xtb install-torchani install-uma install-ob \
lite check-env compile


Expand Down Expand Up @@ -37,6 +37,7 @@ help:
@echo " install-sella Install Sella"
@echo " install-xtb Install xTB"
@echo " install-torchani Install TorchANI"
@echo " install-uma Install UMA (fairchem MLIP, gated model; users only, not CI)"
@echo " install-ob Install OpenBabel"
@echo ""
@echo "Maintenance:"
Expand Down Expand Up @@ -103,6 +104,11 @@ install-xtb:
install-torchani:
bash $(DEVTOOLS_DIR)/install_torchani.sh

# UMA (fairchem MLIP). Not part of install-ci: the model is gated (Meta license + HuggingFace
# token) and heavy, so this is a manual, user-driven setup. See devtools/install_uma.sh.
install-uma:
bash $(DEVTOOLS_DIR)/install_uma.sh

install-ob:
bash $(DEVTOOLS_DIR)/install_ob.sh

Expand Down
1 change: 1 addition & 0 deletions arc/job/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class JobEnum(str, Enum):
- gan # Generative adversarial networks, https://doi.org/10.1063/5.0055094
"""
# ESS
ase = 'ase'
cfour = 'cfour'
gaussian = 'gaussian'
mockter = 'mockter'
Expand Down
1 change: 1 addition & 0 deletions arc/job/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import arc.job.adapters.common
import arc.job.adapters.ase
import arc.job.adapters.cfour
import arc.job.adapters.gaussian
import arc.job.adapters.mockter
Expand Down
301 changes: 301 additions & 0 deletions arc/job/adapters/ase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
"""
An adapter for executing ASE (Atomic Simulation Environment) jobs
"""

import datetime
import os
import subprocess
from typing import TYPE_CHECKING, List, Optional, Tuple, Union

from arc.common import get_logger, read_yaml_file, save_yaml_file
from arc.job.adapter import JobAdapter
from arc.job.adapters.common import _initialize_adapter
from arc.job.factory import register_job_adapter
from arc.imports import settings
from arc.settings.settings import ARC_PYTHON, UMA_LATEST_MODEL, find_executable

if TYPE_CHECKING:
from arc.level import Level
from arc.species.species import ARCSpecies
from arc.reaction import ARCReaction

logger = get_logger()

# Default mapping if not yet fully defined in settings.py
DEFAULT_ASE_ENV = {
'torchani': 'TANI_PYTHON',
'xtb': 'XTB_PYTHON',
'uma': 'UMA_PYTHON',
}

# Level methods that select the UMA calculator. 'uma' resolves to the latest model.
UMA_METHODS = ('uma', 'uma-s-1', 'uma-s-1p1')

class ASEAdapter(JobAdapter):
"""
A generic adapter for ASE (Atomic Simulation Environment) jobs.
Supports multiple calculators and environments.
"""
def __init__(self,
project: str,
project_directory: str,
job_type: Union[List[str], str],
args: Optional[dict] = None,
bath_gas: Optional[str] = None,
checkfile: Optional[str] = None,
conformer: Optional[int] = None,
constraints: Optional[List[Tuple[List[int], float]]] = None,
cpu_cores: Optional[str] = None,
dihedral_increment: Optional[float] = None,
dihedrals: Optional[List[float]] = None,
directed_scan_type: Optional[str] = None,
ess_settings: Optional[dict] = None,
ess_trsh_methods: Optional[List[str]] = None,
execution_type: Optional[str] = None,
fine: bool = False,
initial_time: Optional[Union[datetime.datetime, str]] = None,
irc_direction: Optional[str] = None,
job_id: Optional[int] = None,
job_memory_gb: float = 14.0,
job_name: Optional[str] = None,
job_num: Optional[int] = None,
job_server_name: Optional[str] = None,
job_status: Optional[List[Union[dict, str]]] = None,
level: Optional['Level'] = None,
max_job_time: Optional[float] = None,
run_multi_species: bool = False,
reactions: Optional[List['ARCReaction']] = None,
rotor_index: Optional[int] = None,
server: Optional[str] = None,
server_nodes: Optional[list] = None,
queue: Optional[str] = None,
attempted_queues: Optional[List[str]] = None,
species: Optional[List['ARCSpecies']] = None,
testing: bool = False,
times_rerun: int = 0,
torsions: Optional[List[List[int]]] = None,
tsg: Optional[int] = None,
xyz: Optional[dict] = None,
):

self.job_adapter = 'ase'
self.execution_type = execution_type or 'incore'
self.incore_capacity = 100

self.sp = None
self.opt_xyz = None
self.freqs = None

self.args = args or dict()
self.level = level # also set by _initialize_adapter; needed early by get_python_executable
self.python_executable = self.get_python_executable()
self.script_path = os.path.join(os.path.dirname(__file__), 'scripts', 'ase_script.py')

_initialize_adapter(obj=self,
is_ts=False,
project=project,
project_directory=project_directory,
job_type=job_type,
args=args,
bath_gas=bath_gas,
checkfile=checkfile,
conformer=conformer,
constraints=constraints,
cpu_cores=cpu_cores,
dihedral_increment=dihedral_increment,
dihedrals=dihedrals,
directed_scan_type=directed_scan_type,
ess_settings=ess_settings,
ess_trsh_methods=ess_trsh_methods,
fine=fine,
initial_time=initial_time,
irc_direction=irc_direction,
job_id=job_id,
job_memory_gb=job_memory_gb,
job_name=job_name,
job_num=job_num,
job_server_name=job_server_name,
job_status=job_status,
level=level,
max_job_time=max_job_time,
run_multi_species=run_multi_species,
reactions=reactions,
rotor_index=rotor_index,
server=server,
server_nodes=server_nodes,
queue=queue,
attempted_queues=attempted_queues,
species=species,
testing=testing,
times_rerun=times_rerun,
torsions=torsions,
tsg=tsg,
xyz=xyz,
)

def determine_calculator_name(self) -> str:
"""
Determine the ASE calculator name, from ``args['keyword']['calculator']`` if given,
otherwise inferred from the level method (e.g., a 'uma' method selects the UMA calculator).

Returns:
str: The lowercased calculator name (empty string if undetermined).
"""
calc = (self.args or dict()).get('keyword', dict()).get('calculator', '')
if not calc and self.level is not None and getattr(self.level, 'method', None) \
and self.level.method.lower() in UMA_METHODS:
calc = 'uma'
return calc.lower()

def determine_settings(self) -> dict:
"""
Build the ``settings`` block passed to ase_script.py: the user's ``args['keyword']`` plus
a resolved ``calculator`` and, for UMA, default ``model`` (the level method, with 'uma'
resolving to the latest model), ``task``, and ``device``.

Returns:
dict: The resolved ASE run settings.
"""
settings_dict = dict((self.args or dict()).get('keyword', dict()))
calc = self.determine_calculator_name()
if calc:
settings_dict.setdefault('calculator', calc)
if calc == 'uma':
if 'model' not in settings_dict:
method = self.level.method.lower() if self.level is not None and self.level.method else 'uma'
settings_dict['model'] = UMA_LATEST_MODEL if method == 'uma' else method
settings_dict.setdefault('task', 'omol')
settings_dict.setdefault('device', 'cpu')
return settings_dict

def get_python_executable(self) -> str:
"""
Identify the correct Python executable based on the calculator.
"""
calc = self.determine_calculator_name()
env_mapping = settings.get('ASE_CALCULATORS_ENV', DEFAULT_ASE_ENV)
env_var_name = env_mapping.get(calc)

if env_var_name and env_var_name in settings:
exe = settings[env_var_name]
if exe:
return exe

# Fallback to calculator-specific env if it exists
found_exe = find_executable(f'{calc}_env')
if found_exe:
return found_exe

return ARC_PYTHON or 'python'

def write_input_file(self) -> None:
"""
Write the input file for ase_script.py.
"""
input_dict = {
'job_type': self.job_type,
'xyz': self.xyz,
'charge': self.charge,
'multiplicity': self.multiplicity,
'is_ts': self.species[0].is_ts if self.species else False,
'constraints': self.constraints,
'irc_direction': self.irc_direction,
'settings': self.determine_settings(),
}
save_yaml_file(os.path.join(self.local_path, 'input.yml'), input_dict)

def warn_if_unreliable_uma_sp(self) -> None:
"""
Warn if this is a UMA single point on a species whose absolute UMA energy is unreliable
(an isolated atom or triplet O2). UMA's geometries/frequencies are fine; only the absolute
energy of these under-represented species is off, so a DFT single point is preferable.
"""
if self.job_type not in ['sp', 'conf_sp'] or self.determine_calculator_name() != 'uma':
return
symbols = self.xyz['symbols'] if self.xyz is not None else tuple()
is_atom = len(symbols) == 1
is_triplet_o2 = len(symbols) == 2 and all(s == 'O' for s in symbols) and self.multiplicity == 3
if is_atom or is_triplet_o2:
label = self.species[0].label if self.species else 'species'
logger.warning(f'Computing a UMA single point for {label} (an isolated atom or triplet O2). '
f'UMA absolute energies are unreliable for these under-represented species; '
f'consider using a DFT single point instead.')

def execute_incore(self) -> None:
"""
Execute the job incore.
"""
self.warn_if_unreliable_uma_sp()
self.write_input_file()
cmd = [self.python_executable, self.script_path, '--yml_path', self.local_path]
process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if process.returncode != 0:
logger.error(f"ASE job failed incore:\n{process.stderr}")
self.parse_results()

def execute_queue(self) -> None:
"""
Execute a job to the server's queue.
"""
self.write_input_file()
self.write_submit_script()
self.set_files()
if self.server_adapter is not None:
for file_dict in self.files_to_upload:
self.server_adapter.upload_file(remote_path=file_dict['remote'],
local_path=file_dict['local'])
self.server_adapter.submit_job(self.remote_path)

def set_files(self) -> None:
"""
Set files to be uploaded and downloaded.
"""
# 1. Upload
if self.execution_type != 'incore':
self.files_to_upload.append(self.get_file_property_dictionary(file_name='submit.sh'))
self.files_to_upload.append(self.get_file_property_dictionary(file_name='input.yml'))
self.files_to_upload.append(self.get_file_property_dictionary(file_name='ase_script.py',
local=self.script_path))
# 2. Download
self.files_to_download.append(self.get_file_property_dictionary(file_name='output.yml'))

def set_additional_file_paths(self) -> None:
"""
Set additional file paths specific for the adapter.
"""
pass

def set_input_file_memory(self) -> None:
"""
Set the input_file_memory attribute.
"""
pass

def write_submit_script(self) -> None:
"""
Write the submission script.
"""
remote_script_path = os.path.join(self.remote_path, 'ase_script.py')
command = f"{self.python_executable} {remote_script_path} --yml_path {self.remote_path}"
content = f"#!/bin/bash\n\n{command}\n"
with open(os.path.join(self.local_path, 'submit.sh'), 'w') as f:
f.write(content)

def parse_results(self) -> None:
"""
Parse the output.yml generated by ase_script.py.
"""
out_path = os.path.join(self.local_path, 'output.yml')
if os.path.isfile(out_path):
results = read_yaml_file(out_path)
self.electronic_energy = results.get('sp')
self.xyz_out = results.get('opt_xyz') or results.get('xyz')
self.frequencies = results.get('freqs')
self.hessian = results.get('hessian')
self.normal_modes = results.get('modes')
self.reduced_masses = results.get('reduced_masses')
self.force_constants = results.get('force_constants')
if 'error' in results:
logger.error(f"ASE job error: {results['error']}")

register_job_adapter('ase', ASEAdapter)
Loading