import os.path
from dolfin import *
from dolfin_adjoint import *
from .solver import Solver
from ..problems import SWProblem
from ..problems import SteadySWProblem
from ..problems import MultiSteadySWProblem
from ..helpers import StateWriter, FrozenClass
[docs]
class CoupledSWSolverParameters(FrozenClass):
""" A set of parameters for a :class:`CoupledSWSolver`.
Following parameters are available:
:ivar dolfin_solver: The dictionary with parameters for the dolfin
Newton solver. A list of valid entries can be printed with:
.. code-block:: python
info(NonlinearVariationalSolver.default_parameters(), True)
By default, the MUMPS direct solver is used for the linear system. If
not available, the default solver and preconditioner of FEniCS is used.
:ivar dump_period: Specifies how often the solution should be dumped to disk.
Use a negative value to disable it. Default 1.
:ivar cache_forward_state: If True, the shallow water solutions are stored
for every timestep and are used as initial guesses for the next solve.
If False, the solution of the previous timestep is used as an initial guess.
Default: True
:ivar print_individual_turbine_power: Print out the turbine power for each
turbine. Default: False
:ivar quadrature_degree: The quadrature degree for the matrix assembly.
Default: -1 (automatic)
:ivar cpp_flags: A list of cpp compiler options for the code generation.
Default: ["-O3", "-ffast-math", "-march=native"]
:ivar revolve_parameters: The adjoint checkpointing settings as a set of the
form (strategy, snaps_on_disk, snaps_in_ram, verbose). Default: None
:ivar output_dir: The base directory in which to store the file ouputs.
Default: `os.curdir`
:ivar output_turbine_power: Output the power generation of the individual
turbines. Default: False
:ivar output_j: Output the evaluation of the choosen functional (e.g power)
for each iteration and store it in a numpy textfile. (This is the same
j that is printed each iteration, when the FEniCS log level is INFO or
above.) Default: False
:ivar output_temporal_breakdown_of_j: Output the j at each timestep and at
iteration. The output is stored in a numpy textfile. (This is the same
temporal breakdown which is shown when the FEniCS log level is INFO or
above.) Default: False
:ivar output_abs_u_at_turbine_positions: Output the absolute value of the
velocity at each turbine position. Default: False
:ivar output_control_array: Output a numpy textfile containing the
control array from each optimisation and search iteration. Default: False
:ivar callback: A callback function that is executed for every time-level.
The callback function must take a single parameter which contains the
dictionary with the solution variables.
"""
dolfin_solver = {"newton_solver": {}}
dump_period = 1
print_individual_turbine_power = False
# If we're printing individual turbine information, the solver needs
# the helper functional instantiated in the reduced_functional which will live here
output_writer = None
# Output settings
output_dir = os.curdir
output_turbine_power = False
output_j = False
output_temporal_breakdown_of_j = False
output_control_array = False
output_abs_u_at_turbine_positions = False
# Performance settings
cache_forward_state = True
quadrature_degree = -1
cpp_flags = ["-O3", "-ffast-math", "-march=native"]
revolve_parameters = None # (strategy,
# snaps_on_disk,
# snaps_in_ram,
# verbose)
# Callback function
callback = lambda self, sol: None
def __init__(self):
linear_solver = 'mumps' if 'mumps' in linear_solver_methods() else 'default'
preconditioner = 'default'
self.dolfin_solver["newton_solver"]["linear_solver"] = linear_solver
self.dolfin_solver["newton_solver"]["preconditioner"] = preconditioner
self.dolfin_solver["newton_solver"]["maximum_iterations"] = 20
self.dolfin_solver["newton_solver"]["convergence_criterion"] = "incremental"
[docs]
class CoupledSWSolver(Solver):
r""" The coupled solver solves the shallow water equations as a fully coupled
system with a :math:`\theta`-timestepping discretization.
For a :class:`opentidalfarm.problems.steady_sw.SteadySWProblem`, it solves
:math:`u` and :math:`\eta` such that:
.. math:: u \cdot \nabla u - \nu \Delta u +
g \nabla \eta + \frac{c_b + c_t}{H} \| u\| u & = f_u, \\
\nabla \cdot \left(H u\right) & = 0.
For a :class:`opentidalfarm.problems.multi_steady_sw.MultiSteadySWProblem`,
it solves :math:`u^{n+1}` and :math:`\eta^{n+1}` for each timelevel
(including the initial time) such that:
.. math:: u^{n+1} \cdot \nabla u^{n+1} - \nu \Delta u^{n+1} +
g \nabla \eta^{n+1} + \frac{c_b + c_t}{H^{n+1}} \| u^{n+1}\| u^{n+1} &
= f_u^{n+1}, \\
\nabla \cdot \left(H^{n+1} u^{n+1}\right) & = 0.
For a :class:`opentidalfarm.problems.sw.SWProblem`, and given an initial
condition :math:`u^{0}` and :math:`\eta^{0}` it solves :math:`u^{n+1}` and
:math:`\eta^{n+1}` for each timelevel such that:
.. math:: \frac{1}{\Delta t} \left(u^{n+1} - u^{n}\right) + u^{n+\theta} \cdot \nabla u^{n+\theta} - \nu \Delta u^{n+\theta} +
g \nabla \eta^{n+\theta} + \frac{c_b + c_t}{H^{n+\theta}} \|
u^{n+\theta}\| u^{n+\theta} & = f_u^{n+\theta}, \\
\frac{1}{\Delta t} \left(\eta^{n+1} - \eta^{n}\right) + \nabla \cdot \left(H^{n+\theta} u^{n+\theta}\right) & = 0.
with :math:`\theta \in [0, 1]` and :math:`u^{n+\theta} := \theta u^{n+1} + (1-\theta) u^n`
and :math:`\eta^{n+\theta} := \theta \eta^{n+1} + (1-\theta) \eta^n`.
Furthermore:
- :math:`u` is the velocity,
- :math:`\eta` is the free-surface displacement,
- :math:`H=\eta + h` is the total water depth where :math:`h` is the
water depth at rest,
- :math:`f_u` is the velocity forcing term,
- :math:`c_b` is the (quadratic) natural bottom friction coefficient,
- :math:`c_t` is the (quadratic) friction coefficient due to the turbine
farm,
- :math:`\nu` is the viscosity coefficient,
- :math:`g` is the gravitational constant,
- :math:`\Delta t` is the time step.
Some terms, such as the advection or the diffusion term, may be deactivated
in the problem specification.
The resulting equations are solved with Newton-iterations and the linear
problems solved as specified in the `dolfin_solver` setting in
:class:`CoupledSWSolverParameters`.
"""
def __init__(self, problem, solver_params):
if not isinstance(problem, (SWProblem,
SteadySWProblem)):
raise TypeError("problem must be of type Problem")
if not isinstance(solver_params, CoupledSWSolverParameters):
raise TypeError("solver_params must be of type \
CoupledSWSolverParameters.")
super(CoupledSWSolver, self).__init__()
self.problem = problem
self.parameters = solver_params
# If cache_for_nonlinear_initial_guess is true, then we store all
# intermediate state variables in this dictionary to be used for the
# next solve
self.state_cache = {}
self.state = None
self.mesh = problem.parameters.domain.mesh
elements = self.problem.parameters.finite_element()
self.function_space = FunctionSpace(self.mesh, MixedElement(elements))
[docs]
@staticmethod
def default_parameters():
""" Return the default parameters for the :class:`CoupledSWSolver`.
"""
return CoupledSWSolverParameters()
def _generate_strong_bcs(self):
bcs = self.problem.parameters.bcs
facet_ids = self.problem.parameters.domain.facet_ids
fs = self.function_space
# Generate velocity boundary conditions
bcs_u = []
for _, expr, facet_id, _ in bcs.filter("u", "strong_dirichlet"):
bc = DirichletBC(fs.sub(0), expr, facet_ids, facet_id)
bcs_u.append(bc)
# Generate free-surface boundary conditions
bcs_eta = []
for _, expr, facet_id, _ in bcs.filter("eta", "strong_dirichlet"):
bc = DirichletBC(fs.sub(1), expr, facet_ids, facet_id)
bcs_eta.append(bc)
return bcs_u + bcs_eta
def get_optimisation_and_search_directory(self):
dir = os.path.join(self.parameters.output_dir,
"iter_{}".format(self.optimisation_iteration),
"search_{}".format(self.search_iteration))
try:
os.makedirs(dir)
except OSError:
pass # directory already exists
return dir
# For dynamic friction the problem parameters need to use the same
# finished funciton as in the solver to avoid machine precision error.
def _finished(self, current_time, finish_time):
if (hasattr(self.problem.parameters, 'finished')):
return self.problem.parameters.finished(current_time)
else:
return float(current_time - finish_time) >= - 1e3*DOLFIN_EPS
[docs]
def solve(self, annotate=True):
''' Returns an iterator for solving the shallow water equations. '''
############################### Setting up the equations ###########################
# Get parameters
problem_params = self.problem.parameters
solver_params = self.parameters
farm = problem_params.tidal_farm
# Performance settings
parameters['form_compiler']['quadrature_degree'] = \
solver_params.quadrature_degree
parameters['form_compiler']['cpp_optimize_flags'] = \
" ".join(solver_params.cpp_flags)
parameters['form_compiler']['cpp_optimize'] = True
parameters['form_compiler']['optimize'] = True
# Get domain measures
ds = problem_params.domain.ds
# Initialise solver settings
if type(self.problem) == SWProblem:
log(INFO, "Solve a transient shallow water problem")
theta = Constant(problem_params.theta)
dt = Constant(problem_params.dt)
finish_time = Constant(problem_params.finish_time)
t = Constant(problem_params.start_time)
include_time_term = True
elif type(self.problem) == MultiSteadySWProblem:
log(INFO, "Solve a multi steady-state shallow water problem")
theta = Constant(1)
dt = Constant(problem_params.dt)
finish_time = Constant(problem_params.finish_time)
# The multi steady-state case solves the steady-state equation also
# for the start time
t = Constant(problem_params.start_time - dt)
include_time_term = False
elif type(self.problem) == SteadySWProblem:
log(INFO, "Solve a steady-state shallow water problem")
theta = Constant(1.)
dt = Constant(1.)
finish_time = Constant(0.5)
t = Constant(0.)
include_time_term = False
else:
raise TypeError("Do not know how to solve problem of type %s." %
type(self.problem))
g = problem_params.g
depth = problem_params.depth
include_advection = problem_params.include_advection
include_viscosity = problem_params.include_viscosity
viscosity = problem_params.viscosity
bcs = problem_params.bcs
linear_divergence = problem_params.linear_divergence
cache_forward_state = solver_params.cache_forward_state
f_u = problem_params.f_u
u_dg = "Discontinuous" in str(self.function_space.split()[0])
# Define test functions
v, q = TestFunctions(self.function_space)
# Define functions
state = Function(self.function_space, name="Current_state")
self.state = state
state_new = Function(self.function_space, name="New_state")
# Load initial condition (or initial guess for stady problems)
# Projection is necessary to obtain 2nd order convergence
ic = project(problem_params.initial_condition, self.function_space)
state.assign(ic, annotate=False)
# Split mixed functions
u, h = split(state_new)
u0, h0 = split(state)
# Define the water depth
if linear_divergence:
H = depth
else:
H = h + depth
# Create initial conditions and interpolate
state_new.assign(state, annotate=annotate)
# u_(n+theta) and h_(n+theta)
u_mid = (1.0 - theta) * u0 + theta * u
h_mid = (1.0 - theta) * h0 + theta * h
# The normal direction
n = FacetNormal(self.mesh)
# Mass matrix contributions
M = inner(v, u) * dx
M += inner(q, h) * dx
M0 = inner(v, u0) * dx
M0 += inner(q, h0) * dx
# Divergence term.
Ct_mid = -H * inner(u_mid, grad(q)) * dx
#+inner(avg(u_mid),jump(q,n))*dS # This term is only needed for dg
# element pairs which is not supported
# The surface integral contribution from the divergence term
bc_contr = -H * dot(u_mid, n) * q * ds
for function_name, u_expr, facet_id, bctype in bcs.filter(function_name='u'):
if bctype in ('weak_dirichlet', 'free_slip'):
# Subtract the divergence integral again
bc_contr -= -H * dot(u_mid, n) * q * ds(facet_id)
# for weak Dirichlet we replace it with the bc expression
if bctype=='weak_dirichlet':
bc_contr -= H * dot(u_expr, n) * q * ds(facet_id)
elif bctype == 'flather':
# The Flather boundary condition requires two facet_ids.
assert len(facet_id) == 2
# Subtract the divergence integrals again
bc_contr -= -H * dot(u_mid, n) * q * ds(facet_id[0])
bc_contr -= -H * dot(u_mid, n) * q * ds(facet_id[1])
# The Flather boundary condition on the left hand side
bc_contr -= H * dot(u_expr, n) * q * ds(facet_id[0])
Ct_mid += sqrt(g * H) * inner(h_mid, q) * ds(facet_id[0])
# The contributions of the Flather boundary condition on the right hand side
Ct_mid += sqrt(g * H) * inner(h_mid, q) * ds(facet_id[1])
# Pressure gradient term
weak_eta_bcs = bcs.filter(function_name='eta', bctype='weak_dirichlet')
# don't integrate pressure gradient by parts (a.o. allows dg test function v)
C_mid = g * inner(v, grad(h_mid)) * dx
for function_name, eta_expr, facet_id, bctype in weak_eta_bcs:
# Apply the eta boundary conditions weakly on boundary IDs 1 and 2
C_mid += g * inner(dot(v, n), (eta_expr-h_mid)) * ds(facet_id)
# Bottom friction
friction = problem_params.friction
if not farm:
tf = Constant(0)
elif type(farm.friction_function) == list:
tf = farm.friction_function[0].copy(deepcopy=True,
name="turbine_friction", annotate=annotate)
tf.assign(theta*farm.friction_function[1]+(1.-float(theta))*\
farm.friction_function[0], annotate=annotate)
else:
tf = farm.friction_function.copy(deepcopy=True,
name="turbine_friction", annotate=annotate)
# FIXME: FEniCS fails on assembling the below form for u_mid = 0, even
# though it is differentiable. Even this potential fix does not help:
#norm_u_mid = conditional(inner(u_mid, u_mid)**0.5 < DOLFIN_EPS, Constant(0),
# inner(u_mid, u_mid)**0.5)
norm_u_mid = inner(u_mid, u_mid)**0.5
R_mid = friction / H * norm_u_mid * inner(u_mid, v) * dx(self.mesh)
if farm:
u_mag = dot(u_mid, u_mid)**0.5
if not farm.turbine_specification.thrust:
R_mid += tf/H*u_mag*inner(u_mid, v)*farm.site_dx
else:
C_t = farm.turbine_specification.compute_C_t(u_mag)
R_mid += (tf * farm.turbine_specification.turbine_parametrisation_constant * \
C_t / H) * u_mag * inner(u_mid, v) * farm.site_dx
# Advection term
if include_advection:
Ad_mid = inner(dot(grad(u_mid), u_mid), v) * dx
# un is |u.n| on the down-side (incoming), and 0 on the up-side of a facet (outgoing)
un = (abs(dot(u, n))-dot(u,n))/2.0
if u_dg:
# at the downside, we want inner(|u.n|*(u_down-u_up),v_down) - u_down-u_up being the gradient along the flow
Ad_mid += (inner(un('+')*(u_mid('+')-u_mid('-')), v('+')) + inner(un('-')*(u_mid('-')-u_mid('+')), v('-')))*dS
# apply weak_diricihlet bc for the advection term at the incoming boundaries only
for function_name, u_expr, facet_id, bctype in bcs:
if function_name =="u" and bctype == 'weak_dirichlet':
# this is the same thing as for DG above, except u_up is the boundary value and u_down is u+ or u- (they are the same)
# the fact that these are the same also means that the DG term above vanishes at the boundary and is replaced here
Ad_mid += inner(un*(u_mid-u_expr), v)*ds(facet_id)
if include_viscosity:
# Check that we are not using a DG velocity function space, as the facet integrals are not implemented.
if u_dg:
raise NotImplementedError("The viscosity term for \
discontinuous elements is not supported.")
D_mid = viscosity * inner(2*sym(grad(u_mid)), grad(v)) * dx
# Create the final form
G_mid = C_mid + Ct_mid + R_mid
# Add the advection term
if include_advection:
G_mid += Ad_mid
# Add the viscosity term
if include_viscosity:
G_mid += D_mid
# Add the source term
G_mid -= inner(f_u, v) * dx
F = dt * G_mid - dt * bc_contr
# Add the time term
if include_time_term:
F += M - M0
# Generate the scheme specific strong boundary conditions
strong_bcs = self._generate_strong_bcs()
############################### Perform the simulation ###########################
if solver_params.dump_period > 0:
writer = StateWriter(solver=self)
if type(self.problem) == SWProblem:
log(INFO, "Writing state to disk...")
writer.write(state)
result = {"time": float(t),
"u": state.split()[0],
"eta": state.split()[1],
"tf": tf,
"state": state,
"is_final": self._finished(t, finish_time)}
solver_params.callback(result)
yield(result)
log(INFO, "Start of time loop")
adjointer.time.start(t)
timestep = 0
while not self._finished(t, finish_time):
# Update timestep
timestep += 1
t = Constant(t + dt)
# Update bc's
t_theta = Constant(t - (1.0 - theta) * dt)
bcs.update_time(t, only_type=["strong_dirichlet"])
bcs.update_time(t_theta, exclude_type=["strong_dirichlet"])
# Update source term
f_u.t = Constant(t_theta)
# Set the control function for the upcoming timestep.
if farm:
if type(farm.friction_function) == list:
tf.assign(theta*farm.friction_function[timestep]+(1.\
-float(theta))*farm.friction_function[timestep-1],
annotate=annotate)
else:
tf.assign(farm.friction_function)
# Set the initial guess for the solve
if cache_forward_state and float(t) in self.state_cache:
log(INFO, "Read initial guess from cache for t=%f." % t)
# Load initial guess for solver from cache
state_new.assign(self.state_cache[float(t)], annotate=False)
elif not include_time_term:
log(INFO, "Set the initial guess for the nonlinear solver to the initial condition.")
# Reset the initial guess after each timestep
ic = problem_params.initial_condition
state_new.assign(ic, annotate=False)
# Solve non-linear system with a Newton solver
if self.problem._is_transient:
log(INFO, "Solve shallow water equations at time %s" % float(t))
else:
log(INFO, "Solve shallow water equations.")
solve(F == 0, state_new, bcs=strong_bcs,
solver_parameters=solver_params.dolfin_solver,
annotate=annotate,
J=derivative(F, state_new))
# After the timestep solve, update state
state.assign(state_new)
if cache_forward_state:
# Save state for initial guess cache
log(INFO, "Cache solution t=%f as next initial guess." % t)
if float(t) not in self.state_cache:
self.state_cache[float(t)] = Function(self.function_space)
self.state_cache[float(t)].assign(state_new, annotate=False)
if (solver_params.dump_period > 0 and
timestep % solver_params.dump_period == 0):
log(INFO, "Write state to disk...")
writer.write(state)
# Return the results
result = {"time": float(t),
"u": state.split()[0],
"eta": state.split()[1],
"tf": tf,
"state": state,
"is_final": self._finished(t, finish_time)}
solver_params.callback(result)
yield(result)
# Increase the adjoint timestep
adj_inc_timestep(time=float(t), finished=self._finished(t,
finish_time))
# If we're outputting the individual turbine power
if (self.parameters.print_individual_turbine_power
or ((solver_params.dump_period > 0)
and self.parameters.output_turbine_power)):
self.parameters.output_writer.individual_turbine_power(self)
log(INFO, "End of time loop.")