Source code for pyFTS.models.nonstationary.common

"""
Non Stationary Fuzzy Sets

GARIBALDI, Jonathan M.; JAROSZEWSKI, Marcin; MUSIKASUWAN, Salang. Nonstationary fuzzy sets.
IEEE Transactions on Fuzzy Systems, v. 16, n. 4, p. 1072-1086, 2008.
"""

import numpy as np
from pyFTS import *
from pyFTS.common import FuzzySet as FS, Membership, FLR
from pyFTS.partitioners import partitioner
from pyFTS.models.nonstationary import perturbation


class FuzzySet(FS.FuzzySet):
    """
    Non Stationary Fuzzy Sets

    A fuzzy set whose membership function parameters are displaced, for a
    given time displacement ``t``, by optional location, width and noise
    perturbation functions.
    """

    def __init__(self, name, mf, parameters, **kwargs):
        """
        Constructor

        :param name: name of the fuzzy set
        :param mf: membership function
        :param parameters: base (unperturbed) parameters of the membership function
        :param kwargs: optional ``location``, ``location_params``,
            ``location_roots``, ``width``, ``width_params``, ``width_roots``,
            ``noise`` and ``noise_params``. All keyword arguments are also
            forwarded to the parent constructor.
        """
        # NOTE(review): the methods below read self.name, self.mf and
        # self.parameters; presumably the parent constructor stores them.
        super(FuzzySet, self).__init__(name=name, mf=mf, parameters=parameters,
                                       centroid=None, alpha=1.0, **kwargs)

        # Perturbation function(s) that affect the location of the membership function
        self.location = kwargs.get("location", None)
        # Parameters for the location perturbation function(s)
        self.location_params = kwargs.get("location_params", None)
        # Offset(s) added to t before the location perturbation function is called
        self.location_roots = kwargs.get("location_roots", 0)

        # Perturbation function(s) that affect the width of the membership function
        self.width = kwargs.get("width", None)
        # Parameters for the width perturbation function(s)
        self.width_params = kwargs.get("width_params", None)
        # Offset(s) added to t before the width perturbation function is called
        self.width_roots = kwargs.get("width_roots", 0)

        # Perturbation function that adds noise on the membership value
        self.noise = kwargs.get("noise", None)
        # Parameters for the noise perturbation function
        self.noise_params = kwargs.get("noise_params", None)

        # Cache of perturbed parameter lists, keyed by str(t)
        self.perturbated_parameters = {}
        self.type = 'nonstationary'

        # A single perturbation function is wrapped, together with its
        # parameters and root, in one-element lists so that the rest of the
        # class can always index them position by position.
        if self.location is not None and not isinstance(self.location, (list, set)):
            self.location = [self.location]
            self.location_params = [self.location_params]
            self.location_roots = [self.location_roots]

        if self.width is not None and not isinstance(self.width, (list, set)):
            self.width = [self.width]
            self.width_params = [self.width_params]
            self.width_roots = [self.width_roots]

    def perform_location(self, t, param):
        """
        Apply the location perturbation to a list of parameters

        :param t: time displacement
        :param param: membership function parameters
        :return: a new list with the displaced parameters
        """
        if self.location is None:
            # With no perturbation function the displacement itself is the increment
            inc = t
        else:
            inc = sum(self.location[k](t + self.location_roots[k], self.location_params[k])
                      for k in range(len(self.location)))

        if self.mf == Membership.gaussmf:
            # changes only the mean parameter
            return [param[0] + inc, param[1]]
        elif self.mf == Membership.sigmf:
            # changes only the midpoint parameter
            return [param[0], param[1] + inc]
        elif self.mf == Membership.bellmf:
            return [param[0], param[1], param[2] + inc]
        else:
            # translate all parameters
            return [k + inc for k in param]

    def perform_width(self, t, param):
        """
        Apply the width perturbation to a list of parameters

        :param t: time displacement
        :param param: membership function parameters
        :return: a list with the resized parameters; for membership functions
            not handled below ``param`` itself is returned unchanged
        """
        if self.width is None:
            # With no perturbation function the displacement itself is the increment
            inc = t
        else:
            inc = sum(self.width[k](t + self.width_roots[k], self.width_params[k])
                      for k in range(len(self.width)))

        if self.mf == Membership.gaussmf:
            # changes only the variance parameter
            return [param[0], param[1] + inc]
        elif self.mf == Membership.sigmf:
            # changes only the smooth parameter
            return [param[0] + inc, param[1]]
        elif self.mf == Membership.trimf:
            # the increment is split evenly between both extremes
            tmp = inc / 2
            return [param[0] - tmp, param[1], param[2] + tmp]
        elif self.mf == Membership.trapmf:
            # the inner points move proportionally to the share of the total
            # support occupied by each slope
            support = (param[3] - param[0])
            rab = (param[1] - param[0]) / support
            rcd = (param[3] - param[2]) / support
            return [param[0] - inc, param[1] - inc * rab, param[2] + inc * rcd, param[3] + inc]
        else:
            return param

    def membership(self, x, t):
        """
        Calculate the membership value of a given input

        :param x: input value
        :param t: time displacement or perturbation parameters
        :return: membership value of x at this fuzzy set
        """
        self.perturbate_parameters(t)

        tmp = self.mf(x, self.perturbated_parameters[str(t)])

        if self.noise is not None:
            tmp += self.noise(t, self.noise_params)

        return tmp

    def perturbate_parameters(self, t):
        """
        Compute the perturbed parameters for ``t`` and store them in the cache

        The result is kept in ``self.perturbated_parameters`` under the key
        ``str(t)``; nothing is recomputed when that key is already present.

        :param t: a single time displacement used for both location and width,
            or a list whose first item is the location displacement and whose
            second item is the width displacement
        """
        key = str(t)
        if key not in self.perturbated_parameters:
            param = self.parameters
            if isinstance(t, (list, set)):
                param = self.perform_location(t[0], param)
                param = self.perform_width(t[1], param)
            else:
                param = self.perform_location(t, param)
                param = self.perform_width(t, param)
            self.perturbated_parameters[key] = param

    def get_midpoint(self, t):
        """
        Midpoint of the fuzzy set at time displacement ``t``

        :param t: time displacement or perturbation parameters
        :return: the midpoint; for membership functions not handled below the
            whole perturbed parameter list is returned
        """
        self.perturbate_parameters(t)
        param = self.perturbated_parameters[str(t)]

        if self.mf == Membership.gaussmf:
            return param[0]
        elif self.mf == Membership.sigmf:
            return param[1]
        elif self.mf == Membership.trimf:
            return param[1]
        elif self.mf == Membership.trapmf:
            # centre of the plateau delimited by the two inner points; the
            # difference of those points would give half its width instead
            return (param[1] + param[2]) / 2
        else:
            return param

    def get_lower(self, t):
        """
        Lower bound of the fuzzy set at time displacement ``t``

        :param t: time displacement or perturbation parameters
        :return: the lower bound; for membership functions not handled below
            the whole perturbed parameter list is returned
        """
        self.perturbate_parameters(t)
        param = self.perturbated_parameters[str(t)]

        if self.mf == Membership.gaussmf:
            return param[0] - 3*param[1]
        elif self.mf == Membership.sigmf:
            # NOTE(review): perform_location treats param[1] as the sigmoid
            # midpoint and perform_width treats param[0] as the smoothness;
            # confirm that this expression is the intended bound.
            return param[0] - param[1]
        elif self.mf == Membership.trimf:
            return param[0]
        elif self.mf == Membership.trapmf:
            return param[0]
        else:
            return param

    def get_upper(self, t):
        """
        Upper bound of the fuzzy set at time displacement ``t``

        :param t: time displacement or perturbation parameters
        :return: the upper bound; for membership functions not handled below
            the whole perturbed parameter list is returned
        """
        self.perturbate_parameters(t)
        param = self.perturbated_parameters[str(t)]

        if self.mf == Membership.gaussmf:
            return param[0] + 3*param[1]
        elif self.mf == Membership.sigmf:
            # NOTE(review): see the matching remark in get_lower about the
            # roles of param[0] and param[1] for sigmf.
            return param[0] + param[1]
        elif self.mf == Membership.trimf:
            return param[2]
        elif self.mf == Membership.trapmf:
            return param[3]
        else:
            return param

    def __str__(self):
        """
        Text representation: name, membership function name, base parameters
        and, when configured, the location and width perturbation functions
        with their parameters (all numbers with two decimal places).
        """
        tmp = ""
        if self.location is not None:
            tmp += "Location: "
            for ct, f in enumerate(self.location):
                tmp += str(f.__name__) + "(" + str(["{0:.2f}".format(p) for p in self.location_params[ct]]) + ") "
        if self.width is not None:
            tmp += "Width: "
            for ct, f in enumerate(self.width):
                tmp += str(f.__name__) + "(" + str(["{0:.2f}".format(p) for p in self.width_params[ct]]) + ") "
        tmp = "(" + str(["{0:.2f}".format(p) for p in self.parameters]) + ") " + tmp
        return self.name + ": " + str(self.mf.__name__) + tmp
def fuzzify(inst, t, fuzzySets):
    """
    Calculate the membership values for a data point given nonstationary fuzzy sets

    :param inst: a data point or a list of data points
    :param t: time displacement of the instance. The value passed in is not
        used: every data point is evaluated at its own position in ``inst``
        (0 for a single data point).
    :param fuzzySets: list of fuzzy sets
    :return: list with one array of membership values per data point
    """
    # A single value is handled as a one-element list
    values = inst if isinstance(inst, list) else [inst]

    return [
        np.array([fset.membership(value, position) for fset in fuzzySets])
        for position, value in enumerate(values)
    ]
def fuzzySeries(data, fuzzySets, ordered_sets, window_size=1, method='fuzzy', const_t= None):
    """
    Fuzzify a whole series with nonstationary fuzzy sets

    :param data: sequence of data points
    :param fuzzySets: fuzzy sets, indexed by the names in ``ordered_sets``
    :param ordered_sets: names of the fuzzy sets, in the order they are evaluated
    :param window_size: window length used to derive the time displacement of
        each data point from its position (ignored when ``const_t`` is given)
    :param method: ``'fuzzy'`` keeps every set with membership greater than
        zero, ``'maximum'`` keeps only the set(s) with the highest membership
    :param const_t: when not None, a fixed time displacement used for every
        data point
    :return: list with, for each data point, the list of selected fuzzy sets
    :raises ValueError: if ``method`` is neither ``'fuzzy'`` nor ``'maximum'``
    """
    fts = []
    for t, i in enumerate(data):
        tdisp = window_index(t, window_size) if const_t is None else const_t
        mv = np.array([fuzzySets[fs].membership(i, tdisp) for fs in ordered_sets])
        if len(mv) == 0:
            # Only reached when ordered_sets is empty.
            # NOTE(review): check_bounds calls lower_set()/upper_set() on its
            # second argument, while fuzzySets is indexed like a mapping
            # above - confirm what is expected here.
            sets = [check_bounds(i, fuzzySets, tdisp)]
        else:
            if method == 'fuzzy':
                ix = np.ravel(np.argwhere(mv > 0.0))
            elif method == 'maximum':
                mx = max(mv)
                ix = np.ravel(np.argwhere(mv == mx))
            else:
                # Fail with a clear message instead of an unbound local below
                raise ValueError(
                    "Unknown fuzzyfication method '{}': expected 'fuzzy' or 'maximum'".format(method))
            sets = [fuzzySets[ordered_sets[k]] for k in ix]
        fts.append(sets)
    return fts
def window_index(t, window_size):
    """
    Map a time index to the first index of the window that contains it

    :param t: time index; a list or set is returned untouched
    :param window_size: length of each window
    :return: ``t`` minus its remainder modulo ``window_size``, or ``t`` itself
        when it is a list or set
    """
    already_explicit = isinstance(t, (list, set))
    return t if already_explicit else t - (t % window_size)
def check_bounds(data, partitioner, t):
    """
    Find the extreme fuzzy set whose bound is exceeded by a data point

    :param data: data point
    :param partitioner: object providing ``lower_set()`` and ``upper_set()``
    :param t: time displacement passed to ``get_lower`` / ``get_upper``
    :return: the lower set when ``data`` is below its lower bound, the upper
        set when ``data`` is above its upper bound, otherwise None
    """
    lower_bound = partitioner.lower_set().get_lower(t)
    if data < lower_bound:
        return partitioner.lower_set()

    upper_bound = partitioner.upper_set().get_upper(t)
    if data > upper_bound:
        return partitioner.upper_set()

    # within the bounds: no extreme set applies
    return None
def check_bounds_index(data, partitioner, t):
    """
    Find the index of the extreme fuzzy set whose bound is exceeded by a data point

    :param data: data point
    :param partitioner: object providing ``lower_set()``, ``upper_set()`` and ``sets``
    :param t: time displacement passed to ``get_lower`` / ``get_upper``
    :return: 0 when ``data`` is below the lower bound of the lower set,
        ``len(partitioner.sets) - 1`` when it is above the upper bound of the
        upper set, otherwise None
    """
    lower_bound = partitioner.lower_set().get_lower(t)
    if data < lower_bound:
        return 0

    upper_bound = partitioner.upper_set().get_upper(t)
    if data > upper_bound:
        return len(partitioner.sets) - 1

    # within the bounds: no extreme index applies
    return None