blob: dba7d8332e875f49ba67c6cc5398565ba06ae9d0 [file]
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2026 Rasmus Munk Larsen <rmlarsen@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
// SPDX-License-Identifier: MPL-2.0
#ifndef EIGEN_THREADED_SPARSE_PRODUCT_H
#define EIGEN_THREADED_SPARSE_PRODUCT_H
// IWYU pragma: private
#include "./InternalHeaderCheck.h"
namespace Eigen {
namespace internal {
inline ThreadPool& default_threaded_sparse_pool() {
static ThreadPool pool(numext::maxi<unsigned>(1u, std::thread::hardware_concurrency()));
return pool;
}
// nnz-balanced partition of an outer range [0, outerSize) into numChunks
// contiguous chunks. boundaries[t] = first outer index owned by partition t;
// boundaries[numChunks] = outerSize.
//
// The split uses std::lower_bound on the outer-index array. Targets are
// monotonically increasing in t, so each search starts from the previous
// boundary; total work is bounded by O(numChunks + log outerSize) rather than
// numChunks * log(outerSize). Each chunk's nnz count differs from the ideal by
// at most max_nnz_per_outer.
template <typename StorageIndex>
inline void compute_nnz_balanced_partition(const StorageIndex* outer, Index outerSize, Index totalNnz, int numChunks,
std::vector<Index>& boundaries) {
boundaries.assign(numChunks + 1, 0);
boundaries[numChunks] = outerSize;
if (numChunks <= 1 || outerSize == 0 || totalNnz == 0) return;
const StorageIndex* const last = outer + outerSize + 1;
const StorageIndex* lo = outer;
for (int t = 1; t < numChunks; ++t) {
Index target = (static_cast<Index>(t) * totalNnz) / numChunks;
lo = std::lower_bound(lo, last, static_cast<StorageIndex>(target));
boundaries[t] = lo - outer;
}
}
// Single-row dot-product kernel. Used as the body of a per-row OpenMP
// `parallel for` (which can do its own dynamic scheduling) and from within
// run_dot_chunk for the ThreadPool dispatch path. Marked ALWAYS_INLINE so
// the loop body is visible to the OMP iteration scheduler for vectorization.
template <bool Conjugate, bool Overwrite, typename Scalar, typename StorageIndex, typename XScalar, typename YScalar,
typename AlphaT>
EIGEN_ALWAYS_INLINE void run_dot_row(const Scalar* EIGEN_RESTRICT vals, const StorageIndex* EIGEN_RESTRICT inner,
const StorageIndex* EIGEN_RESTRICT outer,
const StorageIndex* EIGEN_RESTRICT innerNnz, const XScalar* EIGEN_RESTRICT x,
YScalar* EIGEN_RESTRICT y, Index i, const AlphaT& alpha) {
const Index k0 = outer[i];
const Index end = innerNnz ? (k0 + innerNnz[i]) : outer[i + 1];
Scalar s0(0), s1(0);
Index k = k0;
const conj_if<Conjugate> cj{};
// Loop structure mirrors the existing kernel in SparseDenseProduct.h so
// the compiler vectorizes identically.
for (; k < end; ++k) {
s0 += cj(vals[k]) * x[inner[k]];
++k;
if (k < end) s1 += cj(vals[k]) * x[inner[k]];
}
const Scalar s = s0 + s1;
EIGEN_IF_CONSTEXPR (Overwrite) {
y[i] = alpha * s;
} else {
y[i] += alpha * s;
}
}
// Dot-product-per-outer kernel. Used by both forward and adjoint paths; the
// adjoint path sets Conjugate=true (no-op for real Scalar).
//
// Processes outer indices [lo, hi). For each outer i:
// if Overwrite: y[i] = alpha * sum_k op(val[k]) * x[inner[k]]
// else: y[i] += alpha * sum_k op(val[k]) * x[inner[k]]
// where op(.) is conj(.) iff Conjugate.
//
// Writes are independent across threads (each thread owns a contiguous output
// range), so no synchronization is required.
template <bool Conjugate, bool Overwrite, typename Scalar, typename StorageIndex, typename XScalar, typename YScalar,
typename AlphaT>
EIGEN_STRONG_INLINE void run_dot_chunk(const Scalar* EIGEN_RESTRICT vals, const StorageIndex* EIGEN_RESTRICT inner,
const StorageIndex* EIGEN_RESTRICT outer,
const StorageIndex* EIGEN_RESTRICT innerNnz, const XScalar* EIGEN_RESTRICT x,
YScalar* EIGEN_RESTRICT y, Index lo, Index hi, const AlphaT& alpha) {
for (Index i = lo; i < hi; ++i) {
run_dot_row<Conjugate, Overwrite, Scalar, StorageIndex>(vals, inner, outer, innerNnz, x, y, i, alpha);
}
}
} // namespace internal
/** \class ThreadedSparseProduct
* \ingroup SparseCore_Module
*
* \brief Cached, thread-parallel sparse matrix * dense vector product.
*
* Designed for iterative solvers (CG, BiCGSTAB, GMRES, LSCG) that multiply
* many times by the same sparse matrix and, in the case of LSCG, by its
* adjoint. The constructor (or analyzePattern()) computes an nnz-balanced
* row partition once and reuses it across every apply()/applyAdjoint() call.
*
* The forward direction (apply) always runs a dot-product-per-outer kernel,
* which is embarrassingly parallel (no write conflicts). For matrices whose
* native storage order doesn't match the forward direction (ColMajor for
* forward, or RowMajor for adjoint), a transposed mirror is built lazily on
* first use. The mirror doubles the memory footprint of the matrix but
* permits conflict-free parallel writes for both directions.
*
* The matrix is held by const reference (no copy). Caller is responsible for
* keeping it alive across apply calls, and for invalidating the cache when
* the matrix changes:
* - sparsity-pattern change: call analyzePattern()/compute() to rebuild
* the partition and drop the mirror.
* - coefficient-only change (same pattern): call refreshValues() to
* drop the (now-stale) mirror; the next mirror-using direction will
* rebuild it lazily. Without this call, mirror-backed directions
* would read frozen coefficients from the cached copy.
*
* Aliasing: apply()/applyAdjoint() do NOT insert temporaries on overlap
* between x and y; callers must use distinct storage (the kernel writes
* y[i] from a sum over x[inner[k]] reads, which would mis-compute if
* y aliases x). Asserted in debug builds.
*
* Thread-safety: the const apply()/applyAdjoint()/applyAddTo()/applyAdjointAddTo()
* methods are safe to call concurrently on the same operator -- the lazy mirror
* is published through an atomic with double-checked locking. The non-const
* methods (analyzePattern()/compute()/refreshValues()) and destruction are NOT;
* they mutate or free the cached mirror, so they must not overlap any in-flight
* apply() (the same rule as calling any method concurrently with the destructor).
*
* Layout: x and y are taken as Ref<const DenseVector> / Ref<DenseVector>.
* Plain vectors and unit-inner-stride views (e.g. a column of a ColMajor
* matrix, an unaligned Map of a contiguous buffer) bind without copying;
* a strided const input is copy-evaluated by Ref; a strided mutable output
* triggers a Ref runtime error.
*
* \tparam SparseMatrixType A compressed Eigen::SparseMatrix<Scalar, Order, StorageIndex>.
*/
template <typename SparseMatrixType_>
class ThreadedSparseProduct {
public:
typedef SparseMatrixType_ SparseMatrixType;
typedef typename SparseMatrixType::Scalar Scalar;
typedef typename SparseMatrixType::RealScalar RealScalar;
typedef typename SparseMatrixType::StorageIndex StorageIndex;
enum { IsRowMajor = static_cast<int>(SparseMatrixType::IsRowMajor) };
// Opposite-storage-order mirror used by the conflict-free path for whichever
// direction doesn't match A's native order.
typedef SparseMatrix<Scalar, IsRowMajor ? ColMajor : RowMajor, StorageIndex> MirrorType;
// The dot-product kernel walks x/y via raw `Scalar*` with unit inner
// stride; constraining the public API to `Ref` forces compatible layout
// (binding for plain matrices, vectors, and unit-stride Maps/Blocks;
// copy-evaluation for non-conforming const inputs; runtime error for
// non-conforming mutable outputs).
typedef Matrix<Scalar, Dynamic, 1> DenseVector;
typedef Ref<const DenseVector> ConstVectorRef;
typedef Ref<DenseVector> MutableVectorRef;
ThreadedSparseProduct() = default;
explicit ThreadedSparseProduct(const SparseMatrixType& mat, ThreadPool* pool = nullptr) : m_pool(pool) {
analyzePattern(mat);
}
~ThreadedSparseProduct() { delete m_mirror.load(std::memory_order_acquire); }
ThreadedSparseProduct(const ThreadedSparseProduct&) = delete;
ThreadedSparseProduct& operator=(const ThreadedSparseProduct&) = delete;
/** Bind to \a mat and (re)compute the partition. Drops any previously built
* adjoint mirror. */
ThreadedSparseProduct& analyzePattern(const SparseMatrixType& mat) {
eigen_assert(mat.isCompressed() && "ThreadedSparseProduct requires a compressed SparseMatrix");
m_mat = &mat;
delete m_mirror.exchange(nullptr, std::memory_order_acq_rel);
m_adj_partition.clear();
build_partition(mat.outerIndexPtr(), mat.outerSize(), mat.nonZeros(), m_native_partition);
return *this;
}
ThreadedSparseProduct& compute(const SparseMatrixType& mat) { return analyzePattern(mat); }
/** Drops the cached adjoint mirror without recomputing the partition. Use
* after coefficient-only updates to the bound matrix so the next
* applyAdjoint() (or forward apply on a ColMajor A) rebuilds the mirror
* with the new values. */
ThreadedSparseProduct& refreshValues() {
delete m_mirror.exchange(nullptr, std::memory_order_acq_rel);
return *this;
}
Index rows() const { return m_mat ? m_mat->rows() : Index(0); }
Index cols() const { return m_mat ? m_mat->cols() : Index(0); }
/// Overwriting forward apply: y = A * x.
void apply(const ConstVectorRef& x, MutableVectorRef y) const {
apply_impl<false, /*Overwrite=*/true>(x, y, Scalar(1));
}
/// Overwriting adjoint apply: y = A^H * x.
void applyAdjoint(const ConstVectorRef& x, MutableVectorRef y) const {
apply_impl<true, /*Overwrite=*/true>(x, y, Scalar(1));
}
/// Accumulating forward apply: y += alpha * A * x.
void applyAddTo(const ConstVectorRef& x, MutableVectorRef y, const Scalar& alpha) const {
apply_impl<false, /*Overwrite=*/false>(x, y, alpha);
}
/// Accumulating adjoint apply: y += alpha * A^H * x.
void applyAdjointAddTo(const ConstVectorRef& x, MutableVectorRef y, const Scalar& alpha) const {
apply_impl<true, /*Overwrite=*/false>(x, y, alpha);
}
/// Returns the thread pool used by this operator.
ThreadPool* pool() const { return m_pool ? m_pool : &internal::default_threaded_sparse_pool(); }
/// True iff the lazy adjoint mirror has been materialized.
bool hasMirror() const { return m_mirror.load(std::memory_order_acquire) != nullptr; }
private:
template <bool Adjoint, bool Overwrite>
void apply_impl(const ConstVectorRef& x, MutableVectorRef& y, const Scalar& alpha) const {
eigen_assert(m_mat && "ThreadedSparseProduct: matrix not set; call analyzePattern() first");
if (Adjoint) {
eigen_assert(x.size() == m_mat->rows());
eigen_assert(y.size() == m_mat->cols());
} else {
eigen_assert(x.size() == m_mat->cols());
eigen_assert(y.size() == m_mat->rows());
}
// The kernel reads x and writes y concurrently across threads; aliasing
// (x and y overlap) would mis-compute because some x[k] reads see y
// writes from the same SpMV call. Cheap address-range check; cast to
// uintptr_t because comparing pointers from unrelated allocations is
// technically UB in C++.
eigen_assert((x.size() == 0 || y.size() == 0 || std::uintptr_t(x.data() + x.size()) <= std::uintptr_t(y.data()) ||
std::uintptr_t(y.data() + y.size()) <= std::uintptr_t(x.data())) &&
"ThreadedSparseProduct: x and y must not overlap");
// Decide which storage to read from.
// Forward kernel iterates a RowMajor view of A; adjoint kernel iterates
// a ColMajor view of A. If A already has the required order, read it
// directly; otherwise build/use the mirror.
constexpr bool NeedRowMajorView = !Adjoint;
constexpr bool NativeIsRowMajor = IsRowMajor;
constexpr bool UseMirror = NeedRowMajorView != NativeIsRowMajor;
if (UseMirror) {
const MirrorType& m = ensure_mirror();
run<Adjoint, Overwrite>(m.valuePtr(), m.innerIndexPtr(), m.outerIndexPtr(), m.innerNonZeroPtr(),
/*outerSize=*/m.outerSize(), m_adj_partition, x, y, alpha);
} else {
run<Adjoint, Overwrite>(m_mat->valuePtr(), m_mat->innerIndexPtr(), m_mat->outerIndexPtr(),
m_mat->innerNonZeroPtr(),
/*outerSize=*/m_mat->outerSize(), m_native_partition, x, y, alpha);
}
}
// Serial-fallback threshold; matches the same constant in SparseDenseProduct.h.
static constexpr Index kThreadingThreshold = 20000;
template <bool Conjugate, bool Overwrite>
void run(const Scalar* vals, const StorageIndex* inner, const StorageIndex* outer, const StorageIndex* innerNnz,
Index outerSize, const std::vector<Index>& part, const ConstVectorRef& x, MutableVectorRef& y,
const Scalar& alpha) const {
// OpenMP path doesn't use the cached partition (dynamic scheduling balances
// itself), so derive T fresh from the current Eigen::nbThreads() /
// OMP_NUM_THREADS at apply time -- otherwise `setNbThreads()` after
// analyzePattern() would be silently ignored. The ThreadPool path is
// bound to the partition built for a specific T at analyzePattern() time.
#ifdef EIGEN_HAS_OPENMP
const int T = target_thread_count();
#else
const int T = static_cast<int>(part.size()) - 1;
#endif
const Index total_nnz = m_mat->nonZeros();
// Ref construction already enforced unit inner stride for x/y.
const Scalar* xp = x.data();
Scalar* yp = y.data();
if (T <= 1 || total_nnz < kThreadingThreshold) {
internal::run_dot_chunk<Conjugate, Overwrite, Scalar, StorageIndex>(vals, inner, outer, innerNnz, xp, yp, 0,
outerSize, alpha);
return;
}
#ifdef EIGEN_HAS_OPENMP
// Prefer OpenMP for dispatch when available. Use dynamic scheduling
// over rows with chunks sized so the OMP runtime gets ~4*T chunks to
// distribute -- enough granularity to absorb residual nnz imbalance
// without blowing dispatch overhead.
const Index chunk = numext::maxi<Index>(Index(1), (outerSize + Index(T) * 4 - 1) / (Index(T) * 4));
#pragma omp parallel for schedule(dynamic, chunk) num_threads(T)
for (Index i = 0; i < outerSize; ++i) {
internal::run_dot_row<Conjugate, Overwrite, Scalar, StorageIndex>(vals, inner, outer, innerNnz, xp, yp, i, alpha);
}
#else
// ThreadPool path: enqueue T-1 worker tasks, run partition 0 on this
// thread, then wait on the barrier. Avoids ForkJoin's log(T)-hop critical
// path before the last leaf starts.
Barrier barrier(static_cast<unsigned>(T));
ThreadPool* p = pool();
for (int t = 1; t < T; ++t) {
const Index lo = part[t], hi = part[t + 1];
if (lo == hi) {
barrier.Notify();
continue;
}
p->Schedule([=, &barrier]() {
internal::run_dot_chunk<Conjugate, Overwrite, Scalar, StorageIndex>(vals, inner, outer, innerNnz, xp, yp, lo,
hi, alpha);
barrier.Notify();
});
}
internal::run_dot_chunk<Conjugate, Overwrite, Scalar, StorageIndex>(vals, inner, outer, innerNnz, xp, yp, part[0],
part[1], alpha);
barrier.Notify();
barrier.Wait();
#endif
}
// Lazy mirror construction. Double-checked atomic init avoids per-call
// std::call_once cost (which historically varies across libstdc++ versions).
const MirrorType& ensure_mirror() const {
MirrorType* m = m_mirror.load(std::memory_order_acquire);
if (m) return *m;
std::lock_guard<std::mutex> lock(m_mirror_init_mu);
m = m_mirror.load(std::memory_order_relaxed);
if (!m) {
// Same logical matrix, opposite storage order. Eigen detects the
// storage-order mismatch in assignment and reorganizes the data
// (a "structural transpose"); the logical matrix is preserved, no
// conjugation. The kernel applies conj at use when needed.
// Hold the mirror in a unique_ptr while building: the assignment,
// makeCompressed(), and build_partition() can all throw (bad_alloc), and a
// raw owning pointer would leak. Hand ownership to the atomic only once the
// mirror is fully built; on throw the buffer is freed and m_mirror stays
// null so a later call rebuilds cleanly.
std::unique_ptr<MirrorType> built(new MirrorType(m_mat->rows(), m_mat->cols()));
*built = *m_mat;
built->makeCompressed();
build_partition(built->outerIndexPtr(), built->outerSize(), built->nonZeros(), m_adj_partition);
m = built.get();
m_mirror.store(built.release(), std::memory_order_release);
}
return *m;
}
// Thread count target. Under OpenMP, respect Eigen::setNbThreads() /
// OMP_NUM_THREADS instead of the lazy default ThreadPool's size (which
// also avoids constructing that pool when OMP does the dispatch).
int target_thread_count() const {
#ifdef EIGEN_HAS_OPENMP
return numext::maxi(1, Eigen::nbThreads());
#else
return pool()->NumThreads();
#endif
}
// nnz-balanced partition of A's outer dim into T contiguous chunks, with
// a guard against pathological skew: if more than half the partitions end
// up empty (one hub outer holding most of the nnz), fall back to a single
// serial chunk so the inner kernel doesn't pay parallel dispatch for no
// parallel work.
void build_partition(const StorageIndex* outer, Index outerSize, Index nnz, std::vector<Index>& part) const {
const int T = target_thread_count();
internal::compute_nnz_balanced_partition(outer, outerSize, nnz, T, part);
int non_empty = 0;
for (std::size_t t = 0; t + 1 < part.size(); ++t)
if (part[t + 1] > part[t]) ++non_empty;
if (non_empty * 2 < T) {
part.assign(2, 0);
part[1] = outerSize;
}
}
private:
const SparseMatrixType* m_mat = nullptr;
ThreadPool* m_pool = nullptr;
// Partition of A's native outer dim by nnz balance. Used by the direction
// whose kernel matches A's storage order.
std::vector<Index> m_native_partition;
// Lazy adjoint-direction mirror in the opposite storage order, plus its
// own nnz-balanced partition. Constructed on first use.
mutable std::atomic<MirrorType*> m_mirror{nullptr};
mutable std::mutex m_mirror_init_mu;
mutable std::vector<Index> m_adj_partition;
};
} // namespace Eigen
#endif // EIGEN_THREADED_SPARSE_PRODUCT_H