#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Modelon AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This file contains code for mapping our JMI Models to the Problem specifications
required by Assimulo.
"""
import logging
import logging as logging_module
import numpy as N
import numpy.linalg as LIN
import scipy.sparse as sp
import time
from pyfmi.common.io import ResultWriterDymola
import pyfmi.fmi as fmi
from pyfmi.common import python3_flag
from pyfmi.common.core import TrajectoryLinearInterpolation
from timeit import default_timer as timer
try:
import assimulo
assimulo_present = True
except:
logging.warning(
'Could not load Assimulo module. Check pyfmi.check_packages()')
assimulo_present = False
if assimulo_present:
from assimulo.problem import Implicit_Problem
from assimulo.problem import Explicit_Problem
from assimulo.exception import *
else: #Create dummy classes
class Implicit_Problem:
pass
class Explicit_Problem:
pass
[docs]class FMIModel_Exception(Exception):
"""
A FMIModel Exception.
"""
pass
[docs]def write_data(simulator,write_scaled_result=False, result_file_name=''):
"""
Writes simulation data to a file. Takes as input a simulated model.
"""
#Determine the result file name
if result_file_name == '':
result_file_name=simulator.problem._model.get_name()+'_result.txt'
model = simulator.problem._model
t = N.array(simulator.problem._sol_time)
r = N.array(simulator.problem._sol_real)
data = N.c_[t,r]
if len(simulator.problem._sol_int) > 0 and len(simulator.problem._sol_int[0]) > 0:
i = N.array(simulator.problem._sol_int)
data = N.c_[data,i]
if len(simulator.problem._sol_bool) > 0 and len(simulator.problem._sol_bool[0]) > 0:
#b = N.array(simulator.problem._sol_bool).reshape(
# -1,len(model._save_bool_variables_val))
b = N.array(simulator.problem._sol_bool)
data = N.c_[data,b]
export = ResultWriterDymola(model)
export.write_header(file_name=result_file_name)
map(export.write_point,(row for row in data))
export.write_finalize()
#fmi.export_result_dymola(model, data)
[docs]def createLogger(model, minimum_level):
"""
Creates a logger.
"""
filename = model.get_name()+'.log'
log = logging.getLogger(filename)
log.setLevel(minimum_level)
#ch = logging.StreamHandler()
ch = logging.FileHandler(filename, mode='w', delay=True)
ch.setLevel(0)
formatter = logging.Formatter("%(name)s - %(message)s")
ch.setFormatter(formatter)
log.addHandler(ch)
return log
[docs]class FMIODE(Explicit_Problem):
"""
An Assimulo Explicit Model extended to FMI interface.
"""
def __init__(self, model, input=None, result_file_name='',
with_jacobian=False, start_time=0.0, logging=False, result_handler=None):
"""
Initialize the problem.
"""
self._model = model
self._adapt_input(input)
self.timings = {"handle_result": 0.0}
#Set start time to the model
self._model.time = start_time
self.t0 = start_time
self.y0 = self._model.continuous_states
self.name = self._model.get_name()
[f_nbr, g_nbr] = self._model.get_ode_sizes()
self._f_nbr = f_nbr
self._g_nbr = g_nbr
if g_nbr > 0:
self.state_events = self.g
self.time_events = self.t
#If there is no state in the model, add a dummy
#state der(y)=0
if f_nbr == 0:
self.y0 = N.array([0.0])
#Determine the result file name
if result_file_name == '':
self.result_file_name = model.get_name()+'_result.txt'
else:
self.result_file_name = result_file_name
self.debug_file_name = model.get_name().replace(".","_")+'_debug.txt'
self.debug_file_object = None
#Default values
self.export = result_handler
#Internal values
self._sol_time = []
self._sol_real = []
self._sol_int = []
self._sol_bool = []
self._logg_step_event = []
self._write_header = True
self._logging = logging
#Stores the first time point
#[r,i,b] = self._model.save_time_point()
#self._sol_time += [self._model.t]
#self._sol_real += [r]
#self._sol_int += [i]
#self._sol_bool += b
self._jm_fmu = self._model.get_generation_tool() == "JModelica.org"
if with_jacobian:
raise fmi.FMUException("Jacobians are not supported using FMI 1.0, please use FMI 2.0")
def _adapt_input(self, input):
if input != None:
input_value_refs = []
input_alias_type = []
if isinstance(input[0],str):
input_value_refs.append(self._model.get_variable_valueref(input[0]))
input_alias_type.append(-1.0 if self._model.get_variable_alias(input[0])[input[0]] == -1 else 1.0)
else:
for name in input[0]:
input_value_refs.append(self._model.get_variable_valueref(name))
input_alias_type.append(-1.0 if self._model.get_variable_alias(name)[name] == -1 else 1.0)
self.input_value_refs = input_value_refs
self.input_alias_type = input_alias_type if N.any(input_alias_type==-1) else 1.0
self.input = input
[docs] def rhs(self, t, y, sw=None):
"""
The rhs (right-hand-side) for an ODE problem.
"""
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
if self.input!=None:
self._model.set_real(self.input_value_refs, self.input[1].eval(t)[0,:]*self.input_alias_type)
#Evaluating the rhs
try:
rhs = self._model.get_derivatives()
except fmi.FMUException:
raise AssimuloRecoverableError
#If there is no state, use the dummy
if self._f_nbr == 0:
rhs = N.array([0.0])
return rhs
[docs] def g(self, t, y, sw):
"""
The event indicator function for a ODE problem.
"""
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
if self.input!=None:
self._model.set_real(self.input_value_refs, self.input[1].eval(t)[0,:]*self.input_alias_type)
#Evaluating the event indicators
eventInd = self._model.get_event_indicators()
return eventInd
[docs] def t(self, t, y, sw):
"""
Time event function.
"""
eInfo = self._model.get_event_info()
if eInfo.upcomingTimeEvent == True:
return eInfo.nextEventTime
else:
return None
[docs] def handle_result(self, solver, t, y):
"""
Post processing (stores the time points).
"""
time_start = timer()
#Moving data to the model
if t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == y).all()):
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
if self.input!=None:
self._model.set_real(self.input_value_refs, self.input[1].eval(t)[0,:]*self.input_alias_type)
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
if self.export != None:
self.export.integration_point()
self.timings["handle_result"] += timer() - time_start
[docs] def handle_event(self, solver, event_info):
"""
This method is called when Assimulo finds an event.
"""
#Moving data to the model
if solver.t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == solver.y).all()):
self._model.time = solver.t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = solver.y
#Sets the inputs, if any
if self.input!=None:
self._model.set_real(self.input_value_refs, self.input[1].eval(N.array([solver.t]))[0,:]*self.input_alias_type)
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
if self._logging:
str_ind = ""
for i in self._model.get_event_indicators():
str_ind += " %.14E"%i
str_states = ""
if self._f_nbr != 0:
for i in solver.y:
str_states += " %.14E"%i
str_der = ""
for i in self._model.get_derivatives():
str_der += " %.14E"%i
fwrite = self._get_debug_file_object()
fwrite.write("\nDetected event at t = %.14E \n"%solver.t)
fwrite.write(" State event info: "+" ".join(str(i) for i in event_info[0])+ "\n")
fwrite.write(" Time event info: "+str(event_info[1])+ "\n")
eInfo = self._model.get_event_info()
eInfo.iterationConverged = False
while eInfo.iterationConverged == False:
self._model.event_update(intermediateResult=False)
eInfo = self._model.get_event_info()
#Retrieve solutions (if needed)
#if eInfo.iterationConverged == False:
# pass
#Check if the event affected the state values and if so sets them
if eInfo.stateValuesChanged:
if self._f_nbr == 0:
solver.y[0] = 0.0
else:
solver.y = self._model.continuous_states
#Get new nominal values.
if eInfo.stateValueReferencesChanged:
if self._f_nbr == 0:
solver.atol = 0.01*solver.rtol*1
else:
solver.atol = 0.01*solver.rtol*self._model.nominal_continuous_states
#Check if the simulation should be terminated
if eInfo.terminateSimulation:
raise TerminateSimulation #Exception from Assimulo
if self._logging:
str_ind2 = ""
for i in self._model.get_event_indicators():
str_ind2 += " %.14E"%i
str_states2 = ""
if self._f_nbr != 0:
for i in solver.y:
str_states2 += " %.14E"%i
str_der2 = ""
for i in self._model.get_derivatives():
str_der2 += " %.14E"%i
fwrite.write(" Indicators (pre) : "+str_ind + "\n")
fwrite.write(" Indicators (post): "+str_ind2+"\n")
fwrite.write(" States (pre) : "+str_states + "\n")
fwrite.write(" States (post): "+str_states2 + "\n")
fwrite.write(" Derivatives (pre) : "+str_der + "\n")
fwrite.write(" Derivatives (post): "+str_der2 + "\n\n")
header = "Time (simulated) | Time (real) | "
if solver.__class__.__name__=="CVode": #Only available for CVode
header += "Order | Error (Weighted)"
if self._g_nbr > 0:
header += "Indicators"
fwrite.write(header+"\n")
[docs] def step_events(self, solver):
"""
Method which is called at each successful step.
"""
#Moving data to the model
if solver.t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == solver.y).all()):
self._model.time = solver.t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = solver.y
#Sets the inputs, if any
if self.input!=None:
self._model.set_real(self.input_value_refs, self.input[1].eval(N.array([solver.t]))[0,:]*self.input_alias_type)
#self._model.set(self.input[0],self.input[1].eval(N.array([solver.t]))[0,:])
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
if self._logging:
if self._jm_fmu:
solver_name = solver.__class__.__name__
preface = "[INFO][FMU status:OK] "
msg = preface + '<%s>Successfully computed a step at <value name="time"> %.14E</value>. Elapsed time for the step <value name="t"> %.14E</value>"'%(solver_name,solver.t,solver.get_elapsed_step_time())
self._model.append_log_message("Model", 6, msg)
if solver_name == "CVode":
msg = preface + ' <vector name="weighted_error">' + " ".join([" %.14E"%e for e in solver.get_local_errors()*solver.get_error_weights()])+"</vector>"
self._model.append_log_message("Model", 6, msg)
msg = preface + ' <vector name="order">%d</vector>'%solver.get_last_order()
self._model.append_log_message("Model", 6, msg)
#End tag
msg = preface + "</%s>"%solver_name
self._model.append_log_message("Model", 6, msg)
data_line = "%.14E"%solver.t+" | %.14E"%(solver.get_elapsed_step_time())
if solver.__class__.__name__=="CVode": #Only available for CVode
ele = solver.get_local_errors()
eweight = solver.get_error_weights()
err = ele*eweight
str_err = " |"
for i in err:
str_err += " %.14E"%i
data_line += " | %d"%solver.get_last_order()+str_err
if self._g_nbr > 0:
str_ev = " |"
for i in self._model.get_event_indicators():
str_ev += " %.14E"%i
data_line += str_ev
fwrite = self._get_debug_file_object()
fwrite.write(data_line+"\n")
if self._model.completed_integrator_step():
self._logg_step_event += [solver.t]
#Event have been detect, call event iteration.
#print "Step event detected at: ", solver.t
#self.handle_event(solver,[0])
return 1 #Tell to reinitiate the solver.
else:
return 0
[docs] def print_step_info(self):
"""
Prints the information about step events.
"""
print('\nStep-event information:\n')
for i in range(len(self._logg_step_event)):
print('Event at time: %e'%self._logg_step_event[i])
print('\nNumber of events: ',len(self._logg_step_event))
def _get_debug_file_object(self):
if not self.debug_file_object:
self.debug_file_object = open(self.debug_file_name, 'a')
return self.debug_file_object
[docs] def initialize(self, solver):
if self._logging:
self.debug_file_object = open(self.debug_file_name, 'w')
f = self.debug_file_object
model_valref = self._model.get_state_value_references()
names = ""
for i in model_valref:
names += self._model.get_variable_by_valueref(i) + ", "
f.write("Solver: %s \n"%solver.__class__.__name__)
f.write("State variables: "+names+ "\n")
str_y = ""
if self._f_nbr != 0:
for i in solver.y:
str_y += " %.14E"%i
f.write("Initial values: t = %.14E \n"%solver.t)
f.write("Initial values: y ="+str_y+"\n\n")
header = "Time (simulated) | Time (real) | "
if solver.__class__.__name__=="CVode": #Only available for CVode
header += "Order | Error (Weighted)"
f.write(header+"\n")
[docs] def finalize(self, solver):
if self.export != None:
self.export.simulation_end()
if self.debug_file_object:
self.debug_file_object.close()
self.debug_file_object = None
def _set_input(self, input):
self.__input = input
def _get_input(self):
return self.__input
input = property(_get_input, _set_input, doc =
"""
Property for accessing the input. The input must be a 2-tuple with the first
object as a list of names of the input variables and with the other as a
subclass of the class Trajectory.
""")
[docs]class FMIODESENS(FMIODE):
"""
FMIODE extended with sensitivity simulation capabilities
"""
def __init__(self, model, input=None, result_file_name='',
with_jacobian=False, start_time=0.0, parameters=None, logging=False, result_handler=None):
#Call FMIODE init method
FMIODE.__init__(self, model, input, result_file_name, with_jacobian,
start_time,logging,result_handler)
#Store the parameters
if parameters != None:
if not isinstance(parameters,list):
raise FMIModel_Exception("Parameters must be a list of names.")
self.p0 = N.array(model.get(parameters)).flatten()
self.pbar = N.array([N.abs(x) if N.abs(x) > 0 else 1.0 for x in self.p0])
self.parameters = parameters
[docs] def rhs(self, t, y, p=None, sw=None):
#Sets the parameters, if any
if self.parameters != None:
self._model.set(self.parameters, p)
return FMIODE.rhs(self,t,y,sw)
[docs] def j(self, t, y, p=None, sw=None):
#Sets the parameters, if any
if self.parameters != None:
self._model.set(self.parameters, p)
return FMIODE.j(self,t,y,sw)
[docs] def handle_result(self, solver, t, y):
#
#Post processing (stores the time points).
#
time_start = timer()
#Moving data to the model
if t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == y).all()):
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
if self.input!=None:
self._model.set(self.input[0], self.input[1].eval(t)[0,:])
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
#Sets the parameters, if any
if self.parameters != None:
p_data = N.array(solver.interpolate_sensitivity(t, 0)).flatten()
self.export.integration_point(solver)#parameter_data=p_data)
self.timings["handle_result"] += timer() - time_start
[docs]class FMIODE2(Explicit_Problem):
"""
An Assimulo Explicit Model extended to FMI interface.
"""
def __init__(self, model, input=None, result_file_name='',
with_jacobian=False, start_time=0.0, logging=False,
result_handler=None, extra_equations=None):
"""
Initialize the problem.
"""
self._model = model
self._adapt_input(input)
self.input_names = []
self.timings = {"handle_result": 0.0}
#Set start time to the model
self._model.time = start_time
self.t0 = start_time
self.y0 = self._model.continuous_states
self.problem_name = self._model.get_name()
[f_nbr, g_nbr] = self._model.get_ode_sizes()
self._f_nbr = f_nbr
self._g_nbr = g_nbr
self._A = None
if g_nbr > 0:
self.state_events = self.g
self.time_events = self.t
#If there is no state in the model, add a dummy state der(y)=0
if f_nbr == 0:
self.y0 = N.array([0.0])
#Determine the result file name
if result_file_name == '':
self.result_file_name = model.get_name()+'_result.txt'
else:
self.result_file_name = result_file_name
self.debug_file_name = model.get_name().replace(".","_")+'_debug.txt'
self.debug_file_object = None
#Default values
self.export = result_handler
#Internal values
self._sol_time = []
self._sol_real = []
self._sol_int = []
self._sol_bool = []
self._logg_step_event = []
self._write_header = True
self._logging = logging
self._sparse_representation = False
if f_nbr > 0 and with_jacobian:
self.jac = self.j #Activates the jacobian
#Need to calculate the nnz.
[derv_state_dep, derv_input_dep] = model.get_derivatives_dependencies()
self.jac_nnz = N.sum([len(derv_state_dep[key]) for key in derv_state_dep.keys()])+f_nbr
if extra_equations:
self._extra_f_nbr = extra_equations.get_size()
self._extra_y0 = extra_equations.y0
self.y0 = N.append(self.y0, self._extra_y0)
self._extra_equations = extra_equations
if hasattr(self._extra_equations, "jac"):
if hasattr(self._extra_equations, "jac_nnz"):
self.jac_nnz += extra_equations.jac_nnz
else:
self.jac_nnz += len(self._extra_f_nbr)*len(self._extra_f_nbr)
else:
self._extra_f_nbr = 0
def _adapt_input(self, input):
if input != None:
input_names = input[0]
self.input_len_names = len(input_names)
self.input_real_value_refs = []
self.input_real_mask = []
self.input_other = []
self.input_other_mask = []
if isinstance(input_names,str):
input_names = [input_names]
for i,name in enumerate(input_names):
if self._model.get_variable_causality(name) != fmi.FMI2_INPUT:
raise fmi.FMUException("Variable '%s' is not an input. Only variables specified to be inputs are allowed."%name)
if self._model.get_variable_data_type(name) == fmi.FMI2_REAL:
self.input_real_value_refs.append(self._model.get_variable_valueref(name))
self.input_real_mask.append(i)
else:
self.input_other.append(name)
self.input_other_mask.append(i)
self.input_real_mask = N.array(self.input_real_mask)
self.input_other_mask = N.array(self.input_other_mask)
self.input = input
def _set_input_values(self, t):
if self.input is not None:
values = self.input[1].eval(t)[0,:]
if self.input_real_value_refs:
self._model.set_real(self.input_real_value_refs, values[self.input_real_mask])
if self.input_other:
self._model.set(self.input_other, values[self.input_other_mask])
[docs] def rhs(self, t, y, sw=None):
"""
The rhs (right-hand-side) for an ODE problem.
"""
if self._extra_f_nbr > 0:
y_extra = y[-self._extra_f_nbr:]
y = y[:-self._extra_f_nbr]
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
self._set_input_values(t)
#Evaluating the rhs
try:
rhs = self._model.get_derivatives()
except fmi.FMUException:
raise AssimuloRecoverableError
#If there is no state, use the dummy
if self._f_nbr == 0:
rhs = N.array([0.0])
if self._extra_f_nbr > 0:
rhs = N.append(rhs, self._extra_equations.rhs(y_extra))
return rhs
[docs] def j(self, t, y, sw=None):
"""
The jacobian function for an ODE problem.
"""
if self._extra_f_nbr > 0:
y_extra = y[-self._extra_f_nbr:]
y = y[:-self._extra_f_nbr]
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
self._set_input_values(t)
#Evaluating the jacobian
#If there are no states return a dummy jacobian.
if self._f_nbr == 0:
return N.array([[0.0]])
A = self._model._get_A(add_diag=True, output_matrix=self._A)
if self._A is None:
self._A = A
if self._extra_f_nbr > 0:
if hasattr(self._extra_equations, "jac"):
if self._sparse_representation:
Jac = A.tocoo() #Convert to COOrdinate
A2 = self._extra_equations.jac(y_extra).tocoo()
data = N.append(Jac.data, A2.data)
row = N.append(Jac.row, A2.row+self._f_nbr)
col = N.append(Jac.col, A2.col+self._f_nbr)
#Convert to compresssed sparse column
Jac = sp.coo_matrix((data, (row, col)))
Jac = Jac.tocsc()
else:
Jac = N.zeros((self._f_nbr+self._extra_f_nbr,self._f_nbr+self._extra_f_nbr))
Jac[:self._f_nbr,:self._f_nbr] = A if isinstance(A, N.ndarray) else A.toarray()
Jac[self._f_nbr:,self._f_nbr:] = self._extra_equations.jac(y_extra)
else:
raise fmi.FMUException("No Jacobian provided for the extra equations")
else:
Jac = A
return Jac
[docs] def g(self, t, y, sw):
"""
The event indicator function for a ODE problem.
"""
if self._extra_f_nbr > 0:
y_extra = y[-self._extra_f_nbr:]
y = y[:-self._extra_f_nbr]
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
self._set_input_values(t)
#Evaluating the event indicators
eventInd = self._model.get_event_indicators()
return eventInd
[docs] def t(self, t, y, sw):
"""
Time event function.
"""
eInfo = self._model.get_event_info()
if eInfo.nextEventTimeDefined == True:
return eInfo.nextEventTime
else:
return None
[docs] def handle_result(self, solver, t, y):
"""
Post processing (stores the time points).
"""
time_start = timer()
if self._extra_f_nbr > 0:
y_extra = y[-self._extra_f_nbr:]
y = y[:-self._extra_f_nbr]
#Moving data to the model
if t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == y).all()):
#Moving data to the model
self._model.time = t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
self._set_input_values(t)
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
self.export.integration_point(solver)
if self._extra_f_nbr > 0:
self._extra_equations.handle_result(self.export, y_extra)
self.timings["handle_result"] += timer() - time_start
[docs] def handle_event(self, solver, event_info):
"""
This method is called when Assimulo finds an event.
"""
if self._extra_f_nbr > 0:
y_extra = solver.y[-self._extra_f_nbr:]
y = solver.y[:-self._extra_f_nbr]
else:
y = solver.y
#Moving data to the model
if solver.t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == solver.y).all()):
self._model.time = solver.t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
self._set_input_values(solver.t)
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
if self._logging:
str_ind = ""
for i in self._model.get_event_indicators():
str_ind += " %.14E"%i
str_states = ""
if self._f_nbr != 0:
for i in y:
str_states += " %.14E"%i
str_der = ""
for i in self._model.get_derivatives():
str_der += " %.14E"%i
fwrite = self._get_debug_file_object()
fwrite.write("\nDetected event at t = %.14E \n"%solver.t)
fwrite.write(" State event info: "+" ".join(str(i) for i in event_info[0])+ "\n")
fwrite.write(" Time event info: "+str(event_info[1])+ "\n")
#Enter event mode
self._model.enter_event_mode()
self._model.event_update()
eInfo = self._model.get_event_info()
#Check if the event affected the state values and if so sets them
if eInfo.valuesOfContinuousStatesChanged:
if self._extra_f_nbr > 0:
solver.y = self._model.continuous_states.append(solver.y[-self._extra_f_nbr:])
else:
solver.y = self._model.continuous_states
#Get new nominal values.
if eInfo.nominalsOfContinuousStatesChanged:
solver.atol = 0.01*solver.rtol*self._model.nominal_continuous_states
#Check if the simulation should be terminated
if eInfo.terminateSimulation:
raise TerminateSimulation #Exception from Assimulo
if self._logging:
str_ind2 = ""
for i in self._model.get_event_indicators():
str_ind2 += " %.14E"%i
str_states2 = ""
if self._f_nbr != 0:
for i in solver.y:
str_states2 += " %.14E"%i
str_der2 = ""
for i in self._model.get_derivatives():
str_der2 += " %.14E"%i
fwrite = self._get_debug_file_object()
fwrite.write(" Indicators (pre) : "+str_ind + "\n")
fwrite.write(" Indicators (post): "+str_ind2+"\n")
fwrite.write(" States (pre) : "+str_states + "\n")
fwrite.write(" States (post): "+str_states2 + "\n")
fwrite.write(" Derivatives (pre) : "+str_der + "\n")
fwrite.write(" Derivatives (post): "+str_der2 + "\n\n")
header = "Time (simulated) | Time (real) | "
if solver.__class__.__name__=="CVode" or solver.__class__.__name__=="Radau5ODE": #Only available for CVode
header += "Order | Error (Weighted)"
if self._g_nbr > 0:
header += "Indicators"
fwrite.write(header+"\n")
#Enter continuous mode again
self._model.enter_continuous_time_mode()
[docs] def step_events(self, solver):
"""
Method which is called at each successful step.
"""
if self._extra_f_nbr > 0:
y_extra = solver.y[-self._extra_f_nbr:]
y = solver.y[:-self._extra_f_nbr]
else:
y = solver.y
#Moving data to the model
if solver.t != self._model.time or (not self._f_nbr == 0 and not (self._model.continuous_states == y).all()):
self._model.time = solver.t
#Check if there are any states
if self._f_nbr != 0:
self._model.continuous_states = y
#Sets the inputs, if any
self._set_input_values(solver.t)
#Evaluating the rhs (Have to evaluate the values in the model)
rhs = self._model.get_derivatives()
if self._logging:
data_line = "%.14E"%solver.t+" | %.14E"%(solver.get_elapsed_step_time())
if solver.__class__.__name__=="CVode" or solver.__class__.__name__=="Radau5ODE":
err = solver.get_weighted_local_errors()
str_err = " |"
for i in err:
str_err += " %.14E"%i
if solver.__class__.__name__=="CVode":
data_line += " | %d"%solver.get_last_order()+str_err
else:
data_line += " | 5"+str_err
if self._g_nbr > 0:
str_ev = " |"
for i in self._model.get_event_indicators():
str_ev += " %.14E"%i
data_line += str_ev
fwrite = self._get_debug_file_object()
fwrite.write(data_line+"\n")
enter_event_mode, terminate_simulation = self._model.completed_integrator_step()
if enter_event_mode:
self._logg_step_event += [solver.t]
#Event have been detect, call event iteration.
self.handle_event(solver,[0])
return 1 #Tell to reinitiate the solver.
else:
return 0
def _get_debug_file_object(self):
if not self.debug_file_object:
self.debug_file_object = open(self.debug_file_name, 'a')
return self.debug_file_object
[docs] def print_step_info(self):
"""
Prints the information about step events.
"""
print('\nStep-event information:\n')
for i in range(len(self._logg_step_event)):
print('Event at time: %e'%self._logg_step_event[i])
print('\nNumber of events: ',len(self._logg_step_event))
[docs] def initialize(self, solver):
if hasattr(solver,"linear_solver"):
if solver.linear_solver == "SPARSE":
self._sparse_representation = True
if self._logging:
self.debug_file_object = open(self.debug_file_name, 'w')
f = self.debug_file_object
names = ""
for i in range(self._f_nbr):
names += self._model.get_states_list().keys()[i] + ", "
f.write("Solver: %s \n"%solver.__class__.__name__)
f.write("State variables: "+names+ "\n")
str_y = ""
if self._f_nbr != 0:
for i in solver.y:
str_y += " %.14E"%i
f.write("Initial values: t = %.14E \n"%solver.t)
f.write("Initial values: y ="+str_y+"\n\n")
header = "Time (simulated) | Time (real) | "
if solver.__class__.__name__=="CVode" or solver.__class__.__name__=="Radau5ODE": #Only available for CVode
header += "Order | Error (Weighted)"
f.write(header+"\n")
[docs] def finalize(self, solver):
self.export.simulation_end()
if self.debug_file_object:
self.debug_file_object.close()
self.debug_file_object = None
def _set_input(self, input):
self.__input = input
def _get_input(self):
return self.__input
input = property(_get_input, _set_input, doc =
"""
Property for accessing the input. The input must be a 2-tuple with the first
object as a list of names of the input variables and with the other as a
subclass of the class Trajectory.
""")
[docs]class FMIODESENS2(FMIODE2):
"""
FMIODE2 extended with sensitivity simulation capabilities
"""
def __init__(self, model, input=None, result_file_name='',
with_jacobian=False, start_time=0.0, logging=False,
result_handler=None, extra_equations=None, parameters=None):
#Call FMIODE init method
FMIODE2.__init__(self, model, input, result_file_name, with_jacobian,
start_time,logging, result_handler, extra_equations)
#Store the parameters
if parameters != None:
if not isinstance(parameters,list):
raise FMIModel_Exception("Parameters must be a list of names.")
self.p0 = N.array(model.get(parameters)).flatten()
self.pbar = N.array([N.abs(x) if N.abs(x) > 0 else 1.0 for x in self.p0])
self.param_valref = [model.get_variable_valueref(x) for x in parameters]
for param in parameters:
if model.get_variable_causality(param) != fmi.FMI2_INPUT and \
(model.get_generation_tool() != "JModelica.org" and model.get_generation_tool() != "Optimica Compiler Toolkit"):
raise FMIModel_Exception("The sensitivity parameters must be specified as inputs!")
self.parameters = parameters
if python3_flag:
self.derivatives = [v.value_reference for i,v in model.get_derivatives_list().items()]
else:
self.derivatives = [v.value_reference for i,v in model.get_derivatives_list().iteritems()]
if self._model.get_capability_flags()['providesDirectionalDerivatives']:
use_rhs_sens = True
for param in parameters:
if model.get_variable_causality(param) != fmi.FMI2_INPUT and \
(model.get_generation_tool() == "JModelica.org" or model.get_generation_tool() == "Optimica Compiler Toolkit"):
use_rhs_sens = False
logging_module.warning("The sensitivity parameters must be specified as inputs in order to set up the sensitivity " \
"equations using directional derivatives. Disabling and using finite differences instead.")
if use_rhs_sens:
self.rhs_sens = self.s #Activates the jacobian
[docs] def rhs(self, t, y, p=None, sw=None):
#Sets the parameters, if any
if self.parameters != None:
self._model.set(self.parameters, p)
return FMIODE2.rhs(self,t,y,sw)
[docs] def j(self, t, y, p=None, sw=None):
#Sets the parameters, if any
if self.parameters != None:
self._model.set(self.parameters, p)
return FMIODE2.j(self,t,y,sw)
[docs] def s(self, t, y, s, p=None, sw=None):
# ds = df/dy s + df/dp
J = self.j(t,y,p,sw)
sens_rhs = J.dot(s)
for i,param in enumerate(self.param_valref):
dfdpi = self._model.get_directional_derivative([param], self.derivatives, [1])
sens_rhs[:,i] += dfdpi
return sens_rhs