import logging
from collections import namedtuple
from cached_property import cached_property
from ..util import SetattrInitMixin
__all__ = [
'BaseAdaptiveStepper',
'AdaptiveStepper',
'ConstantStepperMixin',
'StepperError']
AdaptiveLogEntry = namedtuple(
'AdaptiveLogEntry',
['parameter', 'success', 'saved_solution'])
[docs]class StepperError(Exception):
pass
[docs]class BaseAdaptiveStepper(SetattrInitMixin):
'''Adaptive solution base class.
This implements an adaptive solution method: progressively change an
input parameter in small increments, re-solving the problem at each
step.
If solutions are successful, gradually larger step sizes will be
used. If a solution fails, the last successful solution is
re-loaded and a smaller step size is chosen.
A series of specific parameter values to be attained can be given, for
example to generate multiple points for a current-voltage curve.
Parameters
----------
solution: object
Solution object, whatever that means.
parameter_target_values: list
List of values for :py:attr:`parameter` to
attain. :py:attr:`output_writer` will be called as each target
value is attained.
break_criteria_cb: callable, optional
A function that will be evaluated after each target value. If it
returns :code:`True`, the adaptive stepper will stop.
Attributes
----------
parameter: float
Current value of the parameter.
step_size: float
Current step size.
update_parameter_success_factor: float
If the solver succeeds, multiply :py:attr:`step_size` by this
factor.
update_parameter_failure_factor: float
If the solver fails, multiply :py:attr:`step_size` by this
factor.
stepper_rel_tol: float
If :py:attr:`step_size ` drops below
:py:attr:`stepper_rel_tol`*max(:py:attr:`parameter`, initial_step_size),
the stepper will stop and raise a StepperError.
Notes
-----
This class implements the abstract algorithm, so it makes no
assumptions as to what :code:`solution` represents. Users of this
class must implement the :py:meth:`user_solver`,
:py:meth:`user_solution_add`, and :py:meth:`user_solution_save`
methods.
'''
solution = None
parameter_start_value = 0.0
parameter_target_values = [1.0]
step_size = 1e-2
stepper_rel_tol = 1e-5 # tolerance for bailing out of stepper
update_parameter_success_factor = 2.
update_parameter_failure_factor = 0.5
keep_saved_solutions_count = 2
output_writer = None
break_criteria_cb = None
@cached_property
def parameter(self):
return self.parameter_start_value
@cached_property
def prior_solutions(self):
''' list of (parameter_value, solver_succeeded, saved_solution) '''
return []
@cached_property
def prior_solutions_idx_successful(self):
return []
@cached_property
def prior_solutions_idx_saved_solution(self):
return []
[docs] def cleanup_prior_solutions(self):
prior = self.prior_solutions
prior_idx = self.prior_solutions_idx_saved_solution
while len(prior_idx) > self.keep_saved_solutions_count:
i = prior_idx.pop(0)
prior[i] = prior[i]._replace(saved_solution=None)
[docs] def add_prior_solution(self, parameter_value, solver_succeeded, saved_solution):
prior = self.prior_solutions
if saved_solution is not None and not solver_succeeded:
raise AssertionError("why save a failed solution?")
prior.append(AdaptiveLogEntry(
parameter_value, solver_succeeded, saved_solution))
index = len(prior) - 1
if saved_solution is not None:
self.prior_solutions_idx_saved_solution.append(index)
if solver_succeeded:
self.prior_solutions_idx_successful.append(index)
self.cleanup_prior_solutions()
[docs] def get_last_successful(self, count):
prior = self.prior_solutions
return [prior[i] for i in self.prior_solutions_idx_successful[-count:]]
[docs] def reset_parameter_to_last_successful(self):
last_ok = self.get_last_successful(1)
self.parameter = (last_ok[0].parameter if last_ok
else self.parameter_start_value)
[docs] def update_parameter(self):
'''This method is called to update :py:attr:`parameter` after
each solver call (successful or not). It also restores a previous
solution if the solver call failed.'''
last_ok = self.get_last_successful(1)
if not last_ok:
self.reset_parameter_to_last_successful()
return
last_ok = last_ok[0]
last = self.prior_solutions[-1]
if not last.success:
logging.getLogger('stepper').info(
'** Newton solver failed, trying smaller step size')
if not last.success:
self.step_size *= self.update_parameter_failure_factor
self.actual_step = min(self.step_size,
abs(self.parameter_target_value - last_ok.parameter))
# don't change step_size if we're just taking a small step
# to hit a target parameter value
if (self.actual_step == self.step_size) and last.success:
self.step_size *= self.update_parameter_success_factor
# If we just failed and actual_step<step_size, reduce step_size to
# actual_step * failure_factor
if (self.actual_step < self.step_size) and not last.success:
logging.getLogger('stepper').debug(
'** actual_step < step_size; reducing step_size further')
self.step_size = self.actual_step * self.update_parameter_failure_factor
if not last.success:
self.user_solution_add(
self.solution, 0.0, last_ok.saved_solution, 1.0)
self.parameter = last_ok.parameter + self.step_sign*self.actual_step
[docs] def user_solver(self, solution, parameter):
'''
This user-defined method must re-solve `solution` using
parameter value `parameter`.
Must return boolean indicating whether the solver succeeded.
'''
raise NotImplementedError()
[docs] def user_solution_save(self, solution):
'''
This procedure must return a copy of the solution variables of `solution`
in a format acceptable by `user_solution_add`. Typically a dict of 'key': vector pairs.
'''
raise NotImplementedError()
[docs] def user_solution_add(self, solution, solution_scale,
saved_solution, saved_scale):
'''This procedure must update the current solution so that::
self.current_solution = (self.solution*solution_scale +
saved_solution*saved_scale)
where ``saved_solution`` is as returned by
:py:meth:`user_solution_save`.
If ``solution_scale`` is equal to 0, then the current solution must be
erased (careful with NaNs!).
'''
raise NotImplementedError()
[docs] def do_iteration(self):
'''Solve the problem at a single value of the solution parameter.'''
self.update_parameter()
solution = self.solution
parameter = self.parameter
try:
success = self.user_solver(solution, parameter)
except RuntimeError:
success = False
if len(self.prior_solutions) == 0:
raise
saved = self.user_solution_save(solution) if success else None
self.add_prior_solution(parameter, success, saved)
[docs] def do_loop(self):
''' This method must be called to actually perform the adaptive
stepping procedure'''
sufficient_progress = True
initial_step_size = self.step_size
for val in self.parameter_target_values:
self.reset_parameter_to_last_successful()
self.step_sign = -1. if self.parameter > val else 1.
self.parameter_target_value = val
while True:
last_successful = self.get_last_successful(1)
failed_prev_step = (
self.prior_solutions and
not self.prior_solutions[-1].success
)
if (last_successful and
last_successful[0].parameter == val):
break
if (failed_prev_step and
abs(self.step_size)/max(abs(self.parameter),
initial_step_size) < self.stepper_rel_tol):
sufficient_progress = False
break
self.do_iteration()
if self.output_writer is not None and self.get_last_successful(1):
logging.getLogger('stepper').info('Writing output')
self.output_writer.write_output(self.solution, self.parameter)
if not sufficient_progress:
logging.getLogger('stepper').error('Insufficient progress, stopping.')
msg=("Solver unable to progress. Among many possible causes of this "
"error are: poorly formed problem, incorrect boundary conditions,"
"inappropriate material parameters, or insufficient meshing. If "
"none of these is the case, it is possible that changing "
"preconditioning settings in simudo.fem.newton_solver.do_linear_solve could help.")
raise StepperError(msg)
if self.break_criteria_cb is not None:
if self.break_criteria_cb(self.solution):
logging.getLogger('stepper').info('*** Break criteria met, stopping.')
break
[docs]class AdaptiveStepper(BaseAdaptiveStepper):
'''This implements a slightly more concrete class than
:py:class:`BaseAdaptiveStepper`.
Parameters
----------
to_save_objects: dict
Mapping of vector-like objects representing the current
:py:attr:`solution`, whose values to save (and restore upon solver
failure).
'''
to_save_objects = None
def _vector(self, x):
x = x.magnitude if hasattr(x, 'magnitude') else x
return x.vector()
[docs] def user_solver(self, solution, parameter):
self.user_apply_parameter_to_solution(solution, parameter)
solver = self.user_make_solver(solution)
solver.solve()
self.du = solver.solution.du
return solver.has_converged()
[docs] def user_apply_parameter_to_solution(self, solution, parameter):
raise NotImplementedError('override me')
[docs] def user_make_solver(self, solution):
raise NotImplementedError('override me')
[docs] def user_solution_save(self, solution):
return {k: self._vector(obj)[:]
for k, obj in self.to_save_objects.items()}
[docs] def user_solution_add(self, solution, solution_scale,
saved_solution, saved_scale):
V = self._vector
for k, obj in self.to_save_objects.items():
if solution_scale == 0:
V(obj)[:] = saved_solution[k]*saved_scale
else:
V(obj)[:] *= solution_scale
V(obj)[:] += saved_solution[k]*saved_scale
[docs]class ConstantStepperMixin():
'''
Parameters
----------
constants: :py:class:`pint.Quantity` wrapping :py:class:`dolfin.Constant`
On each iteration, the constant's value will be assigned to be the
current parameter value.
parameter_unit: :py:class:`pint.Quantity`, optional
Parameter unit.
'''
parameter_unit = None
[docs] def get_mapped_parameter(self):
x = self.parameter
if self.parameter_unit is not None:
x = self.parameter_unit * x
return x
@property
def unit_registry(self):
return self.solution.unit_registry
[docs] def user_apply_parameter_to_solution(self, solution, parameter_value):
x = self.get_mapped_parameter()
for c in self.constants:
c.magnitude.assign(x.m_as(c.units))