Skip to content

FIX: Parallel build JSON file issues #13241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/devel/13241.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved the configuration json to handle with parallel access with file lock configuration by :newcontrib:`Bruno Aristimunha`.
1 change: 1 addition & 0 deletions doc/changes/names.inc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
.. _Austin Hurst: https://github.com/a-hurst
.. _Ben Beasley: https://github.com/musicinmybrain
.. _Britta Westner: https://britta-wstnr.github.io
.. _Bruno Aristimunha: https://bruaristimunha.github.io
.. _Bruno Nicenboim: https://bnicenboim.github.io
.. _btkcodedev: https://github.com/btkcodedev
.. _buildqa: https://github.com/buildqa
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies:
- dipy
- edfio >=0.2.1
- eeglabio
- filelock >=3.18.0
- h5io >=0.2.4
- h5py
- imageio >=2.6.1
Expand Down
6 changes: 3 additions & 3 deletions mne/forward/_lead_dots.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from numpy.polynomial import legendre

from ..parallel import parallel_func
from ..utils import _get_extra_data_path, fill_doc, logger, verbose
from ..utils import _get_extra_data_path, _open_lock, fill_doc, logger, verbose

##############################################################################
# FAST LEGENDRE (DERIVATIVE) POLYNOMIALS USING LOOKUP TABLE
Expand Down Expand Up @@ -80,11 +80,11 @@ def _get_legen_table(
x_interp = np.linspace(-1, 1, n_interp + 1)
lut = leg_fun(x_interp, n_coeff).astype(np.float32)
if not force_calc:
with open(fname, "wb") as fid:
with _open_lock(fname, "wb") as fid:
fid.write(lut.tobytes())
else:
logger.info(f"Reading Legendre{extra_str} table...")
with open(fname, "rb", buffering=0) as fid:
with _open_lock(fname, "rb", buffering=0) as fid:
lut = np.fromfile(fid, np.float32)
lut.shape = lut_shape

Expand Down
2 changes: 2 additions & 0 deletions mne/utils/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ __all__ = [
"_julian_to_date",
"_mask_to_onsets_offsets",
"_on_missing",
"_open_lock",
"_parse_verbose",
"_path_like",
"_pl",
Expand Down Expand Up @@ -280,6 +281,7 @@ from .config import (
_get_numpy_libs,
_get_root_dir,
_get_stim_channel,
_open_lock,
get_config,
get_config_path,
get_subjects_dir,
Expand Down
80 changes: 76 additions & 4 deletions mne/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Copyright the MNE-Python contributors.

import atexit
import contextlib
import json
import multiprocessing
import os
Expand All @@ -23,7 +24,13 @@
from packaging.version import parse

from ._logging import logger, warn
from .check import _check_fname, _check_option, _check_qt_version, _validate_type
from .check import (
_check_fname,
_check_option,
_check_qt_version,
_soft_import,
_validate_type,
)
from .docs import fill_doc
from .misc import _pl

Expand Down Expand Up @@ -218,9 +225,53 @@ def set_memmap_min_size(memmap_min_size):
)


@contextlib.contextmanager
def _open_lock(path, *args, **kwargs):
"""
Context manager that opens a file with an optional file lock.

If the `filelock` package is available, a lock is acquired on a lock file
based on the given path (by appending '.lock').

Otherwise, a null context is used. The path is then opened in the
specified mode.

Parameters
----------
path : str
The path to the file to be opened.
*args, **kwargs : optional
Additional arguments and keyword arguments to be passed to the
`open` function.

"""
filelock = _soft_import(
"filelock", purpose="parallel config set and get", strict=False
)

lock_context = contextlib.nullcontext() # default to no lock

if filelock is not None:
lock_path = f"{path}.lock"
try:
from filelock import FileLock

lock_context = FileLock(lock_path, timeout=5)
lock_context.acquire()
except TimeoutError:
warn(
"Could not acquire lock file after 5 seconds, consider deleting it "
f"if you know the corresponding file is usable:\n{lock_path}"
)
lock_context = contextlib.nullcontext()

with lock_context, open(path, *args, **kwargs) as fid:
yield fid


def _load_config(config_path, raise_error=False):
"""Safely load a config file."""
with open(config_path) as fid:
with _open_lock(config_path, "r+") as fid:
try:
config = json.load(fid)
except ValueError:
Expand Down Expand Up @@ -398,8 +449,29 @@ def set_config(key, value, home_dir=None, set_env=True):
directory = op.dirname(config_path)
if not op.isdir(directory):
os.mkdir(directory)
with open(config_path, "w") as fid:
json.dump(config, fid, sort_keys=True, indent=0)

# Adapting the mode depend if you are create the file
# or no.
mode = "r+" if op.isfile(config_path) else "w+"

with _open_lock(config_path, mode) as fid:
try:
data = json.load(fid)
except (ValueError, json.JSONDecodeError) as exc:
logger.info(
f"Could not read the {config_path} json file during the writing."
f" Assuming it is empty. Got: {exc}"
)
data = {}

if value is None:
data.pop(key, None)
else:
data[key] = value

fid.seek(0)
fid.truncate()
json.dump(data, fid, sort_keys=True, indent=0)


def _get_extra_data_path(home_dir=None):
Expand Down
103 changes: 103 additions & 0 deletions mne/utils/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

import json
import os
import platform
import random
import re
import time
from functools import partial
from pathlib import Path
from urllib.error import URLError
Expand Down Expand Up @@ -232,3 +235,103 @@ def bad_open(url, timeout, msg):
out = out.getvalue()
assert "devel, " in out
assert "updating.html" not in out


def _worker_update_config_loop(home_dir, worker_id, iterations=10):
"""Util function to update config in parallel.

Worker function that repeatedly reads the config (via get_config)
and then updates it (via set_config) with a unique key/value pair.
A short random sleep is added to encourage interleaving.

Dummy function to simulate a worker that reads and updates the config.

Parameters
----------
home_dir : str
The home directory where the config file is located.
worker_id : int
The ID of the worker (for creating unique keys).
iterations : int
The number of iterations to run the loop.

"""
for i in range(iterations):
# Read current configuration (to simulate a read-modify cycle)
_ = get_config(home_dir=home_dir)
# Create a unique key/value pair.
new_key = f"worker_{worker_id}_{i}"
new_value = f"value_{worker_id}_{i}"
# Update the configuration (our set_config holds the lock over the full cycle)
set_config(new_key, new_value, home_dir=home_dir)
time.sleep(random.uniform(0, 0.05))
return worker_id


def test_parallel_get_set_config(tmp_path: Path):
"""Test that uses parallel workers to get and set config.

All the workers update the same configuration file concurrently. In a
correct implementation with proper path file locking, the final
config file remains valid JSON and includes all expected updates.

"""
pytest.importorskip("joblib")
pytest.importorskip("filelock")
from joblib import Parallel, delayed

# Use the temporary directory as our home directory.
home_dir = str(tmp_path)
# get_config_path will return home_dir/.mne/mne-python.json
config_file = get_config_path(home_dir=home_dir)

# if the config file already exists, remove it
if os.path.exists(config_file):
os.remove(config_file)

# Ensure that the .mne directory exists.
config_dir = tmp_path / ".mne"
config_dir.mkdir(exist_ok=True)

# Write an initial (valid) config file.
initial_config = {"initial": "True"}
with open(config_file, "w") as f:
json.dump(initial_config, f)

n_workers = 50
iterations = 10

# Launch multiple workers concurrently using joblib.
Parallel(n_jobs=10)(
delayed(_worker_update_config_loop)(home_dir, worker_id, iterations)
for worker_id in range(n_workers)
)

# Now, read back the config file.
final_config = get_config(home_dir=home_dir)
expected_keys = set()
expected_values = set()
# For each worker and iteration, check that the expected key/value pair is present.
for worker_id in range(n_workers):
for i in range(iterations):
expected_key = f"worker_{worker_id}_{i}"
expected_value = f"value_{worker_id}_{i}"

assert final_config.get(expected_key) == expected_value, (
f"Missing or incorrect value for key {expected_key}"
)
expected_keys.add(expected_key)
expected_values.add(expected_value)

# include the initial key/value pair
# that was written before the workers started

assert len(expected_keys - set(final_config.keys())) == 0
assert len(expected_values - set(final_config.values())) == 0

# Check that the final config is valid JSON.
with open(config_file) as f:
try:
json.load(f)
except json.JSONDecodeError as e:
pytest.fail(f"Config file is not valid JSON: {e}")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ full-no-qt = [
"dipy",
"edfio >= 0.2.1",
"eeglabio",
"filelock>=3.18.0",
"h5py",
"imageio >= 2.6.1",
"imageio-ffmpeg >= 0.4.1",
Expand Down
Loading