Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions tests/test_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import tempfile
from collections.abc import Callable, Generator
from pathlib import Path

Expand Down Expand Up @@ -834,3 +835,263 @@ def test_write_ase_trajectory_importerror(
with pytest.raises(ImportError, match="ASE is required to convert to ASE trajectory"):
traj.write_ase_trajectory(tmp_path / "dummy.traj")
traj.close()


def test_optimize_append_to_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""Test appending to an existing trajectory when running ts.optimize."""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/optimize_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
)

# First optimization run
opt_state = ts.optimize(
system=si_double_sim_state,
model=lj_model,
max_steps=5,
optimizer=ts.Optimizer.fire,
trajectory_reporter=trajectory_reporter,
steps_between_swaps=100,
)

for traj in trajectory_reporter.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file has 5 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 6))

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
_ = ts.optimize(
system=opt_state,
model=lj_model,
max_steps=7,
optimizer=ts.Optimizer.fire,
trajectory_reporter=trajectory_reporter_2,
steps_between_swaps=100,
)
for traj in trajectory_reporter_2.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file now has 7 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 8))


def test_integrate_append_to_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""Test appending to an existing trajectory when running ts.integrate."""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/integrate_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
)

# First integration run
int_state = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

for traj in trajectory_reporter.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file has 5 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 6))

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
# run 7 more steps of integration.
_ = ts.integrate(
system=int_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=7,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)
for traj in trajectory_reporter_2.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file now has 12 (5 + 7) frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 13))


def test_truncate_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test trajectory.truncate_to_step().
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/truncate_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

# Manually remove last two frames from second trajectory to create unevenness
with TorchSimTrajectory(traj_files[1], mode="a") as traj:
traj.truncate_to_step(3)
# Verify that it has 3 frames now.
for array_name in traj.array_registry:
target_length = 3
target_steps = [1, 2, 3]
# Special cases: global arrays
if array_name in ["atomic_numbers", "masses"]:
target_length = 1
target_steps = [0]
if array_name == "pbc":
target_length = 3
target_steps = [0]
assert len(traj.get_array(array_name)) == target_length
np.testing.assert_allclose(traj.get_steps(array_name), target_steps)
with pytest.raises(
ValueError,
match=(
"Cannot truncate to a step greater than the last step. "
"self.last_step=3 < step=10"
),
):
traj.truncate_to_step(10)
with pytest.raises(
ValueError, match="Step must be larger than 0. Got step=0"
):
traj.truncate_to_step(0)


def test_truncate_trajectory_reporter(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test TrajectoryReporter.truncate_to_step().
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [
f"{temp_dir}/truncate_reporter_trajectory_{idx}.h5" for idx in range(2)
]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

trajectory_reporter.truncate_to_step(step=min(trajectory_reporter.last_steps))
assert trajectory_reporter.last_steps == [5, 5]
with pytest.raises(
ValueError,
match=(
"Step 7 is greater than the minimum last step "
r"across trajectories \(5\)\."
),
):
trajectory_reporter.truncate_to_step(7)
# try negative number
with pytest.raises(ValueError, match="Step must be greater than 0. Got step=-2"):
trajectory_reporter.truncate_to_step(-2)
# truncate to step 3
trajectory_reporter.truncate_to_step(3)
assert trajectory_reporter.last_steps == [3, 3]


def test_integrate_uneven_trajectory_append(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test appending to an existing trajectory with uneven frames running ts.integrate.
Expected behavior: ts.integrate should first truncate all trajectories to the shortest
length, and then append new frames to all trajectories.
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [
f"{temp_dir}/uneven_integrate_trajectory_{idx}.h5" for idx in range(2)
]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

# Manually remove last two frames from second trajectory to create unevenness
with TorchSimTrajectory(traj_files[1], mode="a") as traj:
traj.truncate_to_step(3)

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
# Should raise a ValueError:
with pytest.raises(
ValueError, match="Cannot resume integration from inconsistent states"
):
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=4,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)
Loading
Loading