Source code for simudo.fem.newton_solver

from __future__ import absolute_import, division, print_function

import logging
from builtins import dict, super
from copy import deepcopy

import numpy
from cached_property import cached_property

import dolfin
from petsc4py import PETSc

# cached_property = property

__all__ = [
    'NewtonSolution',
    'NewtonSolver',
    'NewtonSolverMaxDu',
    'NewtonSolverLogDamping',
    'ExitOptimizerException']

def petsc_diag(vec):
    n = vec.sizes[0]
    m = PETSc.Mat()
    m.create(PETSc.COMM_WORLD)
    m.setSizes([n, n])
    m.setType('aij')  # sparse
    m.setPreallocationNNZ(5)
    m.setDiagonal(vec)
    return m

# originally we want to solve A x = b
# but instead we're solving (A Q_inv)(Q x) = b
# let q be the diagonal of Q_inv


[docs]class NewtonSolution(object): mumps_icntl = () # Iterable of (inctl, val) pairs to set mumps values def __init__(self, solver, u_=None): self.solver = solver W = solver.W self.u_ = u_ if u_ else dolfin.Function(W) # current solution self.u = dolfin.Function(W) # next solution self.du = dolfin.Function(W) # change to current solution self.Qdu = dolfin.PETScVector()
[docs] def do_assemble_system(self, assemble_A=True, assemble_b=True): if assemble_A: self.A if assemble_b: self.b
@cached_property def size(self): return len(self.u_.vector()) @cached_property def q(self): return None @cached_property def A(self): A = dolfin.PETScMatrix() self.solver.assembler.assemble(A) # q = self.q # if q is not None: # A.mat().diagonalScale(None, q) return A @cached_property def b(self): b = dolfin.PETScVector() self.solver.assembler.assemble(b, self.u_.vector()) # <---- (*) # (*) If you're wondering how we can just apply nonzero Dirichlet BC's # despite du being required to be equal to zero on u's Dirichlet # boundaries, then look no further. The answer is that second argument # to SystemAssembler::assemble(). return b @cached_property def du_norm(self): return numpy.linalg.norm( self.du.vector().get_local(), ord=numpy.Inf) @cached_property def has_nans(self): return ( numpy.isnan(self.u_.vector().get_local()).any() or numpy.isnan(self.b_norm) ) @cached_property def rel_du_norm(self): # |du| / (|u| + |du|) bad # = 1 / (|u|/|du| + 1) bad du = self.du.vector().get_local() u = self.u_.vector().get_local() with numpy.errstate(divide='ignore', invalid='ignore'): r = numpy.abs(du/u) ok = ~numpy.isnan(r) if not any(ok): return numpy.nan return numpy.linalg.norm( r[ok], ord=2) @cached_property def combined_du_norm(self): du = self.du.vector().get_local() u = self.u_.vector().get_local() rel_tol = self.solver.parameters['relative_tolerance'] abs_tol = self.solver.parameters['absolute_tolerance'] # user adjustable weight abs_tol_vec = self.solver.parameters['absolute_tolerance_vector'] with numpy.errstate(divide='ignore', invalid='ignore'): r = numpy.abs(du)/(numpy.abs(u)*rel_tol + abs_tol * abs_tol_vec) return numpy.linalg.norm(r, ord=numpy.Inf) @cached_property def b_norm(self): return numpy.linalg.norm( self.b.get_local(), ord=numpy.Inf)
[docs] def do_linear_solve(self): A, Qdu, b = self.A, self.Qdu, self.b q = self.q # Output diagnostic info and stats from MUMPS. # dolfin.PETScOptions.set('mat_mumps_icntl_2', 6) # dolfin.PETScOptions.set('mat_mumps_icntl_4', 3) # print mumps error analysis, i.e. condition number # dolfin.PETScOptions.set('mat_mumps_icntl_11', 1) # settings for automatic scaling and pre-processing of the A matrix. # dolfin.PETScOptions.set('ksp_view') # dolfin.PETScOptions.set('-info', 'out/petsc_log.txt') # dolfin.PETScOptions.set('log_trace', 'out/petsc_trace.txt') # dolfin.PETScOptions.set('pc_factor_mat_ordering_type', 'natural') dolfin.PETScOptions.set('mat_mumps_icntl_6', 5) dolfin.PETScOptions.set('mat_mumps_icntl_7', 6) dolfin.PETScOptions.set('mat_mumps_icntl_8', 77) # 1=sequential mode (no MPI). Still uses multithreading. dolfin.PETScOptions.set('mat_mumps_icntl_28', 1) # dolfin.PETScOptions.set('mat_mumps_icntl_29', 1) # After setting the above defaults, go through the mumps_icntl to (re)set parameters for opt in self.mumps_icntl: # ((28,7), (29,3), (30, 1)) dolfin.PETScOptions.set('mat_mumps_icntl_' + str(opt[0]), int(opt[1])) solver = dolfin.PETScLUSolver("mumps") solver.solve(A, Qdu, b) ### trash not working code ### # solver = dolfin.KrylovSolver('gmres', 'ilu') # prm = solver.parameters # prm['absolute_tolerance'] = 1E-5 # prm['relative_tolerance'] = 1E-5 # prm['maximum_iterations'] = 5000 # prm["monitor_convergence"] = True # solver.solve(A, Qdu, b) # try: # solver.solve(self.A, Qdu, self.b) # except: # try: # prm['nonzero_initial_guess'] = True # solver.solve(self.A, Qdu, self.b) # except: # raise if q is not None: dolfin.as_backend_type(self.du.vector()).vec().pointwiseMult( q, Qdu.vec()) else: self.du.vector()[:] = Qdu.get_local() # FIXME: EXTREMELY INEFFICIENT!
# Qdu.vec().copy(self.du.vector().vec())
[docs] def str_error(self): abserr = getattr(self, 'b_norm', -1) relerr = getattr(self, 'rel_du_norm', -1) comberr = getattr(self, 'combined_du_norm', -1) return "|b| {:.6e}, |du/u| {:.6e}, |du/(a*u+c)| {:.6e}".format( abserr, relerr, comberr)
_no_copy = frozenset(('solver','A','b','q','size'))
[docs] def copy_to(self, other): for k, v in self.__dict__.items(): if k in self._no_copy: continue if isinstance(v, dolfin.Function): getattr(other, k).assign(getattr(self, k)) # elif isinstance(v, (dolfin.PETScMatrix, dolfin.PETScVector)): # setattr(other, k, v.copy()) else: setattr(other, k, deepcopy(v))
[docs] def copy(self): other = self.__class__(self.solver) self.copy_to(other) return other
_cached_property_attrs = frozenset( ('rel_du_norm', 'du_norm', 'combined_du_norm', 'b_norm', 'A', 'b', 'q', 'size', 'has_nans'))
[docs] def do_invalidate_cache(self, check=False): d = self.__dict__ for k in self._cached_property_attrs: if d.pop(k, None) is not None: if check: logging.info('*'*30 + k)
[docs]class NewtonSolver(object): '''A general Newton solver. Parameters ---------- F: :py:class:`ufl.Form` Nonlinear form to be solved (by setting equal to zero). That is, we are trying to solve :code:`F(u_) = 0` by varying :code:`u_`. u\_: dolfin.Function Function being varied (read: solved for). bcs: list of dolfin.DirichletBC List of essential boundary conditions to be applied. J: :py:class:`ufl.core.expr.Expr`, optional Jacobian of the form with respect to :code:`u_`'s coefficients. If this is not supplied, it is derived from :code:`F` automatically. parameters: dict Parameters controlling the Newton solution process. Attributes ---------- iteration: int Current iteration. Notes ----- The underscore in :code:`u_` is a subscript minus, and signifies that this is the "previous" value of the function is used to evalute the form in the iterative Newton process for nonlinear problems. ''' iteration = 0 solution_class = NewtonSolution du_mask_array = None extra_iterations = None def __init__(self, F, u_, bcs, J=None, parameters=None): self.do_init_parameters() if parameters is not None: self.parameters.update(parameters) self.W = u_.function_space() self.do_init_solution(u_) self.set_problem(F, u_, bcs, J=J) self.user_before_first_iteration_hooks = [] self.user_pre_iteration_hooks = [] self.user_post_iteration_hooks = []
[docs] @classmethod def from_nice_obj(cls, obj): F = obj.get_weak_form().to_ufl().magnitude u_ = obj.get_solution_function() bcs = obj.get_essential_bcs() return cls(F=F, u_=u_, bcs=bcs)
[docs] def set_problem(self, F, u_, bcs, J=None): self.F = F self.J = dolfin.derivative(F, u_) if J is None else J self.bcs = bcs self.do_init_assembler()
[docs] def create_solution(self, *args, **kwargs): return self.solution_class(self, *args, **kwargs)
[docs] def do_init_solution(self, u_): self.solution = self.create_solution(u_=u_)
[docs] def do_init_parameters(self): self.parameters = dict(maximum_iterations=25, minimum_iterations=-1, relaxation_parameter=1.0, absolute_tolerance=1e-5, relative_tolerance=1e-6, extra_iterations=3)
[docs] def do_init_assembler(self): try: self.assembler = dolfin.fem.assembling.SystemAssembler( self.J, self.F, self.bcs) except TypeError as exc: err = self.logger.error err("SystemAssembler arguments follow:") err(" J={!r}".format(self.J)) err(" F={!r}".format(self.F)) err(" bcs={!r}".format(self.bcs)) raise self.solution.do_invalidate_cache()
[docs] def do_update_solution(self): s = self.solution du = s.du.vector()[:] maskv = self.du_mask_array if maskv is not None: du = du*maskv self.user_update_solution_vector(s, du) s.do_invalidate_cache()
[docs] def user_update_solution_vector(self, solution, du): s = solution s.u_.vector()[:] -= du*self.get_omega()
[docs] def do_iteration(self): sltn = self.solution self.do_pre_iteration_hook() sltn.do_assemble_system() sltn.do_linear_solve() self.do_update_solution() sltn.do_invalidate_cache() self.do_post_iteration_hook()
@cached_property def logger(self): return logging.getLogger('newton')
[docs] def log_print(self, string='', end='\n'): # print(string, file=sys.stderr, end=end) self.logger.info(string)
[docs] def do_before_first_iteration_hook(self): self.solution.do_invalidate_cache() for hook in self.user_before_first_iteration_hooks: hook(self)
[docs] def do_pre_iteration_hook(self): self.solution.do_invalidate_cache() for hook in self.user_pre_iteration_hooks: hook(self)
[docs] def do_post_iteration_hook(self): self.log_print("{}* iteration {:>3d}; {}".format( '\n' if self.iteration == 0 else '', self.iteration, self.solution.str_error())) self.solution.do_invalidate_cache() for hook in self.user_post_iteration_hooks: hook(self)
[docs] def get_omega(self): omega_cb = self.parameters.get('omega_callback', None) if omega_cb is not None: return omega_cb(self) if self.iteration <= self.parameters.get('num_damped_iterations', 5): return self.parameters['relaxation_parameter'] else: return 1.0
[docs] def solve(self): self.do_before_first_iteration_hook() while True: self.iteration += 1 self.do_iteration() if self.should_stop_real(): break
[docs] def has_converged(self): sltn = self.solution return ((not self.solution.has_nans) and (sltn.combined_du_norm <= 1.))
[docs] def should_stop_real(self): extra = self.extra_iterations should_stop = self.should_stop() if self.solution.has_nans: return True # solution is nan-poisoned, all hope is lost if extra is not None: if should_stop: # endgame mode: nail the solution further self.extra_iterations = extra = extra - 1 return extra <= 0 else: # false minimum in error metric, cancel endgame mode self.extra_iterations = None if should_stop: extra = self.parameters.get('extra_iterations') if extra is None: return True else: self.extra_iterations = extra return extra <= 0 else: return False
[docs] def should_stop(self): sltn = self.solution iteration = self.iteration if (iteration < self.parameters['minimum_iterations']): return False return ((iteration >= self.parameters['maximum_iterations']) or (self.has_converged()) or (numpy.isnan(sltn.du_norm)))
[docs]class ExitOptimizerException(Exception): pass
class RestartingNewtonSolution(NewtonSolution): @cached_property def cost(self): return self.b_norm # + numpy.linalg.norm(self.b.get_local(), ord=2)/10000.0 # numpy.linalg.norm(self.b.get_local(), ord=2) _cached_property_attrs = frozenset(('cost',)).union( NewtonSolution._cached_property_attrs) class RestartingNewtonSolver(NewtonSolver): solution_class = RestartingNewtonSolution def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.newton_history = [] self.optimizer_history = [] self.opt_extra_iterations_left = None def get_minimum_cost(self): return self.solution.cost*1.3 def get_optimizer_extra_iterations(self): return 5 def get_optimizer_dimension(self): return 2 def compute_linear_combination(self, x): cs, vs = self.optimizer_vector, self.optimizer_basis assert len(cs) == len(vs)-1 self.optimizer_coefficients = cs = [c-1.0 for c in cs] self.optimizer_coefficients.insert(1, 1.0-sum(cs)) return sum(v*c for (v, c) in zip(vs, cs)) def post_optimizer_iteration_hook(self): self.do_post_iteration_hook() def post_newton_iteration_hook(self): self.do_post_iteration_hook() def cost_function(self, x): sl = self.solution self.current_solver_mode = 'optimizer' self.optimizer_vector = x sl.u_.vector()[:] = self.compute_linear_combination(x) def constraint(a): return -a if a <= 0.0 else 0.0 penalty = (constraint((5.0 - max(self.optimizer_coefficients))) + constraint(-(-5.0 - min(self.optimizer_coefficients)))) sl.do_invalidate_cache() sl.do_assemble_system(assemble_A=False) self.post_optimizer_iteration_hook() self.optimizer_history.insert(0, sl.copy()) self.iteration += 1 sc = sl.cost if sc < self.min_cost: ei = getattr(self, 'opt_extra_iterations_left', None) if ei is None: ei = self.get_optimizer_extra_iterations() ei -= 1 if ei < 0: self.opt_extra_iterations_left = None raise ExitOptimizerException() self.opt_extra_iterations_left = ei return sc + penalty*1000 def optimizer_procedure(self, func, x0s): from scipy.optimize import fmin_l_bfgs_b for x0 in x0s[1:]: func(x0) fmin_l_bfgs_b(func=func, x0=x0s[0], approx_grad=1) def do_post_iteration_hook(self): if self.iteration == 0: self.log_print() self.log_print("*** {:>9s} iteration {:>3d}; {}".format( self.current_solver_mode, self.iteration, self.solution.str_error())) if self.current_solver_mode == 'optimizer': self.log_print(" ---> {}".format(self.optimizer_coefficients)) def solve(self): nhistory = self.newton_history ohistory = self.optimizer_history self.min_cost = numpy.Inf sl = self.solution self.current_solver_mode = 'null' self.do_before_first_iteration_hook() while True: self.current_solver_mode = 'newton' self.do_iteration() self.iteration += 1 nhistory.insert(0, sl.copy()) odim = self.get_optimizer_dimension() if len(nhistory) >= odim and not (sl.cost <= self.min_cost): def vcopy(w): v = sl.u_.copy(deepcopy=True) v.assign(w) return v.vector().array() self.optimizer_basis = [vcopy(s.u_) for s in nhistory[:odim]] #x0 = (1.0, 2.0) + (1.0,)*(odim-2) x0s = [(1.0+10**x,) + (1.0,)*(odim-2) for x in numpy.linspace(-5, -0.1, 6)] try: self.optimizer_procedure(self.cost_function, x0s) except ExitOptimizerException: pass ohistory.sort(key=lambda s:s.cost) sl.u_.assign(ohistory[0].u_) sl.do_invalidate_cache() nhistory.insert(0, sl.copy()) del ohistory[:] if self.should_stop_real(): break self.min_cost = self.get_minimum_cost()
[docs]class NewtonSolverMaxDu(NewtonSolver):
[docs] def user_update_solution_vector(self, solution, du): maxdu = self.parameters.get('maximum_du', None) if maxdu is not None: du = numpy.minimum(du, maxdu) du = numpy.maximum(du, -maxdu) super().user_update_solution_vector(solution, du)
[docs]class NewtonSolverLogDamping(NewtonSolver):
[docs] def user_update_solution_vector(self, solution, du): # ref:[Gaury2018a] alpha = self.parameters.get('logdamping_alpha', 1.72) du = numpy.sign(du)*numpy.log1p(numpy.abs(du) * alpha) / alpha super().user_update_solution_vector(solution, du)