Implement interface for set! and use it to set! distributed fields better (#3817)

* Implement an interface for set!

* Cleanup

* Move towards better partition syntax

* Clean up distributed stuff

* Generalize set! to encompass distributed architectures

* Define child_architecture for AbstractField

* Clean up scatter_local_grids (but more is needed)

* Update tests

* Do nothing on serial architecture
glwagner authored Oct 5, 2024
1 parent 0f60a2a commit 9c6ed92
Showing 17 changed files with 256 additions and 213 deletions.
2 changes: 1 addition & 1 deletion src/Architectures.jl
@@ -59,7 +59,7 @@ architecture(a::OffsetArray) = architecture(parent(a))
Return `arch`itecture of child processes.
On single-process, non-distributed systems, return `arch`.
"""
child_architecture(arch) = arch
child_architecture(arch::AbstractSerialArchitecture) = arch

array_type(::CPU) = Array
array_type(::GPU) = CuArray
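Editor's note (not part of the commit): a minimal sketch of how the narrowed method dispatches. Serial architectures return themselves via the fallback above, while a `Distributed` architecture is expected to return its per-rank child architecture; the `Distributed(GPU())` call below is illustrative and would require an MPI session.

```julia
using Oceananigans
using Oceananigans.Architectures: child_architecture

child_architecture(CPU())   # CPU(): serial architectures return themselves

# On a distributed run (launched under MPI), the per-rank architecture is returned instead:
# arch = Distributed(GPU())
# child_architecture(arch)  # GPU()
```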
2 changes: 1 addition & 1 deletion src/DistributedComputations/DistributedComputations.jl
@@ -2,7 +2,7 @@ module DistributedComputations

export
Distributed, Partition, Equal, Fractional,
child_architecture, reconstruct_global_grid,
child_architecture, reconstruct_global_grid, partition,
inject_halo_communication_boundary_conditions,
DistributedFFTBasedPoissonSolver

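Editor's note: with `partition` now in the public exports, downstream code can import it directly. A one-line usage sketch, where `A_global` and the distributed field `c` are placeholders:

```julia
using Oceananigans.DistributedComputations: partition

# Slice a globally-sized array down to the chunk owned by the rank holding field `c`:
# A_local = partition(A_global, c)
```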
66 changes: 32 additions & 34 deletions src/DistributedComputations/distributed_fields.jl
@@ -1,16 +1,20 @@
import Oceananigans.Fields: Field, FieldBoundaryBuffers, location, set!
import Oceananigans.BoundaryConditions: fill_halo_regions!

using CUDA: CuArray
using OffsetArrays: OffsetArray
using Oceananigans.Grids: topology
using Oceananigans.Fields: validate_field_data, indices, validate_boundary_conditions, validate_indices, recv_from_buffers!
using Oceananigans.Fields: validate_field_data, indices, validate_boundary_conditions
using Oceananigans.Fields: validate_indices, recv_from_buffers!, set_to_array!, set_to_field!

import Oceananigans.Fields: Field, FieldBoundaryBuffers, location, set!
import Oceananigans.BoundaryConditions: fill_halo_regions!

function Field((LX, LY, LZ)::Tuple, grid::DistributedGrid, data, old_bcs, indices::Tuple, op, status)
arch = architecture(grid)
indices = validate_indices(indices, (LX, LY, LZ), grid)
validate_field_data((LX, LY, LZ), data, grid, indices)
validate_boundary_conditions((LX, LY, LZ), grid, old_bcs)
new_bcs = inject_halo_communication_boundary_conditions(old_bcs, arch.local_rank, arch.connectivity, topology(grid))

arch = architecture(grid)
rank = arch.local_rank
new_bcs = inject_halo_communication_boundary_conditions(old_bcs, rank, arch.connectivity, topology(grid))
buffers = FieldBoundaryBuffers(grid, data, new_bcs)

return Field{LX, LY, LZ}(grid, data, new_bcs, indices, op, status, buffers)
@@ -19,42 +23,36 @@ end
const DistributedField = Field{<:Any, <:Any, <:Any, <:Any, <:DistributedGrid}
const DistributedFieldTuple = NamedTuple{S, <:NTuple{N, DistributedField}} where {S, N}

function set!(u::DistributedField, f::Function)
arch = architecture(u)
if child_architecture(arch) isa GPU
cpu_grid = on_architecture(cpu_architecture(arch), u.grid)
u_cpu = Field(location(u), cpu_grid, eltype(u); indices = indices(u))
f_field = field(location(u), f, cpu_grid)
set!(u_cpu, f_field)
set!(u, u_cpu)
elseif child_architecture(arch) isa CPU
f_field = field(location(u), f, u.grid)
set!(u, f_field)
global_size(f::DistributedField) = global_size(architecture(f), size(f))

# Automatically partition under the hood if sizes are compatible
function set!(u::DistributedField, V::Union{Array, CuArray, OffsetArray})
NV = size(V)
Nu = global_size(u)

# Suppress singleton indices
NV′ = filter(n -> n > 1, NV)
Nu′ = filter(n -> n > 1, Nu)

if NV′ == Nu′
v = partition(V, u)
else
v = V
end

return u
return set_to_array!(u, v)
end

# Automatically partition under the hood if sizes are compatible
function set!(u::DistributedField, v::Union{Array, CuArray})
gsize = global_size(architecture(u), size(u))

if size(v) == gsize
f = partition_global_array(architecture(u), v, size(u))
u .= f
return u
function set!(u::DistributedField, V::Field)
if size(V) == global_size(u)
v = partition(V, u)
return set_to_array!(u, v)
else
try
f = on_architecture(architecture(u), v)
u .= f
return u

catch
throw(ArgumentError("ERROR: DimensionMismatch: array could not be set to match destination field"))
end
return set_to_field!(u, V)
end
end


"""
synchronize_communication!(field)
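Editor's note: a sketch of what the new `set!` methods enable. A globally-sized array is partitioned onto each rank automatically, while locally-sized input behaves as before. The 4-rank `Partition(4)` layout and the grid sizes are illustrative assumptions; the script would be launched with something like `mpiexec -n 4 julia script.jl`.

```julia
using MPI
using Oceananigans
using Oceananigans.DistributedComputations

MPI.Init()

# Assumed layout: 4 ranks splitting x, so each rank owns a 4×16×8 slab of the 16×16×8 grid.
arch = Distributed(CPU(), partition = Partition(4))
grid = RectilinearGrid(arch, size = (16, 16, 8), extent = (1, 1, 1))
c    = CenterField(grid)

# Globally-sized data: its size matches global_size(c), so set! partitions it under the hood.
set!(c, rand(16, 16, 8))

# Locally-sized data still works and skips the partition step.
set!(c, rand(size(c)...))
```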
12 changes: 6 additions & 6 deletions src/DistributedComputations/distributed_grids.jl
@@ -176,7 +176,7 @@ function reconstruct_global_grid(grid::DistributedRectilinearGrid)

nx, ny, nz = n = size(grid)
Hx, Hy, Hz = H = halo_size(grid)
Nx, Ny, Nz = map(sum, concatenate_local_sizes(n, arch))
Nx, Ny, Nz = global_size(arch, n)

TX, TY, TZ = topology(grid)

@@ -219,7 +219,7 @@ function reconstruct_global_grid(grid::DistributedLatitudeLongitudeGrid)

nλ, nφ, nz = n = size(grid)
Hλ, Hφ, Hz = H = halo_size(grid)
Nλ, Nφ, Nz = map(sum, concatenate_local_sizes(n, arch))
Nλ, Nφ, Nz = global_size(arch, n)

TX, TY, TZ = topology(grid)

@@ -266,12 +266,12 @@ end
# take precedence on `DistributedGrid`
function with_halo(new_halo, grid::DistributedRectilinearGrid)
new_grid = with_halo(new_halo, reconstruct_global_grid(grid))
return scatter_local_grids(architecture(grid), new_grid, size(grid))
return scatter_local_grids(new_grid, architecture(grid), size(grid))
end

function with_halo(new_halo, grid::DistributedLatitudeLongitudeGrid)
new_grid = with_halo(new_halo, reconstruct_global_grid(grid))
return scatter_local_grids(architecture(grid), new_grid, size(grid))
return scatter_local_grids(new_grid, architecture(grid), size(grid))
end

# Extending child_architecture for grids
@@ -295,13 +295,13 @@ function scatter_grid_properties(global_grid)
return x, y, z, topo, halo
end

function scatter_local_grids(arch::Distributed, global_grid::RectilinearGrid, local_size)
function scatter_local_grids(global_grid::RectilinearGrid, arch::Distributed, local_size)
x, y, z, topo, halo = scatter_grid_properties(global_grid)
global_sz = global_size(arch, local_size)
return RectilinearGrid(arch, eltype(global_grid); size=global_sz, x=x, y=y, z=z, halo=halo, topology=topo)
end

function scatter_local_grids(arch::Distributed, global_grid::LatitudeLongitudeGrid, local_size)
function scatter_local_grids(global_grid::LatitudeLongitudeGrid, arch::Distributed, local_size)
x, y, z, topo, halo = scatter_grid_properties(global_grid)
global_sz = global_size(arch, local_size)
return LatitudeLongitudeGrid(arch, eltype(global_grid); size=global_sz, longitude=x,
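Editor's note: the argument order of `scatter_local_grids` is now grid-first, matching the `with_halo` call sites above. A sketch under that assumption, with `local_grid` standing in for any distributed grid:

```julia
using Oceananigans.Grids: with_halo
using Oceananigans.Architectures: architecture
using Oceananigans.DistributedComputations: reconstruct_global_grid, scatter_local_grids

global_grid = reconstruct_global_grid(local_grid)    # assemble the full grid
padded_grid = with_halo((5, 5, 5), global_grid)      # modify it globally

# New signature: scatter_local_grids(grid, arch, local_size) instead of (arch, grid, local_size)
new_local = scatter_local_grids(padded_grid, architecture(local_grid), size(local_grid))
```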
149 changes: 87 additions & 62 deletions src/DistributedComputations/partition_assemble.jl
@@ -1,61 +1,66 @@
import Oceananigans.Architectures: on_architecture
using Oceananigans.Fields: Field

all_reduce(op, val, arch::Distributed) =
MPI.Allreduce(val, op, arch.communicator)
import Oceananigans.Architectures: on_architecture

all_reduce(op, val, arch::Distributed) = MPI.Allreduce(val, op, arch.communicator)
all_reduce(op, val, arch) = val

# MPI Barrier
barrier!(arch) = nothing
barrier!(arch::Distributed) = MPI.Barrier(arch.communicator)

"""
concatenate_local_sizes(n, arch::Distributed)
concatenate_local_sizes(local_size, arch::Distributed)
Return a 3-Tuple containing a vector of `size(grid, idx)` for each rank in
Return a 3-Tuple containing a vector of `size(grid, dim)` for each rank in
all 3 directions.
"""
concatenate_local_sizes(n, arch::Distributed) =
Tuple(concatenate_local_sizes(n, arch, i) for i in 1:length(n))
concatenate_local_sizes(local_size, arch::Distributed) =
Tuple(concatenate_local_sizes(local_size, arch, d) for d in 1:length(local_size))

concatenate_local_sizes(sz, arch, dim) = concatenate_local_sizes(sz[dim], arch, dim)

function concatenate_local_sizes(n, arch::Distributed, idx)
R = arch.ranks[idx]
r = arch.local_index[idx]
n = n isa Number ? n : n[idx]
l = zeros(Int, R)
function concatenate_local_sizes(n::Number, arch::Distributed, dim)
R = arch.ranks[dim]
r = arch.local_index[dim]
N = zeros(Int, R)

r1, r2 = arch.local_index[[1, 2, 3] .!= idx]
r1, r2 = arch.local_index[[1, 2, 3] .!= dim]

if r1 == 1 && r2 == 1
l[r] = n
N[r] = n
end

MPI.Allreduce!(l, +, arch.communicator)
MPI.Allreduce!(N, +, arch.communicator)

return l
return N
end

# Partitioning (localization of global objects) and assembly (global assembly of local objects)
# Used for grid constructors (cpu_face_constructor_x, cpu_face_constructor_y, cpu_face_constructor_z)
# We need to repeat the value at the right boundary
function partition_coordinate(c::AbstractVector, n, arch, idx)
nl = concatenate_local_sizes(n, arch, idx)
r = arch.local_index[idx]
"""
partition_coordinate(coordinate, n, arch, dim)
Return the local component of the global `coordinate`, which has
local length `n` and is distributed on `arch`itecture
in the x-, y-, or z- `dim`ension.
"""
function partition_coordinate(c::AbstractVector, n, arch, dim)
nl = concatenate_local_sizes(n, arch, dim)
r = arch.local_index[dim]

start_idx = sum(nl[1:r-1]) + 1 # sum of all previous rank's dimension + 1
end_idx = if r == ranks(arch)[idx]
end_idx = if r == ranks(arch)[dim]
length(c)
else
sum(nl[1:r]) + 1
end

return c[start_idx : end_idx]
return c[start_idx:end_idx]
end

function partition_coordinate(c::Tuple, n, arch, idx)
nl = concatenate_local_sizes(n, arch, idx)
function partition_coordinate(c::Tuple, n, arch, dim)
nl = concatenate_local_sizes(n, arch, dim)
N = sum(nl)
R = arch.ranks[idx]
R = arch.ranks[dim]
Δl = (c[2] - c[1]) / N

l = Tuple{Float64, Float64}[(c[1], c[1] + Δl * nl[1])]
@@ -64,7 +69,7 @@ function partition_coordinate(c::Tuple, n, arch, idx)
push!(l, (lp, lp + Δl * nl[i]))
end

return l[arch.local_index[idx]]
return l[arch.local_index[dim]]
end

"""
@@ -75,11 +80,11 @@ a local number of elements `Nc`, number of ranks `Nr`, rank `r`,
and `arch`itecture. Since we use a global reduction, only ranks at positions
1 in the other two directions `r1 == 1` and `r2 == 1` fill the 1D array.
"""
function assemble_coordinate(c_local::AbstractVector, n, arch, idx)
nl = concatenate_local_sizes(n, arch, idx)
R = arch.ranks[idx]
r = arch.local_index[idx]
r2 = [arch.local_index[i] for i in filter(x -> x != idx, (1, 2, 3))]
function assemble_coordinate(c_local::AbstractVector, n, arch, dim)
nl = concatenate_local_sizes(n, arch, dim)
R = arch.ranks[dim]
r = arch.local_index[dim]
r2 = [arch.local_index[i] for i in filter(x -> x != dim, (1, 2, 3))]

c_global = zeros(eltype(c_local), sum(nl)+1)

@@ -94,13 +99,13 @@ function assemble_coordinate(c_local::AbstractVector, n, arch, idx)
end

# Simple case, just take the first and the last core
function assemble_coordinate(c_local::Tuple, n, arch, idx)
function assemble_coordinate(c_local::Tuple, n, arch, dim)
c_global = zeros(Float64, 2)

rank = arch.local_index
R = arch.ranks[idx]
r = rank[idx]
r2 = [rank[i] for i in filter(x -> x != idx, (1, 2, 3))]
R = arch.ranks[dim]
r = rank[dim]
r2 = [rank[i] for i in filter(x -> x != dim, (1, 2, 3))]

if rank[1] == 1 && rank[2] == 1 && rank[3] == 1
c_global[1] = c_local[1]
@@ -113,42 +118,62 @@ function assemble_coordinate(c_local::Tuple, n, arch, idx)
return tuple(c_global...)
end

# TODO: partition_global_array and construct_global_array
# do not currently work for 3D parallelizations
# (They are not used anywhere in the code at the moment except for immersed boundaries)
# TODO: make partition and construct_global_array work for 3D distribution.

"""
partition_global_array(arch, c_global, (nx, ny, nz))
partition(A, b)
Partition the globally-sized `A` into local arrays with the same size as `b`.
"""
partition(A, b::Field) = partition(A, architecture(b), size(b))
partition(F::Field, b::Field) = partition(interior(F), b)
partition(f::Function, arch, n) = f
partition(A::AbstractArray, arch::AbstractSerialArchitecture, local_size) = A

Partition a global array into local arrays of size `(nx, ny)` if 2D or `(nx, ny, nz)` if 3D.
Useful for boundary arrays, forcings and initial conditions.
"""
partition_global_array(arch, c_global::AbstractArray, n) = c_global
partition_global_array(arch, c_global::Function, n) = c_global
partition(A, arch, local_size)
# Here we assume that we cannot partition in z (we should remove support for that)
function partition_global_array(arch::Distributed, c_global::AbstractArray, n)
c_global = on_architecture(CPU(), c_global)
Partition the globally-sized `A` into local arrays with `local_size` on `arch`itecture.
"""
function partition(A::AbstractArray, arch::Distributed, local_size)
A = on_architecture(CPU(), A)

ri, rj, rk = arch.local_index
dims = length(size(A))

dims = length(size(c_global))
nx, ny, nz = concatenate_local_sizes(n, arch)
# Vectors with the local size for every rank
nxs, nys, nzs = concatenate_local_sizes(local_size, arch)

nz = nz[1]
# The local size
nx = nxs[ri]
ny = nys[rj]
nz = nzs[1]
# @assert (nx, ny, nz) == local_size

if dims == 2
c_local = zeros(eltype(c_global), nx[ri], ny[rj])
up_to = nxs[1:ri-1]
including = nxs[1:ri]
i₁ = sum(up_to) + 1
i₂ = sum(including)

c_local .= c_global[1 + sum(nx[1:ri-1]) : sum(nx[1:ri]),
1 + sum(ny[1:rj-1]) : sum(ny[1:rj])]
else
c_local = zeros(eltype(c_global), nx[ri], ny[rj], nz)
up_to = nys[1:rj-1]
including = nys[1:rj]
j₁ = sum(up_to) + 1
j₂ = sum(including)

c_local .= c_global[1 + sum(nx[1:ri-1]) : sum(nx[1:ri]),
1 + sum(ny[1:rj-1]) : sum(ny[1:rj]),
1:nz]
ii = UnitRange(i₁, i₂)
jj = UnitRange(j₁, j₂)
kk = 1:nz # no partitioning in z

# TODO: undo this toxic assumption that all 2D arrays span x, y.
if dims == 2
a = zeros(eltype(A), nx, ny)
a .= A[ii, jj]
else
a = zeros(eltype(A), nx, ny, nz)
a .= A[ii, jj, 1:nz]
end
return on_architecture(child_architecture(arch), c_local)

return on_architecture(child_architecture(arch), a)
end

"""
@@ -160,7 +185,7 @@ Useful for boundary arrays, forcings and initial conditions.
construct_global_array(arch, c_local::AbstractArray, n) = c_local
construct_global_array(arch, c_local::Function, N) = c_local

# TODO: This does not work for 3D parallelizations!!!
# TODO: This does not work for 3D parallelizations
function construct_global_array(arch::Distributed, c_local::AbstractArray, n)
c_local = on_architecture(CPU(), c_local)

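Editor's note: to make the renamed helpers concrete, a sketch of what they return on the same hypothetical 4-rank, x-only decomposition of a 16×16×8 grid used above. The values are shown as comments because they depend on the MPI layout at run time.

```julia
using Oceananigans.DistributedComputations: concatenate_local_sizes, partition

# With arch.ranks == (4, 1, 1) and a local size of (4, 16, 8) on every rank:
# concatenate_local_sizes((4, 16, 8), arch) == ([4, 4, 4, 4], [16], [8])

# partition slices a globally-sized array down to this rank's chunk and moves it
# onto child_architecture(arch):
# A_local = partition(rand(16, 16, 8), arch, (4, 16, 8))   # size (4, 16, 8) on each rank
```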