diff --git a/devito/data/data.py b/devito/data/data.py index 0a4d671e93..7a7db3d398 100644 --- a/devito/data/data.py +++ b/devito/data/data.py @@ -259,8 +259,31 @@ def T(self): """ return self.transpose() + def _mpi_advanced_1d_target(self, glb_idx, axis): + """ + Return the raw local ndarray addressed by ``glb_idx`` without + advanced indexing along ``axis``. + + The MPI advanced-indexing helper code in ``devito.data.utils`` owns + the communication pattern; this hook is kept on ``Data`` because only + the subclass can bypass its own ``__getitem__`` and obtain a plain + ndarray view. + """ + target_idx = list(glb_idx) + target_idx[axis] = slice(None) + loc_idx = self._index_glb_to_loc(tuple(target_idx)) + target_axis = sum(not is_integer(i) for i in glb_idx[:axis]) + return super().__getitem__(loc_idx).view(np.ndarray), target_axis + @_check_idx def __getitem__(self, glb_idx, comm_type, gather_rank=None): + advanced = mpi_advanced_1d_index(self, glb_idx) + if advanced is not None: + # Global integer indices may refer to data owned by any rank. + return mpi_advanced_1d_get( + self, *advanced, target_getter=self._mpi_advanced_1d_target + ) + loc_idx = self._index_glb_to_loc(glb_idx) is_gather = isinstance(gather_rank, int) if is_gather and comm_type is gather: @@ -383,6 +406,17 @@ def __getitem__(self, glb_idx, comm_type, gather_rank=None): @_check_idx def __setitem__(self, glb_idx, val, comm_type): + advanced = mpi_advanced_1d_index(self, glb_idx) + if advanced is not None: + # ``val`` is rank-local and ordered by the caller's global integer + # indices; route entries to their owner ranks. + glb_idx, axis, indices, decomposition = advanced + mpi_advanced_1d_set( + self, glb_idx, val, axis, indices, decomposition, + target_getter=self._mpi_advanced_1d_target + ) + return + loc_idx = self._index_glb_to_loc(glb_idx) if loc_idx is NONLOCAL: @@ -461,7 +495,9 @@ def __setitem__(self, glb_idx, val, comm_type): raise ValueError(f"Cannot insert obj of type `{type(val)}` into a Data") def _normalize_index(self, idx): - if isinstance(idx, np.ndarray): + if isinstance(idx, np.ndarray) or ( + isinstance(idx, list) and index_contains_integer_sequence(idx, self.ndim) + ): # Advanced indexing mode return (idx,) else: diff --git a/devito/data/utils.py b/devito/data/utils.py index c2f044ddaf..3b4b165f09 100644 --- a/devito/data/utils.py +++ b/devito/data/utils.py @@ -1,6 +1,6 @@ import numpy as np -from devito.tools import Tag, as_list, as_tuple, is_integer +from devito.tools import Tag, as_list, as_tuple, dtype_to_mpidtype, is_integer, prod __all__ = [ 'NONLOCAL', @@ -9,10 +9,15 @@ 'convert_index', 'flip_idx', 'index_apply_modulo', + 'index_contains_integer_sequence', 'index_dist_to_repl', 'index_handle_oob', 'index_is_basic', + 'index_is_integer_sequence', 'loc_data_idx', + 'mpi_advanced_1d_get', + 'mpi_advanced_1d_index', + 'mpi_advanced_1d_set', 'mpi_index_maps', ] @@ -32,6 +37,312 @@ def index_is_basic(idx): return all(is_integer(i) or (i is NONLOCAL) for i in idx) +def index_is_integer_sequence(idx): + """ + Return True for a one-dimensional integer array-like index. + + NumPy treats ``a[[...]]`` and ``a[np.array([...])]`` as advanced indexing. + This helper recognizes only the integer-sequence form used by + ``mpi_advanced_1d_*``; slices and scalars continue through the existing + global-to-local index conversion. + """ + if not isinstance(idx, (list, tuple, np.ndarray)): + return False + + arr = np.asarray(idx) + return (arr.ndim == 1 and + (arr.size == 0 or np.issubdtype(arr.dtype, np.integer))) + + +def index_is_legacy_multidim_basic(idx, ndim): + """ + Return True when a top-level Python list is a legacy basic index. + + Historically, Devito accepted expressions such as ``data[[0, 1, 2]]`` as + shorthand for ``data[(0, 1, 2)]`` on 3D ``Data`` objects. NumPy treats a + plain list of integers as advanced indexing, but changing this top-level + Devito behavior would break existing MPI tests and user code. Lists nested + inside an index tuple, e.g. ``data[:, [0, 2]]``, remain available for the + routed MPI advanced-indexing path. + """ + return (ndim > 1 and isinstance(idx, list) and len(idx) == ndim and + all(is_integer(i) for i in idx)) + + +def index_contains_integer_sequence(idx, ndim): + """ + Return True if an index expression contains a 1D integer array-like item. + + This is the full-index-expression counterpart to + :func:`index_is_integer_sequence`, which checks one index component. It is + used to cheaply reject ordinary basic indexing before normalizing ``idx``, + keeping the MPI advanced-indexing path out of the hot path for + scalar/slice-only accesses. + """ + if isinstance(idx, list) and index_is_legacy_multidim_basic(idx, ndim): + return False + elif isinstance(idx, (np.ndarray, list)): + return index_is_integer_sequence(idx) + elif isinstance(idx, tuple): + return any(index_is_integer_sequence(i) for i in idx) + else: + return False + + +def mpi_advanced_1d_index(data, glb_idx): + """ + Normalize the supported 1D MPI advanced-indexing subset. + + Returns ``None`` when ``glb_idx`` can be handled by the regular Data + indexing path. Otherwise returns the normalized index, advanced axis, + global integer indices, and the axis decomposition used by + :func:`mpi_advanced_1d_get` or :func:`mpi_advanced_1d_set`. + + The supported case is deliberately narrow: exactly one MPI-distributed + dimension, indexed by exactly one one-dimensional integer sequence. The + integer sequence is interpreted as global indices in that distributed + dimension. All other dimensions must use basic indexing, i.e. slices or + scalar integers. + """ + if not index_contains_integer_sequence(glb_idx, data.ndim): + return None + + if not data._is_decomposed: + return None + + glb_idx = data._normalize_index(glb_idx) + if len(glb_idx) > data.ndim: + return None + elif len(glb_idx) < data.ndim: + glb_idx = glb_idx + (slice(None),)*(data.ndim - len(glb_idx)) + + distributed = [] + advanced = [] + for i, d in enumerate(data._decomposition): + if d is None: + continue + + distributed.append(i) + if index_is_integer_sequence(glb_idx[i]): + advanced.append(i) + + if not advanced: + return None + elif len(distributed) != 1: + raise NotImplementedError( + "Advanced indexing with MPI-distributed Data is currently " + "supported only for data with a single distributed dimension" + ) + elif len(advanced) != 1: + raise NotImplementedError( + "Advanced indexing with MPI-distributed Data supports a single " + "integer index array" + ) + + axis = advanced[0] + for i, idx in enumerate(glb_idx): + if i != axis and index_is_integer_sequence(idx): + raise NotImplementedError( + "Advanced indexing with MPI-distributed Data supports a single " + "integer index array" + ) + + indices = np.asarray(glb_idx[axis], dtype=np.int64) + return glb_idx, axis, indices, data._decomposition[axis] + + +def mpi_advanced_1d_get(data, glb_idx, axis, indices, decomposition, + target_getter): + """ + Read MPI-distributed ``data`` using one global integer index sequence. + + This implements the read side of the supported NumPy advanced-indexing + subset. Each rank supplies the global indices it wants in its local output. + The helper asks owner ranks for those entries and returns a normal NumPy + array ordered exactly like the caller's index sequence. + """ + indices, owners = _mpi_advanced_1d_owners(data, indices, decomposition) + shape = _mpi_advanced_1d_result_shape(data, glb_idx, axis, indices) + positions, scount, rcount, recv_indices = \ + _mpi_advanced_1d_indices_alltoall(data, indices, owners) + + source, source_axis = target_getter(glb_idx, axis) + source = np.moveaxis(source, source_axis, 0) + local_offsets = _mpi_advanced_1d_local_offsets(recv_indices, decomposition) + send_data = np.ascontiguousarray(source[local_offsets]) + + payload_shape = send_data.shape[1:] + recv_data = _mpi_advanced_1d_data_alltoall(data, send_data, rcount, scount, + payload_shape) + + ret = np.empty(shape, dtype=data.dtype) + ret_view = np.moveaxis(ret, source_axis, 0) + ret_view[np.concatenate(positions)] = recv_data + return ret + + +def mpi_advanced_1d_set(data, glb_idx, val, axis, indices, decomposition, + target_getter): + """ + Assign into MPI-distributed ``data`` using one global integer index sequence. + + ``val`` is interpreted as local to the calling rank and ordered according + to ``indices``. The helper routes each value to the rank that owns the + corresponding global index, preserving NumPy broadcasting for the + non-distributed dimensions. + """ + indices, owners = _mpi_advanced_1d_owners(data, indices, decomposition) + shape = _mpi_advanced_1d_result_shape(data, glb_idx, axis, indices) + val = np.asarray(val, dtype=data.dtype) + val = np.broadcast_to(val, shape) + value_axis = sum(not is_integer(i) for i in glb_idx[:axis]) + val = np.ascontiguousarray(np.moveaxis(val, value_axis, 0)) + payload_shape = val.shape[1:] + + positions, scount, rcount, recv_indices = \ + _mpi_advanced_1d_indices_alltoall(data, indices, owners) + + send_data = _mpi_advanced_1d_pack_axis0(val, positions) + recv_data = _mpi_advanced_1d_data_alltoall(data, send_data, scount, rcount, + payload_shape) + + error = None + if recv_indices.size and np.unique(recv_indices).size != recv_indices.size: + error = "Duplicate global indices in MPI-distributed advanced assignment" + _mpi_advanced_1d_error(data, error) + + target, target_axis = target_getter(glb_idx, axis) + target = np.moveaxis(target, target_axis, 0) + local_offsets = _mpi_advanced_1d_local_offsets(recv_indices, decomposition) + target[local_offsets] = recv_data + + +def _mpi_advanced_1d_error(data, error): + """Raise the first error reported by any rank, on every rank.""" + if data._distributor.nprocs > 1: + errors = data._distributor.comm.allgather(error) + error = next((i for i in errors if i is not None), None) + + if error is not None: + raise ValueError(error) + + +def _mpi_advanced_1d_owners(data, indices, decomposition): + """Map global indices to owning ranks, normalizing negative indices.""" + indices = indices.copy() + if decomposition.glb_max is not None: + indices[indices < 0] += decomposition.glb_max + 1 + + owners = np.full(indices.size, -1, dtype=np.int32) + for i, r in enumerate(decomposition): + if r.size: + owners[np.isin(indices, r)] = i + + error = None + if np.any(owners < 0): + error = "Advanced index contains out-of-bounds global indices" + _mpi_advanced_1d_error(data, error) + + return indices, owners + + +def _mpi_advanced_1d_result_shape(data, glb_idx, axis, indices): + """Return the NumPy result shape for the supported advanced-index case.""" + shape = [] + for i, idx in enumerate(glb_idx): + if is_integer(idx): + continue + elif i == axis: + shape.append(indices.size) + elif isinstance(idx, slice): + shape.append(len(range(*idx.indices(data.shape[i])))) + else: + raise NotImplementedError( + "Advanced indexing with MPI-distributed Data supports only " + "integer arrays, integer indices, and slices" + ) + return tuple(shape) + + +def _mpi_advanced_1d_local_offsets(indices, decomposition): + """Convert global indices received by an owner rank to local offsets.""" + return np.array([decomposition.index_glb_to_loc(int(i)) for i in indices], + dtype=np.int64) + + +def _mpi_advanced_1d_positions(owners, nprocs): + """ + Group local index positions by destination rank. + + ``positions[r]`` are the entries in the caller's advanced index whose data + must be exchanged with rank ``r``. + """ + return [np.where(owners == i)[0] for i in range(nprocs)] + + +def _mpi_advanced_1d_indices_alltoall(data, indices, owners): + """ + Exchange requested global indices with their owner ranks. + + The returned counts describe the same exchange pattern used later for the + payload values. + """ + comm = data._distributor.comm + nprocs = data._distributor.nprocs + positions = _mpi_advanced_1d_positions(owners, nprocs) + scount = np.array([i.size for i in positions], dtype=np.int32) + rcount = _mpi_advanced_1d_count_alltoall(comm, scount) + + send_indices = _mpi_advanced_1d_pack_axis0(indices, positions) + recv_indices = np.empty(int(np.sum(rcount)), dtype=np.int64) + idtype = dtype_to_mpidtype(np.int64) + _mpi_advanced_1d_alltoallv(comm, send_indices, recv_indices, scount, + rcount, idtype) + + return positions, scount, rcount, recv_indices + + +def _mpi_advanced_1d_data_alltoall(data, send_data, scount, rcount, payload_shape): + """Exchange payload arrays using an already established index exchange.""" + comm = data._distributor.comm + payload_size = prod(payload_shape) + recv_data = np.empty((int(np.sum(rcount)), *payload_shape), dtype=data.dtype) + + dscount = scount * payload_size + drcount = rcount * payload_size + mpitype = dtype_to_mpidtype(data.dtype) + _mpi_advanced_1d_alltoallv(comm, send_data, recv_data, dscount, drcount, + mpitype) + + return recv_data + + +def _mpi_advanced_1d_count_alltoall(comm, scount): + rcount = np.empty_like(scount) + comm.Alltoall(scount, rcount) + return rcount + + +def _mpi_advanced_1d_alltoallv(comm, send, recv, scount, rcount, mpitype): + sdisp = _mpi_advanced_1d_displacements(scount) + rdisp = _mpi_advanced_1d_displacements(rcount) + comm.Alltoallv([send, scount, sdisp, mpitype], + [recv, rcount, rdisp, mpitype]) + + +def _mpi_advanced_1d_displacements(counts): + displacements = np.empty_like(counts, dtype=np.int32) + displacements[0] = 0 + displacements[1:] = np.cumsum(counts[:-1], dtype=np.int64) + return displacements + + +def _mpi_advanced_1d_pack_axis0(array, positions): + """Pack an axis-0 array in destination-rank order.""" + return np.ascontiguousarray(np.concatenate([array[i] for i in positions], + axis=0)) + + def index_apply_modulo(idx, modulo): if is_integer(idx): return idx % modulo diff --git a/devito/types/dense.py b/devito/types/dense.py index f62c452058..eaf8a85600 100644 --- a/devito/types/dense.py +++ b/devito/types/dense.py @@ -522,6 +522,21 @@ def data_domain(self): self._is_halo_dirty = True return self._data._global(self._mask_domain, self._decomposition) + @property + @_allocate_memory + def data_local(self): + """ + The local domain data values, with global indexing disabled. + + Notes + ----- + This accessor behaves like a rank-local numpy.ndarray view under MPI. It + is useful when the input data is already distributed according to this + object's decomposition. + """ + self._is_halo_dirty = True + return self._data._global(self._mask_domain, self._decomposition)._local + @property @_allocate_memory def data_with_halo(self): diff --git a/examples/mpi/data_initialization.ipynb b/examples/mpi/data_initialization.ipynb new file mode 100644 index 0000000000..dbe31d393b --- /dev/null +++ b/examples/mpi/data_initialization.ipynb @@ -0,0 +1,438 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Initializing distributed Devito data under MPI\n", + "\n", + "This notebook summarizes the common ways to initialize `Function`, `TimeFunction`, `SparseFunction`, and `SparseTimeFunction` data when running with MPI. It focuses on the distinction between globally replicated arrays, arrays already distributed like the Devito object, and arrays distributed in some external layout.\n", + "\n", + "The examples use the same execution style as `examples/mpi/overview.ipynb`: start an MPI `ipyparallel` cluster, connect to it from the notebook, and execute the Devito examples on all engines with `%%px`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "The examples below are written for 4 MPI processes. Other process counts work for the APIs shown here, but a few comments and toy distributions assume 4 ranks.\n", + "\n", + "From the root Devito directory, install the MPI requirements and create the `ipyparallel` profile if needed:\n", + "\n", + "```bash\n", + "pip install -r requirements-mpi.txt\n", + "./scripts/create_ipyparallel_mpi_profile.sh\n", + "```\n", + "\n", + "Then start the cluster in another terminal:\n", + "\n", + "```bash\n", + "ipcluster start -n 4 --profile=mpi --engines=mpi\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import ipyparallel as ipp\n", + "\n", + "c = ipp.Client(profile=\"mpi\")\n", + "view = c[:]\n", + "view.block = True" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "from devito import configuration\n", + "\n", + "configuration[\"mpi\"] = True\n", + "configuration[\"log-level\"] = \"WARNING\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Terminology\n", + "\n", + "A distributed Devito object has a logical global shape, but each MPI rank owns only one local piece. Here, \"array\" means an ordinary NumPy array unless the text explicitly refers to a Devito `.data` or `.data_local` object. In the examples below:\n", + "\n", + "* `glb_a` is a globally replicated array: every rank has the full logical array.\n", + "* `loc_a` is a local array with the same distribution as the Devito object.\n", + "* `loc_ext_a` is a local array with a different, external distribution.\n", + "* `glb_idx` is a 1D array of global indices labeling the local values in `loc_ext_a`.\n", + "\n", + "## Data assignment overview\n", + "\n", + "| Object | Global replicated RHS | Local RHS, same distribution | Local RHS, different distribution |\n", + "| --- | --- | --- | --- |\n", + "| Dense | [`f.data[...] = glb_a`](#dense-global) | [`f.data_local[...] = loc_a`](#dense-local) | Not supported for 2D/3D volumes |\n", + "| Sparse | [`src.data[...] = glb_a`](#sparse-global) | [`src.data_local[...] = loc_a`](#sparse-local) | [`src.data[:, glb_idx] = loc_ext_a`](#sparse-external) |\n", + "\n", + "### Indexing forms\n", + "\n", + "Indexing with a 1D integer list or array on a specific axis is the routed form used below:\n", + "\n", + "* `data[:, [g]]` or `data[:, np.array([g])]`: advanced indexing on that axis; under MPI, Devito routes by global index on the distributed dimension.\n", + "* `data[np.array([...])]`: advanced indexing on the first axis.\n", + "* `data[[0, 0, 0, 0]]` on multi-dimensional `Data`: Devito legacy basic indexing, equivalent to `data[0, 0, 0, 0]`.\n", + "\n", + "The last form is a known historical inconsistency with NumPy: Devito has long accepted a top-level Python list whose length matches the data rank as basic multi-dimensional indexing. It is kept for compatibility. Prefer explicit tuples for new basic multi-dimensional indexing, for example `data[(0, 0, 0, 0)]`, and use a nested list or array on the selected axis for routed indexing, for example `data[:, [g]]`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## Dense data from global replicated RHS\n", + "\n", + "For dense objects such as `Function` and `TimeFunction`, `data` uses the logical global domain. If every rank has the full array, assign it directly through `data`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Function, Grid\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "f = Function(name=\"f_global\", grid=grid)\n", + "\n", + "# Every rank has the full logical array.\n", + "glb_a = np.arange(np.prod(grid.shape), dtype=f.dtype).reshape(grid.shape)\n", + "f.data[...] = glb_a\n", + "assert np.all(f.data_local == glb_a[f.local_indices])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## Dense data from local array with same distribution\n", + "\n", + "Use `data_local` when the array already has Devito's local shape on each MPI rank. `data_local` is a rank-local NumPy view: reads, writes, and in-place NumPy operations affect only the storage owned by that rank and do not perform global indexing or routing." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Function, Grid\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "f = Function(name=\"f_local\", grid=grid)\n", + "\n", + "# The RHS is already shaped like this rank's owned domain.\n", + "rank = grid.distributor.myrank\n", + "loc_a = np.full(f.data_local.shape, rank + 1, dtype=f.dtype)\n", + "f.data_local[:, :] = loc_a[:, :]\n", + "assert np.all(f.data_local == loc_a)\n", + "\n", + "# In-place local NumPy operations are also local.\n", + "f.data_local[:, :] *= -1\n", + "assert np.all(f.data_local == -loc_a)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## Sparse data from global replicated RHS\n", + "\n", + "For sparse objects, `data` can also be initialized from a globally replicated sparse array. Every rank sees the same logical `(nt, npoint)` values; Devito stores only the sparse points owned by the rank." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Grid, SparseTimeFunction\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "nt = 3\n", + "npoint = 8\n", + "src = SparseTimeFunction(name=\"src_global\", grid=grid, nt=nt, npoint=npoint)\n", + "\n", + "# Every rank has the full logical sparse array.\n", + "glb_a = np.arange(nt*npoint, dtype=src.dtype).reshape(nt, npoint)\n", + "src.data[:, :] = glb_a\n", + "\n", + "assert np.all(src.data_local == glb_a[src.local_indices])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## Sparse data from local array with same distribution\n", + "\n", + "Use `data_local` when the array already follows Devito's sparse distribution on this rank. The local array columns must be ordered exactly like Devito's local sparse storage." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Grid, SparseTimeFunction\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "nt = 3\n", + "npoint = 8\n", + "src = SparseTimeFunction(name=\"src_local\", grid=grid, nt=nt, npoint=npoint)\n", + "\n", + "glb_a = np.arange(nt*npoint, dtype=src.dtype).reshape(nt, npoint)\n", + "# The RHS is already shaped and ordered like this rank's sparse storage.\n", + "loc_a = -glb_a[src.local_indices]\n", + "src.data_local[:, :] = loc_a\n", + "assert np.all(src.data_local == loc_a)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## Sparse data from an external distribution\n", + "\n", + "A sparse object has one dimension whose entries are sparse points, numbered globally from `0` to `npoint - 1`. Devito may store those points on different ranks from the external object that produced the input data.\n", + "\n", + "When the input array has its own distribution, this rank holds only some sparse-point columns. Provide those columns together with `glb_idx`, where each entry gives the global sparse point number for the matching local column. Devito then routes the values to the ranks that own those sparse points.\n", + "\n", + "The key assignment is:\n", + "\n", + "```python\n", + "src.data[:, glb_idx] = loc_ext_a\n", + "```\n", + "\n", + "where `glb_idx[j]` is the global sparse point number for `loc_ext_a[:, j]`. The local order does not have to match Devito's sparse distribution.\n", + "\n", + "This is the routed indexing form described above: one 1D integer index on one MPI-distributed dimension; the other dimensions use basic indices such as slices or scalars." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Grid, SparseTimeFunction\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "nt = 3\n", + "npoint = 8\n", + "src = SparseTimeFunction(name=\"src\", grid=grid, nt=nt, npoint=npoint)\n", + "\n", + "rank = grid.distributor.myrank\n", + "nprocs = grid.distributor.nprocs\n", + "\n", + "# Toy external distribution: each rank has a subset of sparse points in a non-Devito order.\n", + "glb_idx = np.array_split(np.arange(npoint)[::-1], nprocs)[rank]\n", + "\n", + "# A globally known reference is used only to make this example self-checking.\n", + "# In an application, loc_ext_a would come from an external data object.\n", + "glb_a = np.arange(nt*npoint, dtype=src.dtype).reshape(nt, npoint)\n", + "loc_ext_a = glb_a[:, glb_idx]\n", + "\n", + "src.data_local[:, :] = 0\n", + "src.data[:, glb_idx] = loc_ext_a\n", + "\n", + "# Read back the same externally distributed labels.\n", + "assert np.all(src.data[:, glb_idx] == loc_ext_a)\n", + "\n", + "# Check this rank's Devito-local sparse storage too.\n", + "assert np.all(src.data_local == glb_a[src.local_indices])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The RHS must have the same logical layout as the selected LHS. For a `SparseTimeFunction`, `loc_ext_a` should have shape `(nt, len(glb_idx))`." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## One sparse point at a time\n", + "\n", + "Use this form when each local sparse time series must be transformed before loading:\n", + "\n", + "```python\n", + "src.data[:, [g]] = new_a[:, None]\n", + "```\n", + "\n", + "The one-element list is intentional. `src.data[:, g] = new_a` is basic indexing and preserves the existing owner-local behavior; `src.data[:, [g]] = new_a[:, None]` is routed by global sparse point number." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Grid, SparseTimeFunction\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "nt = 4\n", + "rank = grid.distributor.myrank\n", + "nprocs = grid.distributor.nprocs\n", + "npoint = 2*nprocs\n", + "src = SparseTimeFunction(name=\"src_interp\", grid=grid, nt=nt, npoint=npoint)\n", + "\n", + "# Equal local counts keep the example focused on the one-point assignment.\n", + "glb_idx = np.array_split(np.arange(npoint)[::-1], nprocs)[rank]\n", + "\n", + "glb_a = np.arange(nt*npoint, dtype=src.dtype).reshape(nt, npoint)\n", + "loc_ext_a = glb_a[:, glb_idx]\n", + "\n", + "def transform(values):\n", + " # Stand-in for a cubic spline or another application-level operation.\n", + " return 2*values + 1\n", + "\n", + "src.data_local[:, :] = 0\n", + "\n", + "for j, g in enumerate(glb_idx):\n", + " new_a = transform(loc_ext_a[:, j])\n", + " src.data[:, [g]] = new_a[:, np.newaxis]\n", + "\n", + "assert np.all(src.data[:, glb_idx] == transform(loc_ext_a))\n", + "\n", + "assert np.all(src.data_local == transform(glb_a[src.local_indices]))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Each routed assignment is collective. The example above has equal local counts, so all ranks naturally enter the loop the same number of times. With irregular counts, loop over the maximum local count and have ranks without data in a round pass empty arrays:\n", + "\n", + "```python\n", + "idx = np.empty(0, dtype=np.int64)\n", + "values = np.empty((nt, 0), dtype=src.dtype)\n", + "src.data[:, idx] = values\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "\n", + "## NumPy expressions on the RHS\n", + "\n", + "There is no separate rule for NumPy expressions. Choose the LHS from the distribution of the RHS result:\n", + "\n", + "* use `data` when the RHS result is globally replicated;\n", + "* use `data_local` when the RHS result is already in Devito's local distribution;\n", + "* for sparse data in an external distribution, use the indexed form shown above: `src.data[:, glb_idx] = loc_ext_a`.\n", + "\n", + "In the example below, `taper` is replicated, but `rec.data_local*taper[:, None]` is local because the result has the shape and order of `rec.data_local`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%px\n", + "import numpy as np\n", + "from devito import Grid, SparseTimeFunction\n", + "\n", + "grid = Grid(shape=(4, 4), extent=(3.0, 3.0))\n", + "nt = 5\n", + "npoint = 8\n", + "rec = SparseTimeFunction(name=\"rec\", grid=grid, nt=nt, npoint=npoint)\n", + "\n", + "glb_a = np.arange(nt*npoint, dtype=rec.dtype).reshape(nt, npoint)\n", + "loc_a = glb_a[rec.local_indices]\n", + "rec.data_local[:, :] = loc_a\n", + "\n", + "before = rec.data_local.copy()\n", + "taper = np.linspace(1.0, 2.0, nt).astype(rec.dtype)\n", + "\n", + "# Vectorized local expression. np.newaxis and None are equivalent here.\n", + "rec.data_local[:, :] *= taper[:, np.newaxis]\n", + "assert np.all(rec.data_local == before*taper[:, None])\n", + "\n", + "# A read/write no-op is also local and valid.\n", + "rec.data_local[:, :] = rec.data_local[:, :]\n", + "assert np.all(rec.data_local == before*taper[:, None])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Layout notes\n", + "\n", + "* Logical axes and shape must match the selected LHS view. For `src.data[:, glb_idx]`, the RHS shape is `(nt, len(glb_idx))`.\n", + "* Devito does not transpose axes automatically. If an external object stores values as `(npoint_local, nt)`, transpose or build a `(nt, npoint_local)` view or buffer first.\n", + "* Memory order is separate from logical layout. C-contiguous, Fortran-contiguous, and ordinary NumPy strided views are valid as long as the shape and axes are right. Flags such as `array.flags[\"F_CONTIGUOUS\"]` are useful diagnostics, but Devito does not need special handling solely because an array came from a Fortran-oriented object.\n", + "* A different time sampling, padding scheme, or unsupported dense 2D/3D distribution still needs an explicit interpolation, slicing, or exchange step before assignment." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/tests/test_data.py b/tests/test_data.py index fb09d4ef47..cf899cfc1d 100644 --- a/tests/test_data.py +++ b/tests/test_data.py @@ -68,6 +68,20 @@ def test_advanced_indexing(self): assert np.all(u.data[1, :, :, 0] == 0.) assert np.all(u.data[1, :, :, -1] == 0.) + def test_boolean_masking_multidim(self): + """Test data access through a multi-dimensional boolean mask.""" + grid = Grid(shape=(5, 6, 7)) + u = Function(name='u', grid=grid, space_order=0) + + mask = np.zeros(grid.shape, dtype=bool) + mask[1:4, 2:5, 3:6] = True + + u.data[:] = 0 + u.data[mask] = 1 + + assert np.all(u.data[mask] == 1) + assert np.all(u.data[~mask] == 0) + def test_negative_step(self): """Test slicing with a negative step.""" grid = Grid(shape=(6, 6, 6)) @@ -643,6 +657,11 @@ class TestDataDistributed: Test Data indexing and manipulation when distributed over a set of MPI processes. """ + @staticmethod + def _local_global_indices(data, axis=0): + rank = data._distributor.myrank + return np.asarray(data._decomposition[axis][rank], dtype=np.int64) + @pytest.mark.parallel(mode=4) def test_localviews(self, mode): grid = Grid(shape=(4, 4)) @@ -681,6 +700,10 @@ def test_trivial_insertion(self, mode): assert np.all(u.data == 1.) assert np.all(u.data._local == 1.) + u.data_local[:] = 2. + assert np.all(u.data == 2.) + assert np.all(u.data_local == 2.) + v.data_with_halo[:] = 1. assert v.data_with_halo[:].shape == (3, 3) assert np.all(v.data_with_halo == 1.) @@ -1351,6 +1374,151 @@ def test_from_replicated_to_distributed(self, mode): except Exception as e: raise AssertionError('Assert False') from e + @pytest.mark.parallel(mode=4) + def test_advanced_indexing_set_get_1d(self, mode): + grid = Grid(shape=(8,)) + f = Function(name='f', grid=grid, space_order=0, dtype=np.int32) + + global_data = np.arange(8, dtype=f.dtype) + rank = grid.distributor.myrank + source_index = np.array_split(np.arange(8)[::-1], + grid.distributor.nprocs)[rank] + source_data = global_data[source_index] + + f.data[:] = 0 + f.data[source_index] = source_data + + local_index = self._local_global_indices(f.data) + assert np.all(f.data_local == global_data[local_index]) + assert np.all(f.data[source_index] == source_data) + + @pytest.mark.parallel(mode=4) + def test_advanced_indexing_list_and_negative_indices(self, mode): + grid = Grid(shape=(8,)) + f = Function(name='f', grid=grid, space_order=0, dtype=np.int32) + + rank = grid.distributor.myrank + index = [-1, 0] if rank == 0 else [] + values = np.array([70, 10], dtype=f.dtype) if rank == 0 else \ + np.empty(0, dtype=f.dtype) + + f.data[:] = 0 + f.data[index] = values + + expected = np.zeros(8, dtype=f.dtype) + expected[[-1, 0]] = [70, 10] + local_index = self._local_global_indices(f.data) + assert np.all(f.data_local == expected[local_index]) + assert np.all(f.data[index] == values) + + index = [1] if rank == 0 else [] + values = np.array([20], dtype=f.dtype) if rank == 0 else \ + np.empty(0, dtype=f.dtype) + + f.data[:] = 0 + f.data[index] = values + + expected = np.zeros(8, dtype=f.dtype) + expected[1] = 20 + assert np.all(f.data_local == expected[local_index]) + assert np.all(f.data[index] == values) + + @pytest.mark.parallel(mode=4) + def test_top_level_list_as_basic_multidim_index(self, mode): + grid = Grid(shape=(4, 4)) + f = Function(name='f', grid=grid, space_order=0, dtype=np.int32) + + f.data[:] = np.arange(16, dtype=f.dtype).reshape(4, 4) + + # Preserve Devito's legacy interpretation of a top-level list matching + # the data rank as a multi-dimensional basic index, not advanced + # indexing along the first dimension. + assert f.data[[0, 0]] == f.data[0, 0] + f.data[[0, 0]] = 99 + assert f.data[[0, 0]] == f.data[0, 0] + + @pytest.mark.parallel(mode=4) + @pytest.mark.parametrize('order', ['C', 'F']) + def test_advanced_indexing_with_slice(self, mode, order): + grid = Grid(shape=(8,)) + nt = 3 + f = TimeFunction(name='f', grid=grid, save=nt, time_order=1, + space_order=0, dtype=np.int32) + + global_data = np.arange(nt*8, dtype=f.dtype).reshape(nt, 8) + rank = grid.distributor.myrank + source_index = np.array_split(np.arange(8)[::-1], + grid.distributor.nprocs)[rank] + time_window = slice(1, None) + source_data = np.array(global_data[time_window, source_index], + order=order) + if order == 'C': + assert source_data.flags.c_contiguous + else: + assert source_data.flags.f_contiguous + + f.data[:] = 0 + f.data[time_window, source_index] = source_data + + expected = np.zeros_like(global_data) + expected[time_window, :] = global_data[time_window, :] + local_index = self._local_global_indices(f.data, axis=1) + assert np.all(f.data == expected[:, local_index]) + assert np.all(f.data[time_window, source_index] == source_data) + + @pytest.mark.parallel(mode=4) + def test_advanced_indexing_with_scalar(self, mode): + grid = Grid(shape=(8,)) + nt = 3 + f = TimeFunction(name='f', grid=grid, save=nt, time_order=1, + space_order=0, dtype=np.int32) + + global_data = np.arange(nt*8, dtype=f.dtype).reshape(nt, 8) + rank = grid.distributor.myrank + source_index = np.array_split(np.arange(8)[::-1], + grid.distributor.nprocs)[rank] + time_index = 1 + source_data = global_data[time_index, source_index] + + f.data[:] = 0 + f.data[time_index, source_index] = source_data + + expected = np.zeros_like(global_data) + expected[time_index, :] = global_data[time_index, :] + local_index = self._local_global_indices(f.data, axis=1) + assert np.all(f.data == expected[:, local_index]) + assert np.all(f.data[time_index, source_index] == source_data) + + @pytest.mark.parallel(mode=4) + def test_advanced_indexing_errors(self, mode): + grid = Grid(shape=(8,)) + f = Function(name='f', grid=grid, space_order=0, dtype=np.int32) + + rank = grid.distributor.myrank + duplicate_index = np.array([1, 1]) if rank == 0 else \ + np.empty(0, dtype=np.int64) + duplicate_data = np.ones(duplicate_index.size, dtype=f.dtype) + + with pytest.raises(ValueError, match="Duplicate global indices"): + f.data[duplicate_index] = duplicate_data + + oob_index = np.array([8]) if rank == 0 else np.empty(0, dtype=np.int64) + oob_data = np.ones(oob_index.size, dtype=f.dtype) + + with pytest.raises(ValueError, match="out-of-bounds"): + f.data[oob_index] = oob_data + + with pytest.raises(ValueError, match="out-of-bounds"): + f.data[oob_index] + + @pytest.mark.parallel(mode=4) + def test_advanced_indexing_multiple_distributed_dimensions(self, mode): + grid = Grid(shape=(4, 4)) + f = Function(name='f', grid=grid, space_order=0, dtype=np.int32) + + with pytest.raises(NotImplementedError, match="single distributed dimension"): + f.data[np.array([0, 1]), :] + @pytest.mark.parallel(mode=4) def test_misc_setup(self, mode): """Test setup of Functions with mixed distributed/replicated Dimensions.""" diff --git a/tests/test_mpi.py b/tests/test_mpi.py index 7329cc31e3..2ce609821b 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -651,6 +651,110 @@ def test_local_indices(self, coords, expected, expectedinds, mode): expectedinds = expectedinds[grid.distributor.myrank] assert sf.local_indices == expectedinds + @pytest.mark.parallel(mode=[1, 4]) + @pytest.mark.parametrize('order', ['C', 'F']) + def test_sparse_time_data_advanced_indexing(self, mode, order): + """ + Rank-local external traces are labeled by global sparse indices. + """ + grid = Grid(shape=(4, 4), extent=(3.0, 3.0)) + nt = 3 + npoint = 8 + rec = SparseTimeFunction(name="rec", grid=grid, nt=nt, npoint=npoint) + + local_sparse_indices = np.arange(npoint)[rec.local_indices[-1]] + global_data = np.arange(nt*npoint, dtype=rec.dtype).reshape(nt, npoint) + expected_local = global_data[:, local_sparse_indices] + + rank = grid.distributor.myrank + trace_index = np.array_split(np.arange(npoint)[::-1], + grid.distributor.nprocs)[rank] + trace_data = np.array(global_data[:, trace_index], order=order) + if order == 'C': + assert trace_data.flags.c_contiguous + else: + assert trace_data.flags.f_contiguous + + rec.data_local[:, :] = 0 + rec.data[:, trace_index] = trace_data + assert np.all(rec.data_local == expected_local) + assert np.all(rec.data[:, trace_index] == trace_data) + + @pytest.mark.parallel(mode=[1, 4]) + def test_sparse_time_data_local_access(self, mode): + grid = Grid(shape=(4, 4), extent=(3.0, 3.0)) + nt = 3 + npoint = 8 + rec = SparseTimeFunction(name="rec", grid=grid, nt=nt, npoint=npoint) + + local_sparse_indices = np.arange(npoint)[rec.local_indices[-1]] + global_data = np.arange(nt*npoint, dtype=rec.dtype).reshape(nt, npoint) + expected_local = global_data[:, local_sparse_indices] + + # A full global, replicated RHS is sliced by Devito for this rank. + rec.data[:, :] = global_data[:, :] + assert np.all(rec.data_local == expected_local) + + # Individual sparse-column writes use global sparse indices. + rec.data_local[:, :] = 0 + for local_i, global_i in enumerate(local_sparse_indices): + rec.data[:, global_i] = expected_local[:, local_i] + assert np.all(rec.data_local == expected_local) + + # Already-distributed rank-local buffers should be copied locally. + local_data = -expected_local + rec.data[rec.local_indices] = local_data[:, :] + assert np.all(rec.data_local == local_data) + local_indices_data = rec.data_local.copy() + + rec.data_local[:, :] = 0 + rec.data_local[:, :] = local_data[:, :] + assert np.all(rec.data_local == local_indices_data) + + taper = np.arange(1, nt + 1, dtype=rec.dtype) + loop_tapered = local_data.copy() + for i in range(rec.npoint): + loop_tapered[:, i] *= taper[:] + + rec.data_local[:, :] *= taper[:, np.newaxis] + assert np.all(rec.data_local == loop_tapered) + + @pytest.mark.parallel(mode=4) + def test_sparse_data_advanced_indexing_empty_ranks(self, mode): + grid = Grid(shape=(4, 4), extent=(3.0, 3.0)) + npoint = 2 + sf = SparseFunction(name="sf", grid=grid, npoint=npoint) + + global_data = np.arange(npoint, dtype=sf.dtype) + local_sparse_indices = sf._distributor.glb_numb[0] + expected_local = global_data[local_sparse_indices] + + rank = grid.distributor.myrank + point_index = np.array([1, 0]) if rank == 0 else np.empty(0, dtype=np.int64) + point_data = global_data[point_index] + + sf.data[point_index] = point_data + assert np.all(sf.data_local == expected_local) + assert np.all(sf.data[point_index] == point_data) + + @pytest.mark.parallel(mode=4) + def test_sparse_coordinates_advanced_indexing_empty_ranks(self, mode): + grid = Grid(shape=(4, 4), extent=(3.0, 3.0)) + npoint = 2 + sf = SparseFunction(name="sf", grid=grid, npoint=npoint) + + local_sparse_indices = sf._distributor.glb_numb[0] + rank = grid.distributor.myrank + point_index = np.array([1, 0]) if rank == 0 else np.empty(0, dtype=np.int64) + global_coords = np.arange(npoint*grid.dim, dtype=sf.coordinates.dtype) + global_coords = global_coords.reshape(npoint, grid.dim) + point_coords = global_coords[point_index] + + sf.coordinates.data[point_index] = point_coords + + assert np.all(sf.coordinates.data == global_coords[local_sparse_indices]) + assert np.all(sf.coordinates.data[point_index] == point_coords) + @pytest.mark.parallel(mode=4) def test_scatter_gather(self, mode): """