#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2010 Modelon AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Module containing base classes and functionality.
"""
import zipfile
import tempfile
import platform as PL
import os
import sys
import shutil
import numpy as N
import numpy.ctypeslib as Nct
# location for temporary JModelica files
def get_temp_location():
    """
    Compute the directory used for temporary JModelica.org files.

    The location is ``<system temp dir>/<USER>/JModelica.org`` when the
    ``USER`` environment variable is set and
    ``<system temp dir>/JModelica.org`` otherwise. The directory is only
    computed here, it is not created.

    Note that a function with the same name is defined again further down in
    this module; once the module has been loaded the name refers to that
    later definition, which returns the cached ``tmp_location``.

    Returns::

        The path to the location for temporary files.
    """
    # Use the public tempfile API rather than the private
    # tempfile._get_default_tempdir() helper.
    base_dir = tempfile.gettempdir()
    if "USER" in os.environ:
        return os.path.join(base_dir, os.environ['USER'], 'JModelica.org')
    return os.path.join(base_dir, 'JModelica.org')


# Computed once at import time and used by the temp file/dir helpers below.
tmp_location = get_temp_location()
try:
    _STRING_TYPES = basestring  # Python 2: matches both str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3: basestring does not exist


class ModelBase(object):
    """
    Abstract Model class containing base functionality.

    Subclasses are expected to provide ``_set(name, value)`` and
    ``_get(name)``, which are used by ``set`` and ``get``.
    """

    def __init__(self):
        raise Exception("This is an abstract class it can not be instantiated.")

    def optimize(self):
        """Not available in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is not available in BaseModel.')

    def optimize_options(self, algorithm):
        """Not available in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is not available in BaseModel.')

    def simulate(self):
        """Not available in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is not available in BaseModel.')

    def simulate_options(self, algorithm):
        """Not available in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is not available in BaseModel.')

    def initialize(self):
        """Not available in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is not available in BaseModel.')

    def initialize_options(self, algorithm):
        """Not available in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is not available in BaseModel.')

    def set_real(self, valueref, value):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def get_real(self, valueref):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def set_integer(self, valueref, value):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def get_integer(self, valueref):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def set_boolean(self, valueref, value):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def get_boolean(self, valueref):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def set_string(self, valueref, value):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def get_string(self, valueref):
        """Not supported in the base class, always raises NotImplementedError."""
        raise NotImplementedError('This method is currently not supported.')

    def set(self, variable_name, value):
        """
        Sets the given value(s) to the specified variable name(s) in the
        model. The method accepts both a single variable and a list of
        variables.

        Parameters::

            variable_name --
                The name of the variable(s) as string/list.

            value --
                The value(s) to set. When variable_name is a list, value is
                indexed with the position of each name.

        Example::

            (FMU)Model.set('damper.d', 1.1)
            (FMU)Model.set(['damper.d','gear.a'], [1.1, 10])
        """
        if isinstance(variable_name, _STRING_TYPES):
            self._set(variable_name, value)  # Scalar case
        else:
            # A list of variables. value is indexed (rather than zipped) so
            # that a too short value list still raises an IndexError.
            for i, name in enumerate(variable_name):
                self._set(name, value[i])

    def get(self, variable_name):
        """
        Returns the value(s) of the specified variable(s). The method accepts
        both a single variable and a list of variables.

        Parameters::

            variable_name --
                The name of the variable(s) as string/list.

        Returns::

            The value for a single name, or a list of values for a list of
            names.

        Example::

            # Returns the variable d
            (FMU)Model.get('damper.d')
            # Returns a list of the variables
            (FMU)Model.get(['damper.d','gear.a'])
        """
        if isinstance(variable_name, _STRING_TYPES):
            return self._get(variable_name)  # Scalar case
        return [self._get(name) for name in variable_name]  # A list of variables

    def _resolve_algorithm(self, module, algorithm):
        """
        Helper function which turns 'algorithm' into an algorithm class and
        checks that it is a subclass of AlgorithmBase.

        Parameters::

            module --
                Name of the module to import the algorithm from. Only used
                when algorithm is given as a string.

            algorithm --
                The algorithm class, or its name as a string.

        Returns::

            The algorithm class.

        Raises::

            Exception if algorithm is not a subclass of
            common.algorithm_drivers.AlgorithmBase.
        """
        # NOTE: level=-1 (try relative, then absolute import) is only accepted
        # by the Python 2 version of __import__.
        base_path = 'algorithm_drivers'
        algdrive = __import__(base_path, globals(), locals(), [], -1)
        AlgorithmBase = getattr(algdrive, 'AlgorithmBase')

        if isinstance(algorithm, _STRING_TYPES):
            module = __import__(module, globals(), locals(), [algorithm], -1)
            algorithm = getattr(module, algorithm)

        if not issubclass(algorithm, AlgorithmBase):
            raise Exception(str(algorithm)+
            " must be a subclass of common.algorithm_drivers.AlgorithmBase")
        return algorithm

    def _exec_algorithm(self, module, algorithm, options):
        """
        Helper function which performs all steps of an algorithm run which are
        common to all initialize and optimize algorithms.

        Parameters::

            module --
                Name of the module holding the algorithm, used when algorithm
                is given as a string.

            algorithm --
                The algorithm class, or its name as a string.

            options --
                The options passed on to the algorithm constructor.

        Returns::

            The result returned by the algorithm's get_result().

        Raises::

            Exception if algorithm is not a subclass of
            common.algorithm_drivers.AlgorithmBase.
        """
        algorithm = self._resolve_algorithm(module, algorithm)

        # initialize algorithm
        alg = algorithm(self, options)
        # solve optimization problem/initialize
        alg.solve()
        # get and return result
        return alg.get_result()

    def _exec_simulate_algorithm(self,
                                 start_time,
                                 final_time,
                                 input,
                                 module,
                                 algorithm,
                                 options):
        """
        Helper function which performs all steps of an algorithm run which are
        common to all simulate algorithms.

        Parameters::

            start_time, final_time, input --
                Passed on unchanged to the algorithm constructor.

            module --
                Name of the module holding the algorithm, used when algorithm
                is given as a string.

            algorithm --
                The algorithm class, or its name as a string.

            options --
                The options passed on to the algorithm constructor.

        Returns::

            The result returned by the algorithm's get_result().

        Raises::

            Exception if algorithm is not a subclass of
            common.algorithm_drivers.AlgorithmBase.
        """
        algorithm = self._resolve_algorithm(module, algorithm)

        # initialize algorithm
        alg = algorithm(start_time, final_time, input, self, options)
        # simulate
        alg.solve()
        # get and return result
        return alg.get_result()

    def _default_options(self, module, algorithm):
        """
        Help method. Gets the options class for the algorithm specified in
        'algorithm'.

        Parameters::

            module --
                Name of the module to import the algorithm from.

            algorithm --
                Name of the algorithm class as a string.

        Returns::

            The value of the algorithm's get_default_options().
        """
        # NOTE: level=-1 is only accepted by the Python 2 version of __import__.
        module = __import__(module, globals(), locals(), [algorithm], -1)
        algorithm = getattr(module, algorithm)
        return algorithm.get_default_options()
def rename_to_tmp(filename, path ='.', filetype = 'dynamic_lib'):
    """
    Take a file and give it a random temporary name.

    Parameters::

        filename --
            Name of file to rename.

        path --
            Path to the file to rename. This is also where the renamed file will
            end up.
            Default: Current directory.

        filetype --
            Type of file to rename, used so that platform specific suffixes can
            be taken into consideration. Currently only dynamic library is
            possible.
            Default: 'dynamic_lib'

    Returns::

        The new (temporary) name of the file, including its path.
    """
    # tempfile.mktemp is deprecated since another process may create a file
    # with the returned name before it is used. mkstemp creates the file
    # atomically which reserves the name; the move then replaces that
    # placeholder.
    handle, tempfilename = tempfile.mkstemp(suffix=get_platform_suffix(filetype),
                                            dir=path)
    os.close(handle)
    try:
        shutil.move(os.path.join(path, filename), tempfilename)
    except Exception:
        # Do not leave the empty placeholder behind if the move failed.
        if os.path.exists(tempfilename):
            os.remove(tempfilename)
        raise
    return tempfilename
def get_files_in_archive(path):
    """
    Locate the unit specific files and directories of an extracted archive.

    Parameters::

        path --
            The path to the archive directory.

    Returns::

        Dict where each value is the path to the file or directory, or None
        if it does not exist. The keys are:

            - root : Root of archive (same as path)
            - model_desc : XML description of model (required)
            - image : Image file of model icon (optional)
            - documentation_dir : Directory containing the model documentation (optional)
            - sources_dir : Directory containing source files (optional)
            - binaries_dir : Directory containing the binaries (required)
            - resources_dir : Directory containing resources needed by the model (optional)
    """
    def existing_or_none(*parts):
        # Path below the archive root if it exists on disk, otherwise None.
        candidate = os.path.join(path, *parts)
        return candidate if os.path.exists(candidate) else None

    return {
        'root': path,
        # model description XML file
        'model_desc': existing_or_none('modelDescription.xml'),
        # model image file
        'image': existing_or_none('model.png'),
        # documentation directory
        'documentation_dir': existing_or_none('documentation'),
        # source directory
        'sources_dir': existing_or_none('sources'),
        # binaries directory (platform specific sub directory)
        'binaries_dir': existing_or_none('binaries', get_platform_dir()),
        # resource directory
        'resources_dir': existing_or_none('resources'),
    }
def unzip_unit(archive, path='.'):
    """
    Unzip a unit file.

    Extracts all files in the archive into a new temporary directory created
    by create_temp_dir().

    Parameters::

        archive --
            The archive file name.

        path --
            The path to the archive file.
            Default: Current directory.

    Returns::

        Path to the root of the extracted archive.

    Raises::

        IOError if the archive file could not be opened.
    """
    try:
        zip_archive = zipfile.ZipFile(os.path.join(path, archive))
    except IOError:
        raise IOError('Could not locate the file: ' + str(archive))

    # try/finally (instead of a with-statement) closes the archive on all
    # paths, also if creating the directory or the extraction fails.
    try:
        # create temporary directory
        tmpdir = create_temp_dir()
        # extract all into the temporary directory
        zip_archive.extractall(path=tmpdir)
    finally:
        zip_archive.close()

    return tmpdir
def create_temp_dir():
    """
    Create a new temporary directory (for extracting an FMU in or similar)
    below the JModelica location for temporary files.

    Returns::

        The path to the created directory.
    """
    # Make sure the parent directory for temporary files exists.
    location_missing = not os.path.exists(tmp_location)
    if location_missing:
        try:
            os.makedirs(tmp_location)
        except OSError:
            # Someone else may have created the directory in the meantime,
            # only propagate the error if it is still missing.
            if os.path.exists(tmp_location):
                pass
            else:
                raise

    # Create the uniquely named directory itself.
    return tempfile.mkdtemp(dir=tmp_location, prefix='jm_tmp')
def create_temp_file():
    """
    Create an empty temporary text file ('jm_tmp*.txt') in the JModelica
    location for temporary files.

    Returns::

        The path to the created file. The file is closed when returned.
    """
    # create JModelica directory for temporary files (if not already created)
    if not os.path.exists(tmp_location):
        try:  # Account for race conditions, same handling as create_temp_dir
            os.makedirs(tmp_location)
        except OSError:
            if not os.path.exists(tmp_location):
                raise

    # create temporary file and release the OS level handle right away,
    # callers only use the name
    handle, tmpfile = tempfile.mkstemp(suffix='.txt', prefix='jm_tmp',
                                       dir=tmp_location, text=True)
    os.close(handle)
    return tmpfile
def delete_temp_file(tmp_file):
    """
    Remove a temporary file, for instance one created by create_temp_file.
    Nothing is done if the file does not exist.

    Parameters::

        tmp_file --
            The path to the file to delete.
    """
    # Nothing to delete
    if not os.path.exists(tmp_file):
        return
    os.remove(tmp_file)
def delete_temp_dir(tmp_dir):
    """
    Remove a temporary directory and everything in it, for instance one
    created by create_temp_dir. Nothing is done if the directory does not
    exist.

    Parameters::

        tmp_dir --
            The path to the directory to delete.
    """
    # Nothing to delete
    if not os.path.exists(tmp_dir):
        return
    shutil.rmtree(tmp_dir)
def get_unit_name(class_name, unit_type='FMU'):
    """
    Build the unit file name that corresponds to a class name.

    Parameters::

        class_name --
            The name of the model.

        unit_type --
            The unit type. Possible values: FMU, FMUX.
            Default: 'FMU'

    Returns::

        The class name with dots replaced by underscores, followed by the
        file extension of the unit type ('.fmu' or '.fmux').

    Raises::

        Exception if the unit type is unknown.
    """
    is_fmu = unit_type == 'FMU'
    # Reject unknown unit types before touching the class name.
    if not (is_fmu or unit_type == 'FMUX'):
        raise Exception("The unit type %s is unknown" %unit_type)

    extension = '.fmu' if is_fmu else '.fmux'
    return class_name.replace('.','_') + extension
def get_temp_location():
    """
    Return the directory in which temporary files are placed.

    This is the module level value ``tmp_location``; nothing is computed or
    created by this call.

    Returns::

        The location of temporary files.
    """
    return tmp_location
def list_to_string(item_list):
    """
    Helper function that takes a list of items, which are converted with str
    and returned as one string in which every item is followed by the
    platform dependent path separator (os.pathsep). Note that the result
    therefore ends with a separator. For example::

        (platform = win)
        item_list = [1, 2, 3]
        return value: '1;2;3;'

    Parameters::

        item_list --
            The items to put in the string.

    Returns::

        The string of separated items, '' for an empty list.
    """
    # A single join instead of repeated string concatenation in a loop.
    return ''.join([str(item) + os.pathsep for item in item_list])
# Derives from object so that the property setters below are honoured on
# Python 2 as well (properties are not fully supported by old-style classes).
class Trajectory(object):
    """
    Base class for representation of trajectories.
    """

    def __init__(self, abscissa, ordinate, tol=1e-8):
        """
        Default constructor for creating a trajectory object.

        Parameters::

            abscissa --
                One dimensional numpy array containing the n abscissa
                (independent) values.

            ordinate --
                Two dimensional n x m numpy matrix containing the ordinate
                values. The matrix has the same number of rows as the abscissa
                has elements. The number of columns is equal to the number of
                output variables.

            tol --
                Minimum distance between abscissae. If two abscissae are closer
                than the given tolerance, the largest one is moved.
                Default: 1e-8

        Raises::

            Exception if the abscissae are not increasing.
        """
        # astype returns a copy, the array passed in is not modified below
        self._abscissa = abscissa.astype('float')
        self._ordinate = ordinate
        self._n = N.size(abscissa)
        self._x0 = abscissa[0]
        self._xf = abscissa[-1]

        if not N.all(N.diff(self.abscissa) >= 0):
            raise Exception("The abscissae must be increasing.")

        # Move the larger point of every pair that is too close by tol, and
        # repeat until no pair of neighbours is within tol of each other.
        [double_point_indices] = N.nonzero(N.abs(N.diff(self.abscissa)) <= tol)
        while (len(double_point_indices) > 0):
            for i in double_point_indices:
                self.abscissa[i+1] = self.abscissa[i+1] + tol
            [double_point_indices] = N.nonzero(
                N.abs(N.diff(self.abscissa)) <= tol)

    def eval(self,x):
        """
        Evaluate the trajectory at a specified abscissa. This base class
        implementation does nothing, subclasses provide the evaluation.

        Parameters::

            x --
                One dimensional numpy array, or scalar number, containing n
                abscissa value(s).

        Returns::

            Two dimensional n x m matrix containing the ordinate values
            corresponding to the argument x.
        """
        pass

    def _set_abscissa(self, abscissa):
        # Written in place, the existing array object is kept
        self._abscissa[:] = abscissa

    def _get_abscissa(self):
        return self._abscissa

    abscissa = property(_get_abscissa, _set_abscissa, doc =
    """
    Property for accessing the abscissa of the trajectory.
    """)

    def _set_ordinate(self, ordinate):
        # Written in place, the existing array object is kept
        self._ordinate[:] = ordinate

    def _get_ordinate(self):
        return self._ordinate

    ordinate = property(_get_ordinate, _set_ordinate, doc =
    """
    Property for accessing the ordinate of the trajectory.
    """)
class TrajectoryLinearInterpolation(Trajectory):
    """
    Trajectory which is evaluated by linear interpolation (numpy interp) of
    the stored abscissa and ordinate values.
    """

    def eval(self,x):
        """
        Evaluate the trajectory at the given abscissa value(s).

        Parameters::

            x --
                One dimensional numpy array, or scalar number, containing n
                abscissa value(s).

        Returns::

            Two dimensional n x m matrix containing the ordinate values
            corresponding to the argument x.
        """
        n_points = N.size(x)
        n_columns = N.size(self.ordinate, 1)

        # Interpolate each ordinate column separately
        values = N.zeros([n_points, n_columns])
        for column in range(n_columns):
            values[:, column] = N.interp(x, self.abscissa,
                                         self.ordinate[:, column])
        return values
class TrajectoryUserFunction(Trajectory):
    """
    Trajectory whose ordinate values are calculated by a user supplied
    function.
    """

    def __init__(self, func):
        """
        Constructor for creating a user defined trajectory function.

        Parameters::

            func --
                A function which calculates the ordinate values.
        """
        self.traj = func

    def eval(self, x):
        """
        Evaluate the user function at the given abscissa.

        The function is first called with float(x). If that raises a
        TypeError, the function is called with x itself and the result is
        transposed.

        Parameters::

            x --
                One dimensional numpy array, or scalar number, containing a
                abscissa value.

        Returns::

            Two dimensional n x m matrix containing the ordinate values
            corresponding to the argument x.
        """
        # The results go through N.matrix in order to guarantee that the
        # return values are on the correct (two dimensional) form. May need
        # to be evaluated for speed improvements.
        try:
            scalar_x = float(x)
            scalar_values = self.traj(scalar_x)
            result = N.array(N.matrix(scalar_values))
        except TypeError:
            raw_values = self.traj(x)
            result = N.array(N.matrix(raw_values).transpose())
        return result