# Advanced Calibration
The built-in `Calibration` class covers the most common scenario: one or more output variables, each compared to observed data with a standard indicator (NSE, KGE, RMSE, etc.), optimised with GA, DE, or NSGA2.

For cases that go beyond this — custom scoring functions, grouped Pareto objectives, integration with optimisers other than pymoo, or any other non-standard setup — pySWATPlus exposes the building blocks you need to compose your own calibration workflow directly.
## Building blocks
| Task | Tool |
|---|---|
| Run a SWAT+ simulation with parameter changes | `TxtinoutReader.run_swat` |
| Read a SWAT+ output file | `DataManager.simulated_timeseries_df` |
| Compute performance indicators | `PerformanceMetrics.compute_from_abbr` |
| Parallel execution | `concurrent.futures.ProcessPoolExecutor` |
| Optimisation | pymoo, scipy.optimize, Optuna, … |
## Use the built-in Calibration class first

Before following this guide, check whether the built-in `Calibration` class covers your needs:
| Scenario | Use |
|---|---|
| One output variable compared to observed data | `Calibration` |
| Multiple output variables, one per file | `Calibration` (multi-objective) |
| Multiple channels or locations from the same file | This guide |
| Custom scoring or aggregation across locations | This guide |
| Optimiser other than GA, DE, or NSGA2 | This guide |
## Example: multi-channel calibration with pymoo
This example calibrates two parameters against observed daily flow at two channels simultaneously. Mean NSE and mean KGE (each averaged across both channels) are used as separate Pareto objectives — a case that the built-in `Calibration` class cannot express directly.
> **Note**
> Steps 2–5 all belong in one Python file. They are shown separately only to explain each part.
### Step 1 — Configure the model
Set the simulation period, output print options, and any fixed parameters before running calibration. See Configuration Settings for a detailed walkthrough.
### Step 2 — Module-level setup
All imports, paths, and data are defined at the top of the file so they are available to every function and class defined below.
```python
import json
import logging
import pathlib
import shutil
import concurrent.futures

import numpy
import pandas
import pymoo.algorithms.moo.nsga2
import pymoo.core.problem
import pymoo.optimize

from pySWATPlus import TxtinoutReader, DataManager, PerformanceMetrics

logging.basicConfig(level=logging.INFO)

# --- Paths ---
TXTINOUT_DIR = pathlib.Path(r'C:\Users\Username\custom_folder')
SIM_DIR = pathlib.Path(r'C:\Users\Username\calibration_dir')

# --- Channels to calibrate against ---
CHANNEL_IDS = [561, 562]

# --- Parameter search space ---
PARAM_CONFIGS = [
    {'name': 'esco', 'change_type': 'absval', 'lower_bound': 0, 'upper_bound': 1},
    {'name': 'cn2', 'change_type': 'pctchg', 'lower_bound': 25, 'upper_bound': 75},
]

# --- Observed discharge (one CSV per channel, must contain 'date' and 'discharge' columns) ---
obs = {
    561: pandas.read_csv(r'C:\Users\Username\obs_561.csv', parse_dates=['date']),
    562: pandas.read_csv(r'C:\Users\Username\obs_562.csv', parse_dates=['date']),
}
for df in obs.values():
    df['date'] = df['date'].dt.date
    df.rename(columns={'discharge': 'obs'}, inplace=True)
```
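The observed-data preparation can be tried in isolation. In this sketch an in-memory CSV with invented values stands in for one of the observed-discharge files; the `.dt.date` conversion turns pandas Timestamps into plain `datetime.date` objects, which is what the later merge on `date` relies on:

```python
import io

import pandas

# Toy stand-in for one observed-discharge CSV (values are invented)
csv_text = 'date,discharge\n2012-01-01,4.2\n2012-01-02,3.9\n'
df = pandas.read_csv(io.StringIO(csv_text), parse_dates=['date'])

# Convert pandas Timestamps to plain datetime.date objects
df['date'] = df['date'].dt.date
df.rename(columns={'discharge': 'obs'}, inplace=True)
```

After this, `df` has exactly the two columns (`date`, `obs`) that the scoring step expects.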
### Step 3 — Define the simulation function

`_run_sim` must be a top-level function (not defined inside a class or another function). `ProcessPoolExecutor` sends it to worker processes by name, and lambdas or nested functions cannot be transferred that way — they will raise a `PicklingError`.
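This restriction can be checked without launching any workers, because `ProcessPoolExecutor` uses `pickle` under the hood and `pickle` serialises functions by importable name. A quick illustration using the built-in `len` as the named function (no SWAT+ involved):

```python
import pickle

# Named, module-level functions pickle by reference; len lives in builtins
restored = pickle.loads(pickle.dumps(len))

# A lambda has no importable name, so pickling it fails
try:
    pickle.dumps(lambda x: x * 2)
    lambda_picklable = True
except Exception:
    lambda_picklable = False
```

The same failure occurs for functions defined inside another function or method, which is why `_run_sim` lives at module level.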
```python
def _run_sim(idx, param_values):
    '''Run one SWAT+ simulation and return its channel outputs.'''
    parameters = [
        {'name': cfg['name'], 'change_type': cfg['change_type'], 'value': float(val)}
        for cfg, val in zip(PARAM_CONFIGS, param_values)
    ]
    # Each simulation needs its own empty subdirectory
    sim_path = SIM_DIR / f'sim_{idx}'
    sim_path.mkdir()
    try:
        reader = TxtinoutReader(tio_dir=TXTINOUT_DIR)
        run_path = reader.run_swat(sim_dir=sim_path, parameters=parameters)
        outputs = {}
        for ch_id in CHANNEL_IDS:
            df = DataManager().simulated_timeseries_df(
                sim_file=run_path / 'channel_sd_day.txt',
                has_units=True,
                apply_filter={'gis_id': [ch_id]},
                usecols=['flo_out']
            )
            # Rename 'flo_out' → 'sim' so PerformanceMetrics can find the simulated column
            outputs[ch_id] = df.rename(columns={'flo_out': 'sim'})
    finally:
        shutil.rmtree(sim_path, ignore_errors=True)
    return outputs
```
> **Note**
> If a previous run was interrupted and left stale `sim_<n>` directories behind, remove them manually before restarting — `run_swat` requires each simulation directory to be empty.
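One way to clear such leftovers before a fresh run is a small glob-and-delete sweep over the simulation directory. In this sketch a temporary directory stands in for `SIM_DIR`:

```python
import pathlib
import shutil
import tempfile

# Temporary directory standing in for SIM_DIR in this sketch
sim_dir = pathlib.Path(tempfile.mkdtemp())
(sim_dir / 'sim_0').mkdir()
(sim_dir / 'sim_1').mkdir()

# Delete any stale sim_<n> directories left by an interrupted run
for stale in sim_dir.glob('sim_*'):
    shutil.rmtree(stale, ignore_errors=True)

leftovers = list(sim_dir.glob('sim_*'))
print(leftovers)  # []
```

Run a sweep like this only when no calibration is currently executing, since it would also delete directories that live workers are using.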
### Step 4 — Define the pymoo Problem
```python
class MultiChannelCalibration(pymoo.core.problem.Problem):

    def __init__(self):
        xl = numpy.array([cfg['lower_bound'] for cfg in PARAM_CONFIGS])
        xu = numpy.array([cfg['upper_bound'] for cfg in PARAM_CONFIGS])
        super().__init__(n_var=len(PARAM_CONFIGS), n_obj=2, xl=xl, xu=xu)

    def _evaluate(self, X, out, *args, **kwargs):
        # Run all simulations in parallel — results come back in the same order as X
        with concurrent.futures.ProcessPoolExecutor() as executor:
            all_outputs = list(executor.map(_run_sim, range(len(X)), X))

        # Score each individual
        pm = PerformanceMetrics()
        F = []
        for i in range(len(X)):
            nse_vals = []
            kge_vals = []
            for ch_id in CHANNEL_IDS:
                merged = all_outputs[i][ch_id].merge(
                    obs[ch_id][['date', 'obs']], on='date', how='inner'
                )
                nse_vals.append(pm.compute_from_abbr(merged, 'sim', 'obs', 'NSE'))
                kge_vals.append(pm.compute_from_abbr(merged, 'sim', 'obs', 'KGE'))
            # pymoo minimises — negate NSE and KGE so higher is better
            F.append([-numpy.mean(nse_vals), -numpy.mean(kge_vals)])
        out["F"] = numpy.array(F)
```
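To make the sign convention concrete, here is the scoring arithmetic for a single candidate with invented per-channel scores. Both NSE and KGE are best at 1, so negating the channel means turns "higher is better" into the "smaller is better" form pymoo minimises:

```python
import numpy

# Invented per-channel scores for one candidate parameter set
nse_vals = [0.82, 0.76]
kge_vals = [0.71, 0.80]

# Negate the channel means; a better fit yields a smaller (more negative) F
F_row = [-numpy.mean(nse_vals), -numpy.mean(kge_vals)]
```

A perfect fit on both channels would give `F_row = [-1.0, -1.0]`, the minimum of both objectives.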
### Step 5 — Run and save
```python
if __name__ == '__main__':
    result = pymoo.optimize.minimize(
        problem=MultiChannelCalibration(),
        algorithm=pymoo.algorithms.moo.nsga2.NSGA2(pop_size=16),
        termination=('n_gen', 20),
        save_history=True,
        verbose=True
    )

    # Pareto-optimal solutions — un-negate to recover natural direction
    print('Parameters:', result.X)
    print('NSE / KGE:', -result.F)

    # Save results to disk
    PARAM_NAMES = [cfg['name'] for cfg in PARAM_CONFIGS]
    OBJ_NAMES = ['NSE', 'KGE']

    with open('optimization_result.json', 'w') as f:
        json.dump({
            'param_names': PARAM_NAMES,
            'obj_names': OBJ_NAMES,
            'variables': result.X.tolist(),
            'objectives': (-result.F).tolist()
        }, f, indent=4)

    with open('optimization_history.json', 'w') as f:
        json.dump({
            'param_names': PARAM_NAMES,
            'obj_names': OBJ_NAMES,
            'generations': {
                i: {
                    'pop': gen.pop.get('X').tolist(),
                    'obj': (-gen.pop.get('F')).tolist()
                }
                for i, gen in enumerate(result.history, start=1)
            }
        }, f, indent=4)
```
`optimization_result.json` will look like this — self-contained, no need to remember the parameter order:
```json
{
    "param_names": ["esco", "cn2"],
    "obj_names": ["NSE", "KGE"],
    "variables": [[0.45, 18.2], [0.61, 12.7]],
    "objectives": [[0.83, 0.71], [0.79, 0.88]]
}
```
Each row in `variables` and `objectives` is one Pareto-optimal solution.
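Because the file carries its own column names, loading it back for analysis takes one line per table. A sketch using the sample values shown above:

```python
import pandas

# Sample contents of optimization_result.json (from the example above)
result = {
    "param_names": ["esco", "cn2"],
    "obj_names": ["NSE", "KGE"],
    "variables": [[0.45, 18.2], [0.61, 12.7]],
    "objectives": [[0.83, 0.71], [0.79, 0.88]],
}

params = pandas.DataFrame(result['variables'], columns=result['param_names'])
scores = pandas.DataFrame(result['objectives'], columns=result['obj_names'])

# Rows line up: params.iloc[i] produced scores.iloc[i], so e.g.
# the parameter set with the best NSE is
best = params.iloc[scores['NSE'].idxmax()]
```

In a real script you would replace the literal `result` dict with `json.load(open('optimization_result.json'))`.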
## Tips
### Choosing `pop_size`

Set `pop_size` to a multiple of your CPU count so each generation runs in a single parallel batch. With 16 CPUs and `pop_size=16`, one batch per generation; with `pop_size=20`, two batches are needed (16 + 4).
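The batch count is a ceiling division, which you can sanity-check for your own machine with a tiny helper (not part of pySWATPlus):

```python
import math
import os

def batches_per_generation(pop_size, n_workers):
    # Number of sequential parallel batches needed per generation
    return math.ceil(pop_size / n_workers)

print(batches_per_generation(16, 16))  # 1
print(batches_per_generation(20, 16))  # 2

# os.cpu_count() gives the default worker count of ProcessPoolExecutor
n_workers = os.cpu_count()
```

Since the last batch of a generation may be only partially filled, `pop_size` values just above a multiple of the worker count waste the most wall-clock time.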
### Using other optimisers

`_run_sim` is independent of pymoo. The same function works inside any optimisation loop — scipy, Optuna, a manual random search, or anything else. Wrap calls to `_run_sim` in a `ProcessPoolExecutor` whenever you need to evaluate a batch of parameter sets in parallel.
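As a sketch of that optimiser independence, here is a manual random search over the same bounds using the same negate-free "higher is better" convention. `toy_score` is a hypothetical stand-in for running `_run_sim` and computing mean NSE; it is not pySWATPlus code:

```python
import random

random.seed(42)

PARAM_CONFIGS = [
    {'name': 'esco', 'lower_bound': 0, 'upper_bound': 1},
    {'name': 'cn2', 'lower_bound': 25, 'upper_bound': 75},
]

def toy_score(params):
    # Stand-in for _run_sim + mean NSE: a smooth surface
    # peaking at 1.0 for esco=0.5, cn2=50
    esco, cn2 = params
    return 1.0 - (esco - 0.5) ** 2 - ((cn2 - 50) / 50) ** 2

best_params, best_score = None, float('-inf')
for _ in range(200):
    candidate = [
        random.uniform(cfg['lower_bound'], cfg['upper_bound'])
        for cfg in PARAM_CONFIGS
    ]
    score = toy_score(candidate)
    if score > best_score:
        best_params, best_score = candidate, score
```

Swapping `toy_score` for a function that calls `_run_sim` and scores the result turns this loop into a (slow but working) SWAT+ calibration without any pymoo dependency.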
### Tracking progress

Set `verbose=True` in `pymoo.optimize.minimize` for generation-level logging. For per-simulation progress, pySWATPlus emits messages via Python's standard `logging` module:

```python
import logging

logging.basicConfig(level=logging.INFO)
```
### Troubleshooting parallel errors

If `ProcessPoolExecutor` raises an unclear error, close the terminal and restart it. Lingering background processes or locked files from a previous run can interfere with new parallel workloads.