1 change: 1 addition & 0 deletions ext/NavierStokes/NavierStokes.jl
@@ -9,5 +9,6 @@ using Adapt: adapt
include("callback.jl")
include("utils.jl")
include("io.jl")
include("create_data.jl")

end
31 changes: 22 additions & 9 deletions ext/NavierStokes/callback.jl
@@ -54,18 +54,23 @@ function create_callback(
# Initialize the callback state
# To store data coming from CUDA device, we have to serialize them to CPU
callbackstate = (;
θmin = θ, loss_min = eltype(θ)(Inf), lhist_val = [],
θmin = θ, maximprove = eltype(θ)(0), loss_min = eltype(θ)(Inf), lhist_val = [],
lhist_train = [], lhist_nomodel = [])
end
if nunroll === nothing && batch_size === nothing
error("Either nunroll or batch_size must be provided")
elseif nunroll !== nothing
@info "Creating a posteriori callback"
dataloader = create_dataloader_posteriori(val_io_data; nunroll = nunroll, rng)
dataloader = create_dataloader_posteriori(
val_io_data; nunroll = nunroll, nsamples = 1, device = device, rng)
else
@info "Creating a priori callback"
dataloader = create_dataloader_prior(val_io_data; batchsize = batch_size, rng)
dataloader = create_dataloader_prior(
val_io_data; batchsize = batch_size, device = device, rng)
end
# Take data only once
data = dataloader()
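# Baseline: loss of the unclosed model (parameters zeroed) on this fixed validation batch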
no_model_loss = loss_function(model, device(callbackstate.θmin .* 0), st, data)[1]

function rolling_average(y)
return [mean(@view y[max(1, i - average_window):i]) for i in 1:length(y)]
@@ -78,14 +83,22 @@
push!(callbackstate.lhist_train, l_train |> cpu_device())

if step % plot_every == 0
y1, y2 = device(dataloader())
l_val = loss_function(model, p, st, (y1, y2))[1]
# check if this set of p produces a lower validation loss
l_val < callbackstate.loss_min &&
(callbackstate = (; callbackstate..., θmin = p, loss_min = l_val))

#data = dataloader()
l_val = loss_function(model, p, st, data)[1]
@info "[$(step)] Validation Loss: $(l_val)"
no_model_loss = loss_function(model, callbackstate.θmin .* 0, st, (y1, y2))[1]
#no_model_loss = loss_function(model, callbackstate.θmin .* 0, st, data)[1]
@info "[$(step)] Validation Loss (no model): $(no_model_loss)"
if l_val < callbackstate.loss_min
callbackstate = (callbackstate..., θmin = p, loss_min = l_val)
end

## check the percentage of improvement
#pimprove = abs(l_val - no_model_loss)/no_model_loss
#@info "[$(step)] Validation Loss improvement: $(pimprove)"
#if pimprove > callbackstate.maximprove
# (callbackstate = (; callbackstate..., θmin = p, maximprove = pimprove))
#end

push!(callbackstate.lhist_val, l_val |> cpu_device())
push!(callbackstate.lhist_nomodel, no_model_loss |> cpu_device())
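For reference, the updated callback keeps its best-so-far parameters and minimum loss in an immutable `NamedTuple` and rebuilds the tuple on improvement instead of mutating fields. A minimal, self-contained sketch of that update pattern (the names below are illustrative, not part of the extension's API):

```julia
using Statistics: mean

# Toy version of the callback's state handling: history vectors are mutated in
# place, while the best parameters are swapped by rebuilding the NamedTuple.
state = (; θmin = zeros(3), loss_min = Inf, lhist_val = Float64[])

function update_state(state, θ, l_val)
    push!(state.lhist_val, l_val)
    if l_val < state.loss_min
        state = (; state..., θmin = copy(θ), loss_min = l_val)
    end
    return state
end

state = update_state(state, [0.1, 0.2, 0.3], 0.5)
# Same rolling-average idea as `rolling_average` above, with a window of 5
rolling = [mean(@view state.lhist_val[max(1, i - 5):i]) for i in eachindex(state.lhist_val)]
```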
111 changes: 111 additions & 0 deletions ext/NavierStokes/create_data.jl
@@ -0,0 +1,111 @@
using DifferentialEquations
using IncompressibleNavierStokes: right_hand_side!, apply_bc_u!, momentum!, project!

function create_les_data_projected(;
D,
Re,
lims,
nles,
ndns,
filters,
tburn,
tsim,
savefreq,
Δt = nothing,
method = RKMethods.RK44(; T = typeof(Re)),
create_psolver = default_psolver,
backend = IncompressibleNavierStokes.CPU(),
icfunc = (setup, psolver, rng) -> random_field(setup, typeof(Re)(0); psolver, rng),
processors = (; log = timelogger(; nupdate = 10)),
rng,
filenames = nothing,
kwargs...
)
@assert length(nles) == 1 "We only support one les at a time"
@assert length(filters) == 1 "We only support one filter at a time"

T = typeof(Re)

compression = div.(ndns, nles)
@assert all(==(ndns), compression .* nles)

# Build setup and assemble operators
dns = Setup(; x = ntuple(α -> LinRange(lims..., ndns + 1), D), Re, backend, kwargs...)
les = map(
nles -> Setup(;
x = ntuple(α -> LinRange(lims..., nles + 1), D),
Re,
backend,
kwargs...
),
nles
)

# Since the grid is uniform and identical for x and y, we may use a specialized
# spectral pressure solver
psolver = create_psolver(dns)
psolver_les = create_psolver.(les)

# Initial conditions
ustart = icfunc(dns, psolver, rng)

any(u -> any(isnan, u), ustart) && @warn "Initial conditions contain NaNs"

_dns = dns
_les = les

# Solve burn-in DNS using INS
(; u, t),
outputs = solve_unsteady(; setup = _dns, ustart, tlims = (T(0), tburn), Δt, psolver)
@info "Burn-in DNS simulation finished"

# Define the callback function for the filter
Φ = filters[1]
compression = compression[1]
les = les[1]
psolver_les = psolver_les[1]
tsave = collect(T(0):(savefreq * Δt):tsim)
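# `tstops = tsave` in the solve call below forces the integrator to step exactly
# onto these times, so the equality check in `condition` is reliable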
function condition(u, t, integrator)
t in tsave && return true
return false
end
all_ules = []
all_c = []
all_t = []
Fdns = create_right_hand_side(dns, psolver)
function filter_callback(integrator)
u = integrator.u
t = integrator.t
F = Fdns(u, nothing, t) #TODO check if we can avoid recomputing this
p = scalarfield(les)
Φu = vectorfield(les)
FΦ = vectorfield(les)
ΦF = vectorfield(les)
c = vectorfield(les)
temp = nothing

Φ(Φu, u, les, compression)
apply_bc_u!(Φu, t, les)
Φ(ΦF, F, les, compression)
momentum!(FΦ, Φu, temp, t, les)
apply_bc_u!(FΦ, t, les; dudt = true)
project!(FΦ, les; psolver = psolver_les, p = p)
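# Commutator error: c = Φ(F(u)) - F(Φ(u)), i.e. the filtered DNS right-hand side
# minus the LES right-hand side evaluated on the filtered field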
@. c = ΦF - FΦ
push!(all_ules, Array(Φu))
push!(all_c, Array(c))
push!(all_t, T(t))
end
cb = DiscreteCallback(condition, filter_callback)

# Now use SciML to solve the DNS
rhs!(du, u, p, t) = right_hand_side!(du, u, Ref([dns, psolver]), t)
tspan = (T(0), tsim)
prob = ODEProblem(rhs!, u, tspan, nothing)
dns_solution = solve(
prob, Tsit5(); u0 = u, p = nothing,
adaptive = true, saveat = tsim, callback = cb, tspan = tspan, tstops = tsave)

@info "DNS simulation finished"

(; u = all_ules, c = all_c, t = all_t)
end
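A hedged usage sketch of the new entry point. The grid sizes, time parameters, and `my_filter` below are illustrative placeholders (not project defaults); the filter is assumed to follow the `Φ(Φu, u, les, compression)` convention used in the callback above.

```julia
using Random: Xoshiro

# Illustrative call; all values and `my_filter` are placeholders.
rng = Xoshiro(123)
data = create_les_data_projected(;
    D = 2,                   # spatial dimension
    Re = 1.0f3,              # Reynolds number; its type (Float32) sets the precision
    lims = (0.0f0, 1.0f0),   # domain limits, shared by both directions
    nles = (64,),            # a single LES resolution (only one is supported)
    ndns = 256,              # DNS resolution, must be divisible by nles
    filters = (my_filter,),  # a single filter Φ(Φu, u, les, compression)
    tburn = 0.1f0,           # burn-in time before snapshots are collected
    tsim = 1.0f0,            # simulated time for the projected DNS
    savefreq = 10,           # snapshots are saved every savefreq * Δt
    Δt = 1.0f-3,
    rng)

# data.u, data.c and data.t hold the filtered fields, commutator errors and times.
```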
88 changes: 71 additions & 17 deletions ext/NavierStokes/utils.jl
@@ -49,11 +49,12 @@ end
In place version of [`create_right_hand_side`](@ref).
"""
function create_right_hand_side_inplace(setup, psolver)
@error "Deprecated: look at the following link: https://agdestein.github.io/IncompressibleNavierStokes.jl/dev/manual/sciml "
(; x, N, dimension) = setup.grid
D = dimension()
F = similar(u)
div = similar(x[1], N)
p = similar(x[1], N)
F = similar(x[1], (N..., D))
function right_hand_side!(dudt, u, params, t)
INS.apply_bc_u!(u, t, setup)
INS.momentum!(F, u, nothing, t, setup)
@@ -74,7 +75,7 @@ Create ``(\\bar{u}, c)`` pairs for training.
# Returns
A named tuple with fields `u` and `c` (without boundary-condition padding).
"""
function create_io_arrays_priori(data, setups)
function INS_create_io_arrays_priori(data, setups)
# This is a reference function that creates the io_arrays for the a-priori
nsample = length(data)
ngrid, nfilter = size(data[1])
@@ -100,6 +101,30 @@ function create_io_arrays_priori(data, setups)
(; u = reshape(u, (N .- 2)..., D, :), c = reshape(c, (N .- 2)..., D, :))
end
end
function create_io_arrays_priori(data, setup, device = identity)
# This is a reference function that creates the io_arrays for the a-priori
nsample = length(data)
nt = length(data[1].t) - 1
T = eltype(data[1].t[1])
(; dimension, N, Iu) = setup.grid
D = dimension()
u = zeros(T, (N .- 2)..., D, nt + 1, nsample)
c = zeros(T, (N .- 2)..., D, nt + 1, nsample)
ifield = ntuple(Returns(:), D)
for is in 1:nsample
for it in 1:nt
copyto!(
view(u, (ifield...), :, it, is),
data[is].u[it][Iu[1], :]
)
copyto!(
view(c, (ifield...), :, it, is),
data[is].c[it][Iu[1], :]
)
end
end
(; u = device(reshape(u, (N .- 2)..., D, :)), c = device(reshape(c, (N .- 2)..., D, :)))
end

"""
create_io_arrays_posteriori(data, setups)
@@ -113,7 +138,30 @@ Main differences between this function and NeuralClosure.create_io_arrays
A named tuple with fields `u` and `t`.
`u` is an array without padding, with shape (nles..., D, sample, t)
"""
function create_io_arrays_posteriori(data, setups, device = identity)
function create_io_arrays_posteriori(data, setup, device = identity)
nsample = length(data)
nt = length(data[1].t) - 1
T = eltype(data[1].t[1])
(; dimension, N) = setup.grid
D = dimension()
u = zeros(T, N..., D, nsample, nt + 1)
t = zeros(T, nsample, nt + 1)
ifield = ntuple(Returns(:), D)
for is in 1:nsample
for it in 1:nt
copyto!(
view(u, (ifield...), :, is, it),
data[is].u[it][ifield..., :]
)
copyto!(
view(t, is, it),
data[is].t[it]
)
end
end
(; u = device(u), t = t)
end
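Under the same assumptions, a sketch of the a-posteriori layout; here the boundary padding is kept and the time dimension is not flattened:

```julia
io_post = create_io_arrays_posteriori(data, les, identity)
size(io_post.u)  # (66, 66, 2, 2, 101): N..., D, nsample, nt + 1
size(io_post.t)  # (2, 101): nsample, nt + 1
```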
function INS_create_io_arrays_posteriori(data, setups, device = identity)
nsample = length(data)
ngrid, nfilter = size(data[1])
nt = length(data[1][1].t) - 1
@@ -163,8 +211,8 @@ function create_dataloader_prior(io_array; batchsize = 50, device = identity, rng)
nsample = size(x)[end]
d = ndims(x)
i = sort(shuffle(rng, 1:nsample)[1:batchsize])
xuse = device(Array(selectdim(x, d, i)))
yuse = device(Array(selectdim(y, d, i)))
xuse = device(selectdim(x, d, i))
yuse = device(selectdim(y, d, i))
xuse, yuse
end
end
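A minimal usage sketch for the a-priori dataloader (the batch size and RNG are illustrative, and `io_priori` is assumed to come from `create_io_arrays_priori` above):

```julia
using Random: Xoshiro

dl_prior = create_dataloader_prior(io_priori; batchsize = 32, device = identity, rng = Xoshiro(0))
x, y = dl_prior()  # a fresh random batch of (ū, c) pairs on every call
```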
@@ -190,25 +238,31 @@ Creates a dataloader function for a-posteriori fitting from the given `io_array`
- Only tested in 2D cases.
- Assumes that the data are loaded in batches of size 1
"""
function create_dataloader_posteriori(io_array; nunroll = 10, device = identity, rng)
function create_dataloader_posteriori(
io_array; nunroll = 10, nsamples = 1, device = identity, rng)
function dataloader()
(n..., dim, _, _) = axes(io_array.u) # expects that the io_array will be for a i_grid
(n..., dim, _, _) = axes(io_array.u)
(_..., samples, nt) = size(io_array.u)

@assert nt ≥ nunroll
# select starting point for unrolling
@assert nsamples ≤ samples "Requested nsamples ($nsamples) exceeds available samples ($samples)"

# Select starting point for unrolling
istart = rand(rng, 1:(nt - nunroll))
it = istart:(istart + nunroll)
# select the sample
isample = rand(rng, 1:samples)
if device == identity
u = view(io_array.u, n..., dim, isample, it)
t = io_array.t[isample, it]
else
# TODO: check if this is the correct way to move the data to the device
u = device(collect(view(io_array.u, n..., dim, isample, it)))
t = device(io_array.t[isample, it])

# Select multiple samples
isamples = rand(rng, 1:samples, nsamples)

# Use views and batch data movement
u = view(io_array.u, n..., dim, isamples, it)
t = view(io_array.t, isamples, it)

if device != identity
u = device(copy(u))
t = device(copy(t))
end

(; u = u, t = t)
end
end
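And a hedged sketch of the updated a-posteriori dataloader (values are illustrative); `nsamples` now selects how many trajectories are drawn per call, and the returned window keeps an explicit sample dimension:

```julia
using Random: Xoshiro

dl_post = create_dataloader_posteriori(
    io_post; nunroll = 10, nsamples = 2, device = identity, rng = Xoshiro(0))
batch = dl_post()
size(batch.u)  # (N..., D, nsamples, nunroll + 1) for a randomly chosen time window
size(batch.t)  # (nsamples, nunroll + 1)
```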
77 changes: 0 additions & 77 deletions simulations/NavierStokes_2D/scripts/evaluate_posteriori.jl

This file was deleted.
