Source code for assimulo.solvers.runge_kutta

#!/usr/bin/env python 
# -*- coding: utf-8 -*-

# Copyright (C) 2010 Modelon AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import numpy as N

from assimulo.ode import *
from assimulo.explicit_ode import Explicit_ODE

from assimulo.exception import *

from assimulo.lib import dopri5

[docs]class Dopri5(Explicit_ODE): """ Explicit Runge-Kutta method of order (4)5 with step-size control and continuous output. Based on the method by Dormand and Prince. Based on the FORTRAN code DOPRI5 by E.Hairer and G.Wanner, which can be found here: http://www.unige.ch/~hairer/software.html Details about the implementation (FORTRAN) can be found in the book,:: Solving Ordinary Differential Equations I, Nonstiff Problems Authors: E. Hairer, S. P. Norsett and G. Wanner Springer-Verlag, ISBN: 3-540-56670-8 """ def __init__(self, problem): """ Initiates the solver. Parameters:: problem - The problem to be solved. Should be an instance of the 'Explicit_Problem' class. """ Explicit_ODE.__init__(self, problem) #Calls the base class #Default values self.options["safe"] = 0.9 #Safety factor self.options["fac1"] = 0.2 #Parameters for step-size selection (lower bound) self.options["fac2"] = 10.0 #Parameters for step-size selection (upper bound) self.options["beta"] = 0.04 self.options["maxh"] = N.inf #Maximum step-size. self.options["inith"] = 0.0 self.options["atol"] = 1.0e-6*N.ones(self.problem_info["dim"]) #Absolute tolerance self.options["rtol"] = 1.0e-6 #Relative tolerance self.options["maxsteps"] = 100000 #Solver support self.supports["report_continuously"] = True self.supports["interpolated_output"] = True self.supports["state_events"] = True #Internal self._leny = len(self.y) #Dimension of the problem def initialize(self): #Reset statistics self.statistics.reset() def set_problem_data(self): if self.problem_info["state_events"]: def event_func(t, y): return self.problem.state_events(t, y, self.sw) def f(t, y): return self.problem.rhs(t, y, self.sw) self.f = f self.event_func = event_func self._event_info = [0] * self.problem_info["dimRoot"] self.g_old = self.event_func(self.t, self.y) else: self.f = self.problem.rhs def interpolate(self, time): y = N.empty(self._leny) for i in range(self._leny): y[i] = dopri5.contd5(i+1, time, self.cont, self.lrc) return y def _solout(self, nrsol, told, t, y, cont, lrc, irtrn): """ This method is called after every successful step taken by Radau5 """ #Saved to be used by the interpolation function. self.cont = cont self.lrc = lrc if self.problem_info["state_events"]: flag, t, y = self.event_locator(told, t, y) #Convert to Fortram indicator. if flag == ID_PY_EVENT: irtrn = -1 if self._opts["report_continuously"]: initialize_flag = self.report_solution(t, y, self._opts) if initialize_flag: irtrn = -1 else: if self._opts["output_list"] is None: self._tlist.append(t) self._ylist.append(y.copy()) else: output_list = self._opts["output_list"] output_index = self._opts["output_index"] try: while output_list[output_index] <= t: self._tlist.append(output_list[output_index]) self._ylist.append(self.interpolate(output_list[output_index])) output_index += 1 except IndexError: pass self._opts["output_index"] = output_index if self.problem_info["state_events"] and flag == ID_PY_EVENT and len(self._tlist) > 0 and self._tlist[-1] != t: self._tlist.append(t) self._ylist.append(y) return irtrn def integrate(self, t, y, tf, opts): ITOL = 1 #Both atol and rtol are vectors IOUT = 2 #Dense out in solout WORK = N.array([0.0]*(8*self.problem_info["dim"]+5*self.problem_info["dim"]+21)) IWORK = N.array([0]*(self.problem_info["dim"]+21)) #Setting work options WORK[1] = self.safe WORK[2] = self.fac1 WORK[3] = self.fac2 WORK[4] = self.beta WORK[5] = self.maxh WORK[6] = self.inith #Setting iwork options IWORK[0] = self.maxsteps IWORK[4] = self.problem_info["dim"] #Check for initialization if opts["initialize"]: self.set_problem_data() self._tlist = [] self._ylist = [] #Store the opts self._opts = opts t, y, iwork, flag = dopri5.dopri5(self.f, t, y.copy(), tf, self.rtol*N.ones(self.problem_info["dim"]), self.atol, ITOL, self._solout, IOUT, WORK, IWORK) #Checking return if flag == 1: flag = ID_PY_COMPLETE elif flag == 2: flag = ID_PY_EVENT else: raise Exception("Dopri5 failed with flag %d"%flag) #Retrieving statistics self.statistics["nsteps"] += iwork[18] self.statistics["nfcns"] += iwork[16] #self.statistics["nstepstotal"] += iwork[17] self.statistics["nerrfails"] += iwork[19] return flag, self._tlist, self._ylist def state_event_info(self): return self._event_info def set_event_info(self, event_info): self._event_info = event_info
[docs] def print_statistics(self, verbose=NORMAL): """ Prints the run-time statistics for the problem. """ Explicit_ODE.print_statistics(self, verbose) #Calls the base class self.log_message('\nSolver options:\n', verbose) self.log_message(' Solver : Dopri5 ', verbose) self.log_message(' Tolerances (absolute) : ' + str(self._compact_atol()), verbose) self.log_message(' Tolerances (relative) : ' + str(self.options["rtol"]), verbose) self.log_message('', verbose)
def _set_atol(self,atol): self.options["atol"] = N.array(atol,dtype=N.float) if len(N.array(atol,dtype=N.float).shape)>0 else N.array([atol],dtype=N.float) if len(self.options["atol"]) == 1: self.options["atol"] = self.options["atol"]*N.ones(self._leny) elif len(self.options["atol"]) != self._leny: raise Dopri5_Exception("atol must be of length one or same as the dimension of the problem.") def _get_atol(self): """ Defines the absolute tolerance(s) that is to be used by the solver. Can be set differently for each variable. Parameters:: atol - Default '1.0e-6'. - Should be a positive float or a numpy vector of floats. Example: atol = [1.0e-4, 1.0e-6] """ return self.options["atol"] atol=property(_get_atol,_set_atol) def _set_rtol(self,rtol): try: self.options["rtol"] = float(rtol) except (ValueError, TypeError): raise Dopri5_Exception('Relative tolerance must be a (scalar) float.') if self.options["rtol"] <= 0.0: raise Dopri5_Exception('Relative tolerance must be a positive (scalar) float.') def _get_rtol(self): """ Defines the relative tolerance that is to be used by the solver. Parameters:: rtol - Default '1.0e-6'. - Should be a positive float. Example: rtol = 1.0e-4 """ return self.options["rtol"] rtol=property(_get_rtol,_set_rtol) def _get_maxsteps(self): """ The maximum number of steps allowed to be taken to reach the final time. Parameters:: maxsteps - Default 10000 - Should be a positive integer """ return self.options["maxsteps"] def _set_maxsteps(self, max_steps): try: max_steps = int(max_steps) except (TypeError, ValueError): raise Dopri5_Exception("Maximum number of steps must be a positive integer.") self.options["maxsteps"] = max_steps maxsteps = property(_get_maxsteps, _set_maxsteps) def _set_fac1(self, fac1): try: self.options["fac1"] = float(fac1) except (ValueError, TypeError): raise Dopri5_Exception('The fac1 must be an integer or float.') def _get_fac1(self): """ Parameters for step-size selection. The new step-size is chosen subject to the restriction fac1 <= current step-size / old step-size <= fac2. Parameters:: fac1 - Default 0.2 - Should be a float. Example: fac1 = 0.1 """ return self.options["fac1"] fac1 = property(_get_fac1, _set_fac1) def _set_fac2(self, fac2): try: self.options["fac2"] = float(fac2) except (ValueError, TypeError): raise Dopri5_Exception('The fac2 must be an integer or float.') def _get_fac2(self): """ Parameters for step-size selection. The new step-size is chosen subject to the restriction fac1 <= current step-size / old step-size <= fac2. Parameters:: fac2 - Default 8.0 - Should be a float. Example: fac2 = 10.0 """ return self.options["fac2"] fac2 = property(_get_fac2, _set_fac2) def _set_safe(self, safe): try: self.options["safe"] = float(safe) except (ValueError, TypeError): raise Dopri5_Exception('The safe must be an integer or float.') def _get_safe(self): """ The safety factor in the step-size prediction. Parameters:: safe - Default '0.9' - Should be float. Example: safe = 0.8 """ return self.options["safe"] safe = property(_get_safe, _set_safe) def _set_initial_step(self, initstep): try: self.options["inith"] = float(initstep) except (ValueError, TypeError): raise Dopri5_Exception('The initial step must be an integer or float.') def _get_initial_step(self): """ This determines the initial step-size to be used in the integration. Parameters:: inith - Default '0.01'. - Should be float. Example: inith = 0.01 """ return self.options["inith"] inith = property(_get_initial_step,_set_initial_step) def _set_max_h(self,max_h): try: self.options["maxh"] = float(max_h) except (ValueError,TypeError): raise Dopri5_Exception('Maximal stepsize must be a (scalar) float.') if self.options["maxh"] < 0: raise Dopri5_Exception('Maximal stepsize must be a positiv (scalar) float.') def _get_max_h(self): """ Defines the maximal step-size that is to be used by the solver. Parameters:: maxh - Default final time - current time. - Should be a float. Example: maxh = 0.01 """ return self.options["maxh"] maxh=property(_get_max_h,_set_max_h) def _set_beta(self, beta): self.options["beta"] = beta def _get_beta(self): """ Option for stabilized step-size control. Parameters:: beta - Default 0.04 - Should be a float. """ return self.options["beta"] beta = property(_get_beta, _set_beta)
[docs]class RungeKutta34(Explicit_ODE): """ Adaptive Runge-Kutta of order four. Obs. Step rejection not implemented. """ def __init__(self, problem): """ Initiates the solver. Parameters:: problem - The problem to be solved. Should be an instance of the 'Explicit_Problem' class. """ Explicit_ODE.__init__(self, problem) #Calls the base class #Solver options self.options["atol"] = 1.0e-6 self.options["rtol"] = 1.0e-6 self.options["inith"] = 0.01 self.options["maxsteps"] = 10000 #Internal temporary result vector self.Y1 = N.array([0.0]*len(self.y0)) self.Y2 = N.array([0.0]*len(self.y0)) self.Y3 = N.array([0.0]*len(self.y0)) self.Y4 = N.array([0.0]*len(self.y0)) self.Z3 = N.array([0.0]*len(self.y0)) #Solver support self.supports["report_continuously"] = True self.supports["interpolated_output"] = True self.supports["state_events"] = True def initialize(self): #Reset statistics self.statistics.reset() def set_problem_data(self): if self.problem_info["state_events"]: def event_func(t, y): return self.problem.state_events(t, y, self.sw) def f(dy ,t, y): try: dy[:] = self.problem.rhs(t, y, self.sw) except: return False return True self.f = f self.event_func = event_func self._event_info = [0] * self.problem_info["dimRoot"] self.g_old = self.event_func(self.t, self.y) else: self.f = self.problem.rhs_internal def _set_initial_step(self, initstep): try: initstep = float(initstep) except (ValueError, TypeError): raise Explicit_ODE_Exception('The initial step must be an integer or float.') self.options["inith"] = initstep def _get_initial_step(self): """ This determines the initial step-size to be used in the integration. Parameters:: inith - Default '0.01'. - Should be float. Example: inith = 0.01 """ return self.options["inith"] inith = property(_get_initial_step,_set_initial_step) def _set_atol(self,atol): try: atol_arr = N.array(atol, dtype=float) if (atol_arr <= 0.0).any(): raise Explicit_ODE_Exception('Absolute tolerance must be a positive float or a float vector.') except (ValueError,TypeError): raise Explicit_ODE_Exception('Absolute tolerance must be a positive float or a float vector.') if atol_arr.size == 1: self.options["atol"] = float(atol) elif atol_arr.size == len(self.y): self.options["atol"] = [float(x) for x in atol] else: raise Explicit_ODE_Exception('Absolute tolerance must be a float vector of same dimension as the problem or a scalar.') def _get_atol(self): """ Sets the absolute tolerance to be used in the integration. Parameters:: atol - Default 1.0e-6. - Should be float or an array/list of len(y) Example: atol=1.e5 atol=[1.e-5,1.e-4] """ return self.options["atol"] atol = property(_get_atol,_set_atol) def _set_rtol(self, rtol): try: rtol = float(rtol) except (TypeError,ValueError): raise Explicit_ODE_Exception('Relative tolerance must be a float.') if rtol <= 0.0: raise Explicit_ODE_Exception('Relative tolerance must be a positive (scalar) float.') self.options["rtol"] = rtol def _get_rtol(self): """ The relative tolerance to be used in the integration. Parameters:: rtol - Default 1.0e-6 - Should be a float. """ return self.options["rtol"] rtol = property(_get_rtol, _set_rtol) def _get_maxsteps(self): """ The maximum number of steps allowed to be taken to reach the final time. Parameters:: maxsteps - Default 10000 - Should be a positive integer """ return self.options["maxsteps"] def _set_maxsteps(self, max_steps): try: max_steps = int(max_steps) except (TypeError, ValueError): raise Explicit_ODE_Exception("Maximum number of steps must be a positive integer.") self.options["maxsteps"] = max_steps maxsteps = property(_get_maxsteps, _set_maxsteps) def step(self, t, y, tf, opts): initialize = opts["initialize"] if initialize: self.solver_iterator = self._iter(t,y,tf,opts) return self.solver_iterator.next()
[docs] def integrate(self, t, y, tf, opts): """ Integrates (t,y) values until t > tf """ [flags, tlist, ylist] = zip(*list(self._iter(t, y, tf,opts))) return flags[-1], tlist, ylist
def _iter(self,t,y,tf,opts): if opts["initialize"]: self.set_problem_data() maxsteps = self.options["maxsteps"] h = self.options["inith"] h = min(h, N.abs(tf-t)) self.f(self.Y1, t, y) flag = ID_PY_OK for i in range(maxsteps): if t+h < tf and flag == ID_PY_OK: t, y, error = self._step(t, y, h) self.statistics["nsteps"] += 1 if self.problem_info["state_events"]: flag, t, y = self.event_locator(t-h , t, y) if opts["report_continuously"]: initialize_flag = self.report_solution(t, y, opts) if initialize_flag: flag = ID_PY_EVENT yield flag, t,y elif opts["output_list"] is None: yield flag, t, y else: output_list = opts["output_list"] output_index = opts["output_index"] try: while output_list[output_index] <= t: yield flag, output_list[output_index], self.interpolate(output_list[output_index]) output_index = output_index + 1 except IndexError: pass opts["output_index"] = output_index h=self.adjust_stepsize(h,error) h=min(h, N.abs(tf-t)) else: break else: raise Explicit_ODE_Exception('Final time not reached within maximum number of steps') #If no event has been detected, do the last step. if flag == ID_PY_OK: t, y, error = self._step(t, y, h) self.statistics["nsteps"] += 1 if self.problem_info["state_events"]: flag, t, y = self.event_locator(t-h , t, y) if flag == ID_PY_OK: flag = ID_PY_COMPLETE if opts["report_continuously"]: initialize_flag = self.report_solution(t, y, opts) if initialize_flag: flag = ID_PY_EVENT else: flag = ID_PY_COMPLETE yield flag, t,y elif opts["output_list"] is None: yield flag, t,y else: output_list = opts["output_list"] output_index = opts["output_index"] try: while output_list[output_index] <= t: yield flag, output_list[output_index], self.interpolate(output_list[output_index]) output_index = output_index + 1 except IndexError: pass opts["output_index"] = output_index
[docs] def adjust_stepsize(self, h, error): """ Adjusts the stepsize. """ if error == 0.0: fac = 2.0 else: fac=min((1.0/error)**(1.0/4.0),2.) h *= fac return h
def _step(self, t, y, h): """ This calculates the next step in the integration. """ self.statistics["nfcns"] += 5 scaling = N.array(abs(y)*self.rtol + self.atol) # to normalize the error f = self.f f(self.Y2, t + h/2., y + h*self.Y1/2.) f(self.Y3, t + h/2., y + h*self.Y2/2.) f(self.Z3, t + h, y - h*self.Y1 + 2.0*h*self.Y2) f(self.Y4, t + h, y + h*self.Y3) error = N.linalg.norm(h/6.0*(2.0*self.Y2 + self.Z3 - 2.0*self.Y3 - self.Y4)/scaling) #normalized t_next = t + h y_next = y + h/6.0*(self.Y1 + 2.0*self.Y2 + 2.0*self.Y3 + self.Y4) f_low = self.Y1.copy() f(self.Y1, t_next, y_next) #Hermitian interpolation for the solution in [t, t_next] def interpolate(time): thetha = (time - t) / (t_next - t) f_high = self.Y1 return (1 - thetha) * y + thetha * y_next + thetha * \ (thetha - 1) * ((1 - 2*thetha) * (y_next - y) + \ (thetha - 1) * h * f_low + thetha * h * f_high) self.interpolate = interpolate return t_next, y_next, error def state_event_info(self): return self._event_info def set_event_info(self, event_info): self._event_info = event_info
[docs] def print_statistics(self, verbose): """ Should print the statistics. """ Explicit_ODE.print_statistics(self, verbose) #Calls the base class self.log_message('\nSolver options:\n', verbose) self.log_message(' Solver : RungeKutta34', verbose) self.log_message(' Solver type : Adaptive', verbose) self.log_message(' Relative tolerance : ' + str(self.options["rtol"]), verbose) self.log_message(' Absolute tolerance : ' + str(self._compact_atol()) + '\n', verbose)
[docs]class RungeKutta4(Explicit_ODE): """ This solver solves an explicit ordinary differential equation using a Runge-Kutta method of order 4. We want to approximate the solution to the ordinary differential equation of the form, .. math:: \dot{y} = f(t,y), \quad y(t_0) = y_0 . Using a Runge-Kutta method of order 4, the approximation is defined as follow, .. math:: y_{n+1} = y_n + \\frac{1}{6}(k_1+2k_2+2k_3+k_4) where, .. math:: k_1 = hf(t_n,y_n) k_2 = hf(t_n+\\frac{1}{2}h,y_n+\\frac{1}{2}k_1) k_3 = hf(t_n+\\frac{1}{2}h,y_n+\\frac{1}{2}k_2) k_4 = hf(t_n+h,y_n+k_3) with :math:`h` being the step-size and :math:`y_n` the previous solution to the equation. """ def __init__(self, problem): Explicit_ODE.__init__(self, problem) #Calls the base class #Solver options self.options["h"] = 0.01 #Internal temporary result vector self.Y1 = N.array([0.0]*len(self.y0)) self.Y2 = N.array([0.0]*len(self.y0)) self.Y3 = N.array([0.0]*len(self.y0)) self.Y4 = N.array([0.0]*len(self.y0)) #RHS-Function self.f = problem.rhs_internal #Solver support self.supports["one_step_mode"] = True def step(self, t, y, tf, opts): initialize = opts["initialize"] if initialize: self.solver_iterator = self._iter(t,y,tf) return self.solver_iterator.next()
[docs] def integrate(self, t, y, tf, opts): """ Integrates (t,y) values until t > tf """ [flags, tlist, ylist] = zip(*list(self._iter(t, y, tf))) return flags[-1], tlist, ylist
def _set_h(self,h): try: self.options["h"] = float(h) except: raise AssimuloException("Step-size must be a (scalar) float.") def _get_h(self): """ Defines the step-size that is to be used by the solver. Parameters:: maxh - Default '0.01'. - Should be a float. Example: maxh = 0.01 """ return self.options["h"] h=property(_get_h,_set_h) def _iter(self,t,y,tf): h = self.options["h"] h = min(h, N.abs(tf-t)) while t+h < tf: t, y = self._step(t, y, h) yield ID_PY_OK, t,y h=min(h, N.abs(tf-t)) else: t, y = self._step(t, y, h) yield ID_PY_COMPLETE, t, y def _step(self, t, y, h): """ This calculates the next step in the integration. """ f = self.f f(self.Y1, t, y) f(self.Y2, t + h/2., y + h*self.Y1/2.) f(self.Y3, t + h/2., y + h*self.Y2/2.) f(self.Y4, t + h, y + h*self.Y3) return t+h, y + h/6.*(self.Y1 + 2.*self.Y2 + 2.*self.Y3 + self.Y4)
[docs] def print_statistics(self, verbose): """ Should print the statistics. """ self.log_message('Final Run Statistics: %s \n' % self.problem.name, verbose) self.log_message(' Step-length : %s '%(self.options["h"]), verbose) self.log_message('\nSolver options:\n', verbose) self.log_message(' Solver : RungeKutta4', verbose) self.log_message(' Solver type : Fixed step\n', verbose)