#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2010 Modelon AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Module for writing optimization and simulation results to file.
"""
from operator import itemgetter, attrgetter
import array
import codecs
import re
import sys
import numpy as N
import numpy as np
import scipy.io
from . import xmlparser
import pyfmi.fmi as fmi
import pyfmi.fmi_util as fmi_util
from . import python3_flag
SYS_LITTLE_ENDIAN = sys.byteorder == 'little'
[docs]class Trajectory:
"""
Class for storing a time series.
"""
def __init__(self,t,x):
"""
Constructor for the Trajectory class.
Parameters::
t --
Abscissa of the trajectory.
x --
The ordinate of the trajectory.
"""
self.t = t
self.x = x
[docs]class ResultStorage:
pass
[docs] def get_variable_data(self, key):
raise NotImplementedError
[docs] def is_variable(self, var):
raise NotImplementedError
[docs] def is_negated(self, var):
raise NotImplementedError
[docs]class ResultHandler:
[docs] def simulation_start(self):
"""
This method is called before the simulation has started and
after the initialization call for the FMU.
"""
pass
[docs] def initialize_complete(self):
"""
This method is called after the initialization method of the FMU
has been been called.
"""
pass
[docs] def integration_point(self, solver=None):
"""
This method is called for each time-point for which result are
to be stored as indicated by the "number of communcation points"
provided to the simulation method.
"""
pass
[docs] def simulation_end(self):
"""
This method is called at the end of a simulation.
"""
pass
[docs] def set_options(self, options):
"""
Options are the options dictionary provided to the simulation
method.
"""
pass
[docs] def get_result(self):
"""
Method for retrieving the result. This method should return a
result of an instance of ResultStorage or of an instance of a
subclass of ResultStorage.
"""
raise NotImplementedError
[docs]class ResultDymola:
"""
Base class for representation of a result file.
"""
[docs] def get_variable_index(self,name):
"""
Retrieve the index in the name vector of a given variable.
Parameters::
name --
Name of variable.
Returns::
In integer index.
"""
#Strip name of spaces, for instace a[2, 1] to a[2,1]
name = name.replace(" ", "")
try:
#return self.name.index(name)
return self.name_lookup[name]
except KeyError as ex:
#except ValueError as ex:
#Variable was not found so check if it was a derivative variable
#and check if there exists a variable with another naming
#convention
if self._check_if_derivative_variable(name):
try:
#First do a simple search for the other naming convention
#return self.name.index(self._convert_dx_name(name))
return self.name_lookup[self._convert_dx_name(name)]
#except ValueError as ex:
except KeyError as ex:
return self._exhaustive_search_for_derivatives(name)
else:
raise VariableNotFoundError("Cannot find variable " +
name + " in data file.")
def _check_if_derivative_variable(self, name):
"""
Check if a variable is a derivative variable or not.
"""
if name.startswith("der(") or name.split(".")[-1].startswith("der("):
return True
else:
return False
def _exhaustive_search_for_derivatives(self, name):
"""
Perform an exhaustive search for a derivative variable by
first retrieving the underlying state and for each its alias
check if there exists a derivative variable.
"""
#Find alias for name
state = self._find_underlying_state(name)
index = self.get_variable_index(state)
alias_index = N.where(self.dataInfo[:,1]==self.dataInfo[index,1])[0]
#Loop through all alias
for ind in alias_index:
#Get the trial name
trial_name = self.name[ind]
#Create the derivative name
der_trial_name = self._create_derivative_from_state(trial_name)
try:
#return self.name.index(der_trial_name)
return self.name_lookup[der_trial_name]
#except ValueError as ex:
except KeyError as ex:
try:
#return self.name.index(self._convert_dx_name(der_trial_name))
return self.name_lookup[self._convert_dx_name(der_trial_name)]
except KeyError as ex:
#except ValueError as ex:
pass
else:
raise VariableNotFoundError("Cannot find variable " +
name + " in data file.")
def _find_underlying_state(self, name):
"""
Finds the underlying state of a derivative variable. der(PI.x)
-> PI.x.
"""
spl = name.split(".")
if spl[0].startswith("der("):
spl[0] = spl[0][4:] #Remove der(
spl[-1] = spl[-1][:-1] #Remove )
return ".".join(spl)
elif spl[-1].startswith("der("):
spl[-1] = spl[-1][4:] #Remove der(
spl[-1] = spl[-1][:-1] #Remove )
return ".".join(spl)
else:
return name
def _create_derivative_from_state(self, name):
"""
Create a derivative variable from a state by adding for instance
to PI.x -> der(PI.x).
"""
return "der("+name+")"
def _convert_dx_name(self, name):
"""
Internal method for converting the derivative variable into the
"other" convention. A derivative variable can either be on the
form PI.der(x) and der(PI.x).
Returns the original name if the name was not a derivative name.
"""
spl = name.split(".") #Split name
if spl[0].startswith("der("): #der(PI.x)
spl[0] = spl[0][4:] #Remove der
spl[-1] = "der("+spl[-1] #Add der
return ".".join(spl) #PI.der(x)
elif spl[-1].startswith("der("): #PI.der(x)
spl[0] = "der("+spl[0] #Add der
spl[-1] = spl[-1][4:] #Remove der
return ".".join(spl)
else: #Variable was not a derivative variable
return name
[docs]class ResultCSVTextual:
def __init__(self, filename, delimiter=";"):
fid = codecs.open(filename,'r','utf-8')
if delimiter == ";":
name = fid.readline().strip().split(delimiter)
elif delimiter == ",":
name = [s[1:-1] for s in re.findall('".+?"', fid.readline().strip())]
else:
raise JIOError('Unsupported separator.')
self.name = name
self.data_matrix = {}
for i,n in enumerate(name):
self.data_matrix[n] = i
data = []
while True:
row = fid.readline().strip().split(delimiter)
if row[-1] == "" or row[-1] == "\n":
break
data.append([float(d) for d in row])
self.data = N.array(data)
[docs] def get_variable_data(self,name):
"""
Retrieve the data sequence for a variable with a given name.
Parameters::
name --
Name of the variable.
Returns::
A Trajectory object containing the time vector and the data vector
of the variable.
"""
if name == 'time':
return Trajectory(self.data[:,0],self.data[:,0])
else:
return Trajectory(self.data[:,0],self.data[:,self.data_matrix[name]])
[docs] def is_variable(self, name):
return True
[docs] def is_negated(self, name):
return False
[docs] def get_data_matrix(self):
"""
Returns the result matrix.
Returns::
The result data matrix.
"""
return self.data
[docs]class ResultWriter():
"""
Base class for writing results to file.
"""
[docs] def write_point():
"""
This method does the writing of the actual result.
"""
pass
[docs] def write_finalize():
"""
The finalize method can be used to for instance close the file.
"""
pass
[docs]class ResultWriterDymola(ResultWriter):
"""
Export an optimization or simulation result to file in Dymola's result file
format.
"""
def __init__(self, model, format='txt'):
"""
Export an optimization or simulation result to file in Dymolas result
file format.
Parameters::
model --
A FMIModel object.
format --
A text string equal either to 'txt' for textual format or 'mat'
for binary Matlab format.
Default: 'txt'
Limitations::
Currently only textual format is supported.
"""
self.model = model
if format!='txt':
raise JIOError('The format is currently not supported.')
#Internal values
self._file_open = False
self._npoints = 0
[docs] def write_point(self, data=None, parameter_data=[]):
"""
Writes the current status of the model to file. If the header has not
been written previously it is written now. If data is specified it is
written instead of the current status.
Parameters::
data --
A one dimensional array of variable trajectory data. data
should consist of information about the status in the order
specified by FMUModel.save_time_point()
Default: None
"""
f = self._file
data_order = self._data_order
#If data is none, store the current point from the model
if data==None:
#Retrieves the time-point
[r,i,b] = self.model.save_time_point()
data = N.append(N.append(N.append(self.model.time,r),i),b)
#Write the point
str_text = (" %.14E" % data[0])
for j in range(self._nvariables-1):
str_text = str_text + (" %.14E" % (data[1+data_order[j]]))
for j in range(len(parameter_data)):
str_text = str_text + (" %.14E" % (parameter_data[j]))
f.write(str_text+'\n')
#Update number of points
self._npoints+=1
[docs] def write_finalize(self):
"""
Finalize the writing by filling in the blanks in the created file. The
blanks consists of the number of points and the final time (in data set
1). Also closes the file.
"""
#If open, finalize and close
if self._file_open:
f = self._file
f.seek(self._point_last_t)
f.write('%.14E'%self.model.time)
f.seek(self._point_npoints)
f.write('%d,%d)' % (self._npoints, self._nvariables+self._nvariables_sens))
#f.write('%d'%self._npoints)
f.seek(-1,2)
#Close the file
f.write('\n')
f.close()
self._file_open = False
[docs]class ResultStorageMemory(ResultDymola):
"""
Class representing a simulation result that is kept in MEMORY.
"""
def __init__(self, model, data, vars_ref, vars):
"""
Load result from the ResultHandlerMemory
Parameters::
model --
Instance of the FMUModel.
data --
The simulation data.
"""
self.model = model
self.vars = vars
self.name = [var.name for var in vars.values()]
self.data = {}
self.data_matrix = data
#time real integer boolean
real_val_ref = vars_ref[0]
integer_val_ref = vars_ref[1]
boolean_val_ref = vars_ref[2]
self.time = data[:,0]
for i,ref in enumerate(real_val_ref+integer_val_ref+boolean_val_ref):
self.data[ref] = data[:,i+1]
[docs] def get_variable_data(self,name):
"""
Retrieve the data sequence for a variable with a given name.
Parameters::
name --
Name of the variable.
Returns::
A Trajectory object containing the time vector and the data vector
of the variable.
"""
if name == 'time':
return Trajectory(self.time,self.time)
else:
try:
var = self.vars[name]
except KeyError as ex:
raise VariableNotFoundError("Cannot find variable " +
name + " in data file.")
factor = -1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1
if var.variability == fmi.FMI_CONSTANT or var.variability == fmi.FMI_PARAMETER:
return Trajectory([self.time[0],self.time[-1]],N.array([self.model.get(name),self.model.get(name)]).ravel())
else:
return Trajectory(self.time,factor*self.data[var.value_reference])
[docs] def is_variable(self, name):
"""
Returns True if the given name corresponds to a time-varying variable.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
True if the variable is time-varying.
"""
if name == 'time':
return True
variability = self.vars[name].variability
if variability == fmi.FMI_CONSTANT or variability == fmi.FMI_PARAMETER:
return False
else:
return True
[docs] def is_negated(self, name):
"""
Returns True if the given name corresponds to a negated result vector.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
True if the result should be negated
"""
alias = self.vars[name].alias
if alias == fmi.FMI_NEGATED_ALIAS:
return True
else:
return False
[docs] def get_column(self, name):
"""
Returns the column number in the data matrix where the values of the
variable are stored.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
The column number.
"""
raise NotImplementedError
[docs] def get_data_matrix(self):
"""
Returns the result matrix.
Returns::
The result data matrix.
"""
return self.data_matrix
[docs]class ResultDymolaTextual(ResultDymola):
"""
Class representing a simulation or optimization result loaded from a Dymola
binary file.
"""
def __init__(self,fname):
"""
Load a result file written on Dymola textual format.
Parameters::
fname --
Name of file.
"""
fid = codecs.open(fname,'r','utf-8')
result = [];
# Read Aclass section
nLines = self._find_phrase(fid, 'char Aclass')
nLines = int(nLines[0])
Aclass = [fid.readline().strip() for i in range(nLines)]
#Aclass = []
#for i in range(0,nLines):
# Aclass.append(fid.readline().strip())
self.Aclass = Aclass
# Read name section
nLines = self._find_phrase(fid, 'char name')
nLines = int(nLines[0])
name = [fid.readline().strip().replace(" ","") for i in range(nLines)]
#name = []
#for i in range(0,nLines):
# name.append(fid.readline().strip().replace(" ",""))
self.name = name
self.name_lookup = {key:ind for ind,key in enumerate(self.name)}
# Read description section
nLines = self._find_phrase(fid, 'char description')
nLines = int(nLines[0])
description = [fid.readline().strip() for i in range(nLines)]
#description = []
#for i in range(0,nLines):
# description.append(fid.readline().strip())
self.description = description
# Read dataInfo section
nLines = self._find_phrase(fid, 'int dataInfo')
nCols = nLines[2].partition(')')
nLines = int(nLines[0])
nCols = int(nCols[0])
if python3_flag:
dataInfo = [list(map(int,fid.readline().split()[0:nCols])) for i in range(nLines)]
else:
dataInfo = [map(int,fid.readline().split()[0:nCols]) for i in range(nLines)]
self.dataInfo = N.array(dataInfo)
# Find out how many data matrices there are
nData = max(self.dataInfo[:,0])
self.data = []
for i in range(0,nData):
l = fid.readline()
tmp = l.partition(' ')
while tmp[0]!='float' and tmp[0]!='double' and l!='':
l = fid.readline()
tmp = l. partition(' ')
if l=='':
raise JIOError('The result does not seem to be of a supported format.')
tmp = tmp[2].partition('(')
nLines = tmp[2].partition(',')
nCols = nLines[2].partition(')')
nLines = int(nLines[0])
nCols = int(nCols[0])
data = []
for i in range(0,nLines):
info = []
while len(info) < nCols and l != '':
l = fid.readline()
info.extend(l.split())
try:
if python3_flag:
data.append(list(map(float,info[0:nCols])))
else:
data.append(map(float,info[0:nCols]))
except ValueError: #Handle 1.#INF's and such
if python3_flag:
data.append(list(map(robust_float,info[0:nCols])))
else:
data.append(map(robust_float,info[0:nCols]))
if len(info) == 0 and i < nLines-1:
raise JIOError("Inconsistent number of lines in the result data.")
del(info)
self.data.append(N.array(data))
if len(self.data) == 0:
raise JIOError('Could not find any variable data in the result file.')
def _find_phrase(self,fid, phrase):
l = fid.readline()
tmp = l.partition('(')
while tmp[0]!=phrase and l!='':
l = fid.readline()
tmp = l. partition('(')
if l=='':
raise JIOError("The result does not seem to be of a supported format.")
return tmp[2].partition(',')
[docs] def get_variable_data(self,name):
"""
Retrieve the data sequence for a variable with a given name.
Parameters::
name --
Name of the variable.
Returns::
A Trajectory object containing the time vector and the data vector
of the variable.
"""
if name == 'time' or name== 'Time':
varInd = 0
else:
varInd = self.get_variable_index(name)
dataInd = self.dataInfo[varInd][1]
factor = 1
if dataInd < 0:
factor = -1
dataInd = -dataInd -1
else:
dataInd = dataInd - 1
dataMat = self.dataInfo[varInd][0]-1
if dataMat < 0:
# Take into account that the 'Time' variable has data matrix index 0
# and that 'time' is called 'Time' in Dymola results
dataMat = 1 if len(self.data) > 1 else 0
return Trajectory(
self.data[dataMat][:,0],factor*self.data[dataMat][:,dataInd])
[docs] def is_variable(self, name):
"""
Returns True if the given name corresponds to a time-varying variable.
Parameters::
name --
Name of the variable/parameter/constant
Returns::
True if the variable is time-varying.
"""
if name == 'time' or name== 'Time':
return True
varInd = self.get_variable_index(name)
dataMat = self.dataInfo[varInd][0]-1
if dataMat<0:
dataMat = 0
if dataMat == 0:
return False
else:
return True
[docs] def is_negated(self, name):
"""
Returns True if the given name corresponds to a negated result vector.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
True if the result should be negated
"""
varInd = self.get_variable_index(name)
dataInd = self.dataInfo[varInd][1]
if dataInd<0:
return True
else:
return False
[docs] def get_column(self, name):
"""
Returns the column number in the data matrix where the values of the
variable are stored.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
The column number.
"""
if name == 'time' or name== 'Time':
return 0
if not self.is_variable(name):
raise VariableNotTimeVarying("Variable " +
name + " is not time-varying.")
varInd = self.get_variable_index(name)
dataInd = self.dataInfo[varInd][1]
factor = 1
if dataInd<0:
factor = -1
dataInd = -dataInd -1
else:
dataInd = dataInd - 1
return dataInd
[docs] def get_data_matrix(self):
"""
Returns the result matrix.
Returns::
The result data matrix.
"""
return self.data[1]
[docs] def shift_time(self,time_shift):
"""
Shift the time vector using a fixed offset.
Parameters::
time_shift --
The time shift offset.
"""
for i in range(len(self.data)):
for j in range(N.shape(self.data[i])[0]):
self.data[i][j,0] = self.data[i][j,0] + time_shift
[docs] def append(self, res):
"""
Append another simulation result. The time vector of the appended
trajectories is shifted so that the appended trajectories appears
after the original result trajectories.
Parameters::
res --
A simulation result object of type DymolaResultTextual.
"""
n_points = N.size(res.data[1],0)
time_shift = self.data[1][-1,0]
self.data[1] = N.vstack((self.data[1],res.data[1]))
self.data[1][n_points:,0] = self.data[1][n_points:,0] + time_shift
[docs]class ResultDymolaBinary(ResultDymola):
"""
Class representing a simulation or optimization result loaded from a Dymola
binary file.
"""
def __init__(self,fname):
"""
Load a result file written on Dymola binary format.
Parameters::
fname --
Name of file.
"""
self._fname = fname
self.raw = scipy.io.loadmat(fname,chars_as_strings=False, variable_names=["name", "dataInfo", "data_1", "data_2"])
name = self.raw['name']
#self.name = ["".join(name[:,i]).rstrip() for i in range(name[0,:].size)]
#self.name = fmi_util.convert_array_names_list_names(name)
self.name = fmi_util.convert_array_names_list_names_int(name.view(np.int32))
self.dataInfo = self.raw['dataInfo'].transpose()
#self.name = [
# array.array(
# 'u',
# name[:,i].tolist()).tounicode().rstrip().replace(" ","") \
# for i in range(0,name[0,:].size)]
self.name_lookup = {key:ind for ind,key in enumerate(self.name)}
self._description = None
def _get_description(self):
if not self._description:
self.name = ["".join(name[:,i]).rstrip() for i in range(name[0,:].size)]
description = scipy.io.loadmat(fname,chars_as_strings=False, variable_names=["description"])
self._description = ["".join(description[:,i]).rstrip() for i in range(description[0,:].size)]
#self._description = [
#array.array(
# 'u',
# description[:,i].tolist()).tounicode().rstrip() \
# for i in range(0,description[0,:].size)]
return self._description
description = property(_get_description, doc =
"""
Property for accessing the description vector.
""")
[docs] def get_variable_data(self,name):
"""
Retrieve the data sequence for a variable with a given name.
Parameters::
name --
Name of the variable.
Returns::
A Trajectory object containing the time vector and the data vector
of the variable.
"""
if name == 'time' or name== 'Time':
varInd = 0;
else:
varInd = self.get_variable_index(name)
dataInd = self.raw['dataInfo'][1][varInd]
dataMat = self.raw['dataInfo'][0][varInd]
factor = 1
if dataInd<0:
factor = -1
dataInd = -dataInd -1
else:
dataInd = dataInd - 1
if dataMat == 0:
# Take into account that the 'Time' variable has data matrix index 0
# and that 'time' is called 'Time' in Dymola results
dataMat = 2 if len(self.raw['data_2'])> 0 else 1
return Trajectory(self.raw['data_%d'%dataMat][0,:],factor*self.raw['data_%d'%dataMat][dataInd,:])
[docs] def is_variable(self, name):
"""
Returns True if the given name corresponds to a time-varying variable.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
True if the variable is time-varying.
"""
if name == 'time' or name== 'Time':
return True
varInd = self.get_variable_index(name)
dataMat = self.raw['dataInfo'][0][varInd]
if dataMat<1:
dataMat = 1
if dataMat == 1:
return False
else:
return True
[docs] def is_negated(self, name):
"""
Returns True if the given name corresponds to a negated result vector.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
True if the result should be negated
"""
varInd = self.get_variable_index(name)
dataInd = self.raw['dataInfo'][1][varInd]
if dataInd<0:
return True
else:
return False
[docs] def get_column(self, name):
"""
Returns the column number in the data matrix where the values of the
variable are stored.
Parameters::
name --
Name of the variable/parameter/constant.
Returns::
The column number.
"""
if name == 'time' or name== 'Time':
return 0
if not self.is_variable(name):
raise VariableNotTimeVarying("Variable " +
name + " is not time-varying.")
varInd = self.get_variable_index(name)
dataInd = self.raw['dataInfo'][1][varInd]
factor = 1
if dataInd<0:
factor = -1
dataInd = -dataInd -1
else:
dataInd = dataInd - 1
return dataInd
[docs] def get_data_matrix(self):
"""
Returns the result matrix.
Returns::
The result data matrix.
"""
return self.raw['data_%d'%2]
[docs]class ResultHandlerMemory(ResultHandler):
def __init__(self, model):
self.model = model
[docs] def simulation_start(self):
"""
This method is called before the simulation has started and before
the initialization of the model.
"""
model = self.model
opts = self.options
self.vars = model.get_model_variables(filter=opts["filter"])
#Store the continuous and discrete variables for result writing
self.real_var_ref, self.int_var_ref, self.bool_var_ref = model.get_model_time_varying_value_references(filter=opts["filter"])
self.real_sol = []
self.int_sol = []
self.bool_sol = []
self.time_sol = []
self.param_sol= []
self.model = model
[docs] def initialize_complete(self):
"""
This method is called after the initialization method of the FMU
has been been called.
"""
pass
[docs] def integration_point(self, solver = None):
"""
This method is called for each time-point for which result are
to be stored as indicated by the "number of communcation points"
provided to the simulation method.
"""
model = self.model
#Retrieves the time-point
self.time_sol += [model.time]
self.real_sol += [model.get_real(self.real_var_ref)]
self.int_sol += [model.get_integer(self.int_var_ref)]
self.bool_sol += [model.get_boolean(self.bool_var_ref)]
#Sets the parameters, if any
if solver and self.options["sensitivities"]:
self.param_sol += [N.array(solver.interpolate_sensitivity(model.time, 0)).flatten()]
[docs] def simulation_end(self):
"""
The finalize method can be used to for instance close the file.
ANd this method is called after the simulation has completed.
"""
pass
[docs] def get_result(self):
"""
Method for retrieving the result. This method should return a
result of an instance of ResultBase or of an instance of a
subclass of ResultBase.
"""
t = N.array(self.time_sol)
r = N.array(self.real_sol)
data = N.c_[t,r]
if len(self.int_sol) > 0 and len(self.int_sol[0]) > 0:
i = N.array(self.int_sol)
data = N.c_[data,i]
if len(self.bool_sol) > 0 and len(self.bool_sol[0]) > 0:
b = N.array(self.bool_sol)
data = N.c_[data,b]
return ResultStorageMemory(self.model, data, [self.real_var_ref,self.int_var_ref,self.bool_var_ref], self.vars)
[docs] def set_options(self, options):
"""
Options are the options dictionary provided to the simulation
method.
"""
self.options = options
[docs]class ResultHandlerCSV(ResultHandler):
def __init__(self, model, delimiter=";"):
self.model = model
self.delimiter = delimiter
[docs] def initialize_complete(self):
pass
[docs] def simulation_start(self):
"""
Opens the file and writes the header. This includes the information
about the variables and a table determining the link between variables
and data.
"""
opts = self.options
model = self.model
#Internal values
self.file_open = False
self.nbr_points = 0
delimiter = self.delimiter
self.file_name = opts["result_file_name"]
try:
self.parameters = opts["sensitivities"]
except KeyError:
self.parameters = None
if self.file_name == "":
self.file_name=self.model.get_identifier() + '_result.csv'
vars = model.get_model_variables(filter=opts["filter"])
const_valref_real = []
const_name_real = []
const_alias_real = []
const_valref_int = []
const_name_int = []
const_alias_int = []
const_valref_bool = []
const_name_bool = []
const_alias_bool = []
cont_valref_real = []
cont_name_real = []
cont_alias_real = []
cont_valref_int = []
cont_name_int = []
cont_alias_int = []
cont_valref_bool = []
cont_name_bool = []
cont_alias_bool = []
for name in vars.keys():
var = vars[name]
if var.type == fmi.FMI_REAL:
if var.variability == fmi.FMI_CONSTANT or var.variability == fmi.FMI_PARAMETER:
const_valref_real.append(var.value_reference)
const_name_real.append(var.name)
const_alias_real.append(-1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1)
else:
cont_valref_real.append(var.value_reference)
cont_name_real.append(var.name)
cont_alias_real.append(-1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1)
elif var.type == fmi.FMI_INTEGER or var.type == fmi.FMI_ENUMERATION:
if var.variability == fmi.FMI_CONSTANT or var.variability == fmi.FMI_PARAMETER:
const_valref_int.append(var.value_reference)
const_name_int.append(var.name)
const_alias_int.append(-1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1)
else:
cont_valref_int.append(var.value_reference)
cont_name_int.append(var.name)
cont_alias_int.append(-1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1)
elif var.type == fmi.FMI_BOOLEAN:
if var.variability == fmi.FMI_CONSTANT or var.variability == fmi.FMI_PARAMETER:
const_valref_bool.append(var.value_reference)
const_name_bool.append(var.name)
const_alias_bool.append(-1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1)
else:
cont_valref_bool.append(var.value_reference)
cont_name_bool.append(var.name)
cont_alias_bool.append(-1 if var.alias == fmi.FMI_NEGATED_ALIAS else 1)
# Open file
f = codecs.open(self.file_name,'w','utf-8')
self.file_open = True
if delimiter == ",":
name_str = '"time"'
for name in const_name_real+const_name_int+const_name_bool+cont_name_real+cont_name_int+cont_name_bool:
name_str += delimiter+'"'+name+'"'
else:
name_str = "time"
for name in const_name_real+const_name_int+const_name_bool+cont_name_real+cont_name_int+cont_name_bool:
name_str += delimiter+name
f.write(name_str+"\n")
const_val_real = model.get_real(const_valref_real)
const_val_int = model.get_integer(const_valref_int)
const_val_bool = model.get_boolean(const_valref_bool)
const_str = ""
for i,val in enumerate(const_val_real):
const_str += "%.14E"%(const_alias_real[i]*val)+delimiter
for i,val in enumerate(const_val_int):
const_str += "%.14E"%(const_alias_int[i]*val)+delimiter
for i,val in enumerate(const_val_bool):
const_str += "%.14E"%(const_alias_bool[i]*val)+delimiter
#for val in N.append(const_val_real,N.append(const_val_int,const_val_boolean)):
# const_str += "%.14E"%val+delimiter
self.const_str = const_str
self._file = f
self.cont_valref_real = cont_valref_real
self.cont_alias_real = N.array(cont_alias_real)
self.cont_valref_int = cont_valref_int
self.cont_alias_int = N.array(cont_alias_int)
self.cont_valref_bool = cont_valref_bool
self.cont_alias_bool = N.array(cont_alias_bool)
[docs] def integration_point(self, solver = None):
"""
Writes the current status of the model to file. If the header has not
been written previously it is written now. If data is specified it is
written instead of the current status.
Parameters::
data --
A one dimensional array of variable trajectory data. data
should consist of information about the status in the order
specified by FMUModel.save_time_point()
Default: None
"""
f = self._file
model = self.model
delimiter = self.delimiter
#Retrieves the time-point
t = model.time
r = model.get_real(self.cont_valref_real)*self.cont_alias_real
i = model.get_integer(self.cont_valref_int)*self.cont_alias_int
b = model.get_boolean(self.cont_valref_bool)*self.cont_alias_bool
data = N.append(N.append(r,i),b)
cont_str = ""
for val in data:
cont_str += "%.14E%s"%(val,delimiter)
f.write("%.14E%s"%(t,delimiter))
if len(cont_str) == 0:
f.write(self.const_str[:-1]+"\n")
else:
f.write(self.const_str)
f.write(cont_str[:-1]+"\n")
[docs] def simulation_end(self):
"""
Finalize the writing by filling in the blanks in the created file. The
blanks consists of the number of points and the final time (in data set
1). Also closes the file.
"""
#If open, finalize and close
if self.file_open:
self._file.close()
self.file_open = False
[docs] def get_result(self):
"""
Method for retrieving the result. This method should return a
result of an instance of ResultBase or of an instance of a
subclass of ResultBase.
"""
return ResultCSVTextual(self.file_name, self.delimiter)
[docs] def set_options(self, options):
"""
Options are the options dictionary provided to the simulation
method.
"""
self.options = options
[docs]class ResultHandlerFile(ResultHandler):
"""
Export an optimization or simulation result to file in Dymola's result file
format.
"""
def __init__(self, model):
self.model = model
[docs] def initialize_complete(self):
pass
[docs] def simulation_start(self):
"""
Opens the file and writes the header. This includes the information
about the variables and a table determining the link between variables
and data.
"""
opts = self.options
model = self.model
#Internal values
self.file_open = False
self.nbr_points = 0
self.file_name = opts["result_file_name"]
try:
self.parameters = opts["sensitivities"]
except KeyError:
self.parameters = None
if self.file_name == "":
self.file_name=self.model.get_identifier() + '_result.txt'
#Store the continuous and discrete variables for result writing
self.real_var_ref, self.int_var_ref, self.bool_var_ref = model.get_model_time_varying_value_references(filter=opts["filter"])
file_name = self.file_name
parameters = self.parameters
# Open file
f = codecs.open(file_name,'w','utf-8')
self.file_open = True
# Write header
f.write('#1\n')
f.write('char Aclass(3,11)\n')
f.write('Atrajectory\n')
f.write('1.1\n')
f.write('\n')
# all lists that we need for later
vrefs_alias = []
vrefs_noalias = []
vrefs = []
names_alias = []
names_noalias = []
names = []
aliases_alias = []
aliases = []
descriptions_alias = []
descriptions = []
variabilities_alias = []
variabilities_noalias = []
variabilities = []
types_alias = []
types_noalias = []
types = []
for var in self.model.get_model_variables(filter=self.options["filter"]).values():
if not var.type == fmi.FMI_STRING:
if var.alias == fmi.FMI_NO_ALIAS:
vrefs_noalias.append(var.value_reference)
names_noalias.append(var.name)
aliases.append(var.alias)
descriptions.append(var.description)
variabilities_noalias.append(var.variability)
types_noalias.append(var.type)
else:
vrefs_alias.append(var.value_reference)
names_alias.append(var.name)
aliases_alias.append(var.alias)
descriptions_alias.append(var.description)
variabilities_alias.append(var.variability)
types_alias.append(var.type)
# need to save these no alias lists for later
vrefs = vrefs_noalias[:]
names = names_noalias[:]
types = types_noalias[:]
variabilities = variabilities_noalias[:]
# merge lists
vrefs.extend(vrefs_alias)
names.extend(names_alias)
aliases.extend(aliases_alias)
descriptions.extend(descriptions_alias)
variabilities.extend(variabilities_alias)
types.extend(types_alias)
# zip to list of tuples and sort - non alias variables are now
# guaranteed to be first in list
names_noalias = sorted(zip(
tuple(vrefs_noalias),
tuple(names_noalias)),
key=itemgetter(0))
variabilities_noalias = sorted(zip(
tuple(vrefs_noalias),
tuple(variabilities_noalias)),
key=itemgetter(0))
types_noalias = sorted(zip(
tuple(vrefs_noalias),
tuple(types_noalias)),
key=itemgetter(0))
names = sorted(zip(
tuple(vrefs),
tuple(names)),
key=itemgetter(0))
aliases = sorted(zip(
tuple(vrefs),
tuple(aliases)),
key=itemgetter(0))
descriptions = sorted(zip(
tuple(vrefs),
tuple(descriptions)),
key=itemgetter(0))
variabilities = sorted(zip(
tuple(vrefs),
tuple(variabilities)),
key=itemgetter(0))
types = sorted(zip(
tuple(vrefs),
tuple(types)),
key=itemgetter(0))
num_vars = len(names)
names_sens = []
descs_sens = []
cont_vars = []
if parameters != None:
if isinstance(self.model, fmi.FMUModelME2):
vars = self.model.get_model_variables(type=fmi.FMI2_REAL,include_alias=False,variability=fmi.FMI2_CONTINUOUS,filter=self.options["filter"])
if python3_flag:
state_vars = [v.value_reference for i,v in self.model.get_states_list().items()]
else:
state_vars = [v.value_reference for i,v in self.model.get_states_list().iteritems()]
else:
vars = self.model.get_model_variables(type=fmi.FMI_REAL,include_alias=False,variability=fmi.FMI_CONTINUOUS,filter=self.options["filter"])
state_vars = self.model.get_state_value_references()
for i in state_vars:
for j in vars.keys():
if i == vars[j].value_reference:
cont_vars.append(vars[j].name)
for j in range(len(parameters)):
for i in range(len(self.model.continuous_states)):
names_sens += ['d'+cont_vars[i]+'/d'+parameters[j]]
descs_sens += ['Sensitivity of '+cont_vars[i]+' with respect to '+parameters[j]+'.']
# Find the maximum name and description length
max_name_length = len('Time')
max_desc_length = len('Time in [s]')
for i in range(len(names)):
name = names[i][1]
desc = descriptions[i][1]
if (len(name)>max_name_length):
max_name_length = len(name)
if (len(desc)>max_desc_length):
max_desc_length = len(desc)
for i in range(len(names_sens)):
name = names_sens[i]
desc = descs_sens[i]
if (len(name)>max_name_length):
max_name_length = len(name)
if (len(desc)>max_desc_length):
max_desc_length = len(desc)
f.write('char name(%d,%d)\n' % (num_vars+len(names_sens)+1, max_name_length))
f.write('time\n')
for name in names:
f.write(name[1] +'\n')
for name in names_sens:
f.write(name + '\n')
f.write('\n')
# Write descriptions
f.write('char description(%d,%d)\n' % (num_vars+len(names_sens) + 1, max_desc_length))
f.write('Time in [s]\n')
# Loop over all variables, not only those with a description
for desc in descriptions:
f.write(desc[1] +'\n')
for desc in descs_sens:
f.write(desc + '\n')
f.write('\n')
# Write data meta information
f.write('int dataInfo(%d,%d)\n' % (num_vars+len(names_sens) + 1, 4))
f.write('0 1 0 -1 # time\n')
lst_real_cont = dict(zip(self.real_var_ref,range(len(self.real_var_ref))))
lst_int_cont = dict(zip(self.int_var_ref,[len(self.real_var_ref)+x for x in range(len(self.int_var_ref))]))
lst_bool_cont = dict(zip(self.bool_var_ref,[len(self.real_var_ref)+len(self.int_var_ref)+x for x in range(len(self.bool_var_ref))]))
valueref_of_continuous_states = []
list_of_parameters = []
cnt_1 = 1
cnt_2 = 1
n_parameters = 0
datatable1 = False
last_real_vref = -1; last_int_vref = -1; last_bool_vref = -1
for i, name in enumerate(names):
update = False
if (types[i][1] == fmi.FMI_REAL and last_real_vref != name[0]):
last_real_vref = name[0]
update = True
if ((types[i][1] == fmi.FMI_INTEGER or types[i][1] == fmi.FMI_ENUMERATION) and last_int_vref != name[0]):
last_int_vref = name[0]
update = True
if (types[i][1] == fmi.FMI_BOOLEAN and last_bool_vref != name[0]):
last_bool_vref = name[0]
update = True
if update:
if aliases[i][1] == 0:
if variabilities[i][1] == fmi.FMI_PARAMETER or \
variabilities[i][1] == fmi.FMI_CONSTANT:
cnt_1 += 1
n_parameters += 1
datatable1 = True
list_of_parameters.append((types[i][0],types[i][1]))
else:
cnt_2 += 1
#valueref_of_continuous_states.append(
# list_of_continuous_states[name[0]])
if types[i][1] == fmi.FMI_REAL:
valueref_of_continuous_states.append(lst_real_cont[name[0]])
elif types[i][1] == fmi.FMI_INTEGER or types[i][1] == fmi.FMI_ENUMERATION:
valueref_of_continuous_states.append(lst_int_cont[name[0]])
else:
valueref_of_continuous_states.append(lst_bool_cont[name[0]])
datatable1 = False
else:
base_var = self.model.get_variable_alias_base(name[1])
variability = self.model.get_variable_variability(base_var)
data_type = self.model.get_variable_data_type(base_var)
if data_type != types[i][1]:
raise Exception
if variability == fmi.FMI_PARAMETER or \
variability == fmi.FMI_CONSTANT:
cnt_1 += 1
n_parameters += 1
datatable1 = True
list_of_parameters.append((types[i][0],types[i][1]))
else:
cnt_2 += 1
#valueref_of_continuous_states.append(
# list_of_continuous_states[name[0]])
if types[i][1] == fmi.FMI_REAL:
valueref_of_continuous_states.append(lst_real_cont[name[0]])
elif types[i][1] == fmi.FMI_INTEGER or types[i][1] == fmi.FMI_ENUMERATION:
valueref_of_continuous_states.append(lst_int_cont[name[0]])
else:
valueref_of_continuous_states.append(lst_bool_cont[name[0]])
datatable1 = False
if aliases[i][1] == 0: # no alias
#if variabilities[i][1] == fmi.FMI_PARAMETER or \
# variabilities[i][1] == fmi.FMI_CONSTANT:
if datatable1:
#cnt_1 += 1
#n_parameters += 1
f.write('1 %d 0 -1 # ' % cnt_1 + name[1]+'\n')
#datatable1 = True
else:
#cnt_2 += 1
#valueref_of_continuous_states.append(
# list_of_continuous_states[name[0]])
f.write('2 %d 0 -1 # ' % cnt_2 + name[1] +'\n')
#datatable1 = False
elif aliases[i][1] == 1: # alias
if datatable1:
f.write('1 %d 0 -1 # ' % cnt_1 + name[1]+'\n')
else:
f.write('2 %d 0 -1 # ' % cnt_2 + name[1] +'\n')
else:
if datatable1:
f.write('1 -%d 0 -1 # ' % cnt_1 + name[1]+'\n')
else:
f.write('2 -%d 0 -1 # ' % cnt_2 + name[1] +'\n')
for i, name in enumerate(names_sens):
cnt_2 += 1
f.write('2 %d 0 -1 # ' % cnt_2 + name +'\n')
f.write('\n')
# Write data
# Write data set 1
f.write('float data_1(%d,%d)\n' % (2, n_parameters + 1))
f.write("%.14E" % self.model.time)
str_text = ''
# write constants and parameters
for i, dtype in enumerate(list_of_parameters):
vref = dtype[0]
if dtype[1] == fmi.FMI_REAL:
str_text = str_text + (
" %.14E" % (self.model.get_real([vref])))
elif dtype[1] == fmi.FMI_INTEGER or dtype[1] == fmi.FMI_ENUMERATION:
str_text = str_text + (
" %.14E" % (self.model.get_integer([vref])))
elif dtype[1] == fmi.FMI_BOOLEAN:
str_text = str_text + (
" %.14E" % (float(
self.model.get_boolean([vref])[0])))
f.write(str_text)
f.write('\n')
self._point_last_t = f.tell()
f.write("%s" % ' '*28)
f.write(str_text)
f.write('\n\n')
self._nvariables = len(valueref_of_continuous_states)+1
self._nvariables_sens = len(names_sens)
f.write('float data_2(')
self._point_npoints = f.tell()
f.write(' '*(14+4+14))
f.write('\n')
#f.write('%s,%d)\n' % (' '*14, self._nvariables))
self._file = f
self._data_order = N.array(valueref_of_continuous_states)
self.real_var_ref = N.array(self.real_var_ref)
self.int_var_ref = N.array(self.int_var_ref)
self.bool_var_ref = N.array(self.bool_var_ref)
[docs] def integration_point(self, solver = None):#parameter_data=[]):
"""
Writes the current status of the model to file. If the header has not
been written previously it is written now. If data is specified it is
written instead of the current status.
Parameters::
data --
A one dimensional array of variable trajectory data. data
should consist of information about the status in the order
specified by FMUModel.save_time_point()
Default: None
"""
f = self._file
data_order = self._data_order
model = self.model
#Retrieves the time-point
r = model.get_real(self.real_var_ref)
i = model.get_integer(self.int_var_ref)
b = model.get_boolean(self.bool_var_ref)
data = N.append(N.append(r,i),b)
#Write the point
str_text = (" %.14E" % self.model.time) + ''.join([" %.14E" % (data[data_order[j]]) for j in range(self._nvariables-1)])
#Sets the parameters, if any
if solver and self.options["sensitivities"]:
parameter_data = N.array(solver.interpolate_sensitivity(model.time, 0)).flatten()
for j in range(len(parameter_data)):
str_text = str_text + (" %.14E" % (parameter_data[j]))
f.write(str_text+'\n')
#Update number of points
self.nbr_points+=1
[docs] def simulation_end(self):
"""
Finalize the writing by filling in the blanks in the created file. The
blanks consists of the number of points and the final time (in data set
1). Also closes the file.
"""
#If open, finalize and close
if self.file_open:
f = self._file
f.seek(self._point_last_t)
f.write('%.14E'%self.model.time)
f.seek(self._point_npoints)
f.write('%d,%d)' % (self.nbr_points, self._nvariables+self._nvariables_sens))
#f.write('%d'%self._npoints)
f.seek(-1,2)
#Close the file
f.write('\n')
f.close()
self.file_open = False
[docs] def get_result(self):
"""
Method for retrieving the result. This method should return a
result of an instance of ResultBase or of an instance of a
subclass of ResultBase.
"""
return ResultDymolaTextual(self.file_name)
[docs] def set_options(self, options):
"""
Options are the options dictionary provided to the simulation
method.
"""
self.options = options
[docs]class ResultHandlerDummy(ResultHandler):
def __init__(self, model):
self.model = model
[docs] def get_result(self):
return None
[docs]class JIOError(Exception):
"""
Base class for exceptions specific to this module.
"""
def __init__(self, message):
"""
Create new error with a specific message.
Parameters::
message --
The error message.
"""
self.message = message
def __str__(self):
"""
Print error message when class instance is printed.
Overrides the general-purpose special method such that a string
representation of an instance of this class will be the error message.
"""
return self.message
[docs]class VariableNotFoundError(JIOError):
"""
Exception that is thrown when a variable is not found in a data file.
"""
pass
[docs]class VariableNotTimeVarying(JIOError):
"""
Exception that is thrown when a column is asked for a parameter/constant.
"""
pass
[docs]def robust_float(value):
"""
Function for robust handling of float values such as INF and NAN.
"""
try:
return float(value)
except ValueError:
if value.startswith("1.#INF"):
return float(N.inf)
elif value.startswith("-1.#INF"):
return float(-N.inf)
elif value.startswith("1.#QNAN") or value.startswith("-1.#IND"):
return float(N.nan)
else:
raise ValueError
mat4_template = {'header': [('type', 'i4'),
('mrows', 'i4'),
('ncols', 'i4'),
('imagf', 'i4'),
('namlen', 'i4')]}
[docs]class ResultHandlerBinaryFile(ResultHandler):
"""
Export an optimization or simulation result to file in Dymola's binary result file
format (MATLAB v4 format).
"""
def __init__(self, model):
self.model = model
def _data_header(self, name, nbr_rows, nbr_cols, data_type):
if data_type == "int":
t = 10*2
elif data_type == "double":
t = 10*0
elif data_type == "char":
t = 10*5 + 1
header = np.empty((), mat4_template["header"])
header["type"] = (not SYS_LITTLE_ENDIAN) * 1000 + t
header["mrows"] = nbr_rows
header["ncols"] = nbr_cols
header["imagf"] = 0
header["namlen"] = len(name) + 1
return header
def _write_header(self, name, nbr_rows, nbr_cols, data_type):
header = self._data_header(name, nbr_rows, nbr_cols, data_type)
self._file.write(header.tostring(order="F"))
self._file.write(np.compat.asbytes(name +"\0"))
[docs] def convert_char_array(self, data):
data = np.array(data)
dtype = data.dtype
dims = [data.shape[0], int(data.dtype.str[2:])]
data = np.ndarray(shape=(dims[0], int(dtype.str[2:])), dtype=dtype.str[:2]+"1", buffer=data)
data[data == ""] = " "
if dtype.kind == "U":
tmp = np.ndarray(shape=(), dtype=(dtype.str[:2] + str(np.product(dims))), buffer=data)
buf = tmp.item().encode('latin-1')
data = np.ndarray(shape=dims, dtype="S1", buffer=buf)
return data
[docs] def dump_data(self, data):
self._file.write(data.tostring(order="F"))
[docs] def dump_native_data(self, data):
self._file.write(data)
[docs] def initialize_complete(self):
pass
[docs] def simulation_start(self):
"""
Opens the file and writes the header. This includes the information
about the variables and a table determining the link between variables
and data.
"""
opts = self.options
model = self.model
#Internal values
self.file_open = False
self.nbr_points = 0
self.file_name = opts["result_file_name"]
try:
self.parameters = opts["sensitivities"]
except KeyError:
self.parameters = False
if self.parameters:
raise fmi.FMUException("Storing sensitivity results are not supported using this format. Use the file format instead.")
if self.file_name == "":
self.file_name=self.model.get_identifier() + '_result.mat'
file_name = self.file_name
parameters = self.parameters
self._file = open(file_name,'wb')
aclass_data = ["Atrajectory", "1.1", " ", "binTrans"]
aclass_data = self.convert_char_array(aclass_data)
self._write_header("Aclass", aclass_data.shape[0], aclass_data.shape[1], "char")
self.dump_data(aclass_data)
# Open file
vars_real = self.model.get_model_variables(type=fmi.FMI_REAL, filter=self.options["filter"], _as_list=True)#.values()
vars_int = self.model.get_model_variables(type=fmi.FMI_INTEGER, filter=self.options["filter"], _as_list=True)#.values()
vars_bool = self.model.get_model_variables(type=fmi.FMI_BOOLEAN, filter=self.options["filter"], _as_list=True)#.values()
vars_enum = self.model.get_model_variables(type=fmi.FMI_ENUMERATION, filter=self.options["filter"], _as_list=True)#.values()
sorted_vars_real = sorted(vars_real, key=attrgetter("value_reference"))
sorted_vars_int = sorted(vars_int, key=attrgetter("value_reference"))
sorted_vars_bool = sorted(vars_bool, key=attrgetter("value_reference"))
sorted_vars_enum = sorted(vars_enum, key=attrgetter("value_reference"))
sorted_vars = sorted_vars_real+sorted_vars_int+sorted_vars_enum+sorted_vars_bool
self._sorted_vars = sorted_vars
len_name_items = len(sorted_vars)+1
len_desc_items = len_name_items
name_data = ["time"] + [var.name for var in sorted_vars]
desc_data = ["Time in [s]"] + [var.description for var in sorted_vars]
len_name_data, name_data = fmi_util.convert_str_list(name_data)
len_desc_data, desc_data = fmi_util.convert_str_list(desc_data)
self._write_header("name", len_name_data, len_name_items, "char")
self.dump_native_data(name_data)
self._write_header("description", len_desc_data, len_desc_items, "char")
self.dump_native_data(desc_data)
#Create the data info structure (and return parameters)
data_info = np.zeros((4, len_name_items), dtype=np.int32)
[parameter_data, sorted_vars_real_vref, sorted_vars_int_vref, sorted_vars_bool_vref] = fmi_util.prepare_data_info(data_info, sorted_vars, self.model)
self._write_header("dataInfo", data_info.shape[0], data_info.shape[1], "int")
self.dump_data(data_info)
#Dump parameters to file
self._write_header("data_1", len(parameter_data), 2, "double")
self.dump_data(parameter_data)
#Record the position so that we can later modify the end time
self.data_1_header_position = self._file.tell()
self.dump_data(parameter_data)
#Record the position so that we can later modify the number of result points stored
self.data_2_header_position = self._file.tell()
self._write_header("data_2", len(sorted_vars_real_vref)+len(sorted_vars_int_vref)+len(sorted_vars_bool_vref)+1, 1, "double")
self.real_var_ref = np.array(sorted_vars_real_vref)
self.int_var_ref = np.array(sorted_vars_int_vref)
self.bool_var_ref = np.array(sorted_vars_bool_vref)
self.nbr_points = 0
[docs] def integration_point(self, solver = None):
"""
Writes the current status of the model to file. If the header has not
been written previously it is written now. If data is specified it is
written instead of the current status.
Parameters::
data --
A one dimensional array of variable trajectory data. data
should consist of information about the status in the order
specified by FMUModel.save_time_point()
Default: None
"""
f = self._file
model = self.model
#Retrieves the time-point
r = model.get_real(self.real_var_ref)
i = model.get_integer(self.int_var_ref).astype(float)
b = model.get_boolean(self.bool_var_ref).astype(float)
#self._write_data(data)
self.dump_data(np.array(float(model.time)))
self.dump_data(r)
self.dump_data(i)
self.dump_data(b)
#Increment number of points
self.nbr_points += 1
[docs] def simulation_end(self):
"""
Finalize the writing by filling in the blanks in the created file. The
blanks consists of the number of points and the final time (in data set
1). Also closes the file.
"""
#If open, finalize and close
f = self._file
if f:
f.seek(self.data_1_header_position)
t = np.array([float(self.model.time)])
self.dump_data(t)
f.seek(self.data_2_header_position)
self._write_header("data_2", len(self.real_var_ref)+len(self.int_var_ref)+len(self.bool_var_ref)+1, self.nbr_points, "double")
f.close()
self._file = None
[docs] def get_result(self):
"""
Method for retrieving the result. This method should return a
result of an instance of ResultBase or of an instance of a
subclass of ResultBase.
"""
return ResultDymolaBinary(self.file_name)
[docs] def set_options(self, options):
"""
Options are the options dictionary provided to the simulation
method.
"""
self.options = options