Source code for pyFTS.probabilistic.ProbabilityDistribution

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyFTS.common import FuzzySet, SortedCollection, tree
from pyFTS.probabilistic import kde


class ProbabilityDistribution(object):
    """
    Represents a discrete or continuous probability distribution.
    If type is histogram, the PDF is discrete.
    If type is KDE, the PDF is continuous.
    """

    def __init__(self, type="KDE", **kwargs):
        self.uod = kwargs.get("uod", None)
        """Universe of discourse"""

        self.data = []

        self.type = type
        """
        If type is histogram, the PDF is discrete.
        If type is KDE, the PDF is continuous.
        """

        self.bins = kwargs.get("bins", None)
        """Bins of a discrete PDF"""

        self.labels = kwargs.get("bins_labels", None)
        """Bins labels on a discrete PDF"""

        data = kwargs.get("data", None)

        if self.type == "KDE":
            self.kde = kde.KernelSmoothing(kwargs.get("h", 0.5), kwargs.get("kernel", "epanechnikov"))

            _min = np.nanmin(data)
            _min = _min * .7 if _min > 0 else _min * 1.3

            _max = np.nanmax(data)
            _max = _max * 1.3 if _max > 0 else _max * .7

            self.uod = [_min, _max]

        self.nbins = kwargs.get("num_bins", 100)
        """Number of bins on a discrete PDF"""

        if self.bins is None:
            self.bins = np.linspace(int(self.uod[0]), int(self.uod[1]), int(self.nbins)).tolist()
            self.labels = [str(k) for k in self.bins]

        if self.uod is not None:
            self.resolution = (self.uod[1] - self.uod[0]) / self.nbins

        self.bin_index = SortedCollection.SortedCollection(iterable=sorted(self.bins))
        self.quantile_index = None
        self.distribution = {}
        self.cdf = None
        self.qtl = None
        self.count = 0
        for k in self.bins:
            self.distribution[k] = 0

        if data is not None:
            self.append(data)

        self.name = kwargs.get("name", "")
    def set(self, value, density):
        k = self.bin_index.find_ge(value)
        self.distribution[k] = density
    def append(self, values):
        """Incorporate new observations: increment bin counts for histograms, or store the
        raw data and re-evaluate the kernel densities at the bins for KDE."""
        if self.type == "histogram":
            for k in values:
                v = self.bin_index.find_ge(k)
                self.distribution[v] += 1
                self.count += 1
        else:
            self.data.extend(values)
            self.distribution = {}
            dens = self.density(self.bins)
            for v, d in enumerate(dens):
                self.distribution[self.bins[v]] = d
    def append_interval(self, intervals):
        """Increment every histogram bin that falls inside each of the given intervals."""
        if self.type == "histogram":
            for interval in intervals:
                for k in self.bin_index.inside(interval[0], interval[1]):
                    self.distribution[k] += 1
                    self.count += 1
    def density(self, values):
        """Return the probability density for a scalar value or a list of values."""
        ret = []
        scalar = False

        if not isinstance(values, list):
            values = [values]
            scalar = True

        for k in values:
            if self.type == "histogram":
                v = self.bin_index.find_ge(k)
                ret.append(self.distribution[v] / (self.count + 1e-5))
            elif self.type == "KDE":
                v = self.kde.probability(k, self.data)
                ret.append(v)
            else:
                v = self.bin_index.find_ge(k)
                ret.append(self.distribution[v])

        if scalar:
            return ret[0]

        return ret
    def differential_offset(self, value):
        """Shift every bin (and its associated probability) by a constant offset, invalidating
        the cached CDF and quantile structures."""
        nbins = []
        dist = {}

        for k in self.bins:
            nk = k + value
            nbins.append(nk)
            dist[nk] = self.distribution[k]

        self.bins = nbins
        self.distribution = dist
        self.labels = [str(k) for k in self.bins]

        self.bin_index = SortedCollection.SortedCollection(iterable=sorted(self.bins))
        self.quantile_index = None
        self.cdf = None
        self.qtl = None
    def expected_value(self):
        return np.nansum([v * self.distribution[v] for v in self.bins])
    def build_cdf_qtl(self):
        """Build the cumulative distribution (CDF) and quantile lookup structures from the current bins."""
        ret = 0.0
        self.cdf = {}
        self.qtl = {}
        for k in sorted(self.bins):
            ret += self.density(k)
            if k not in self.cdf:
                self.cdf[k] = ret

            if str(ret) not in self.qtl:
                self.qtl[str(ret)] = []

            self.qtl[str(ret)].append(k)

        _keys = [float(k) for k in sorted(self.qtl.keys())]

        self.quantile_index = SortedCollection.SortedCollection(iterable=_keys)
    def cummulative(self, values):
        """Return the cumulative probability P(X <= value) for a scalar value or a list of values."""
        if self.cdf is None:
            self.build_cdf_qtl()

        if isinstance(values, list):
            ret = []
            for val in values:
                k = self.bin_index.find_ge(val)
                ret.append(self.cdf[k])
            return ret
        else:
            k = self.bin_index.find_ge(values)
            return self.cdf[k]
    def quantile(self, values):
        """Return the bin value(s) associated with the given cumulative probability (or list of probabilities)."""
        if self.qtl is None:
            self.build_cdf_qtl()

        if isinstance(values, list):
            ret = []
            for val in values:
                k = self.quantile_index.find_ge(val)
                ret.append(self.qtl[str(k)][0])
        else:
            k = self.quantile_index.find_ge(values)
            ret = self.qtl[str(k)]

        return ret
    def entropy(self):
        """Shannon entropy of the distribution."""
        h = -sum([self.distribution[k] * np.log(self.distribution[k]) if self.distribution[k] > 0 else 0
                  for k in self.bins])
        return h
    def crossentropy(self, q):
        h = -sum([self.distribution[k] * np.log(q.distribution[k]) if self.distribution[k] > 0 else 0
                  for k in self.bins])
        return h
    def kullbackleiblerdivergence(self, q):
        """Kullback-Leibler divergence between this distribution (P) and q (Q)."""
        h = sum([self.distribution[k] * np.log(self.distribution[k] / q.distribution[k]) if self.distribution[k] > 0 else 0
                 for k in self.bins])
        return h
    def empiricalloglikelihood(self):
        _s = 0
        for k in self.bins:
            if self.distribution[k] > 0:
                _s += np.log(self.distribution[k])
        return _s
    def pseudologlikelihood(self, data):
        densities = self.density(data)

        _s = 0
        for k in densities:
            if k > 0:
                _s += np.log(k)
        return _s
    def averageloglikelihood(self, data):
        densities = self.density(data)

        _s = 0
        for k in densities:
            if k > 0:
                _s += np.log(k)
        return _s / len(data)
    def plot(self, axis=None, color="black", tam=[10, 6], title=None):
        if axis is None:
            fig = plt.figure(figsize=tam)
            axis = fig.add_subplot(111)

        if self.type == "histogram":
            ys = [self.distribution[k] / self.count for k in self.bins]
        else:
            ys = [self.distribution[k] for k in self.bins]
            yp = [0 for k in self.data]
            axis.plot(self.data, yp, c="red")

        if title is None:
            title = self.name
        axis.plot(self.bins, ys, c=color)
        axis.set_title(title)

        axis.set_xlabel('Universe of Discourse')
        axis.set_ylabel('Probability')
    def __str__(self):
        ret = ""
        for k in sorted(self.bins):
            ret += str(round(k, 2)) + ':\t'
            if self.type == "histogram":
                ret += str(round(self.distribution[k] / self.count, 3))
            elif self.type == "KDE":
                ret += str(round(self.density(k), 3))
            else:
                ret += str(round(self.distribution[k], 6))
            ret += '\n'
        return ret
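
A minimal usage sketch, not part of the original module: it assumes a hypothetical universe of discourse [0, 100], synthetic illustrative data, and the name "example"; only the constructor keywords and methods shown above are used.

if __name__ == '__main__':
    # Usage sketch only: the sample data and the [0, 100] universe of discourse are assumptions.
    sample = np.clip(np.random.normal(50, 10, 1000), 0, 100).tolist()

    # Discrete (histogram) PDF: bins are derived from uod and num_bins, counts come from data
    hist = ProbabilityDistribution(type="histogram", uod=[0, 100], num_bins=50,
                                   data=sample, name="example")
    print(hist.density(50.0))           # relative frequency of the bin covering 50
    print(hist.cummulative(50.0))       # P(X <= 50), lazily built by build_cdf_qtl()
    print(hist.quantile([0.05, 0.95]))  # bins bounding an approximate 90% interval

    # Continuous (KDE) PDF: the universe of discourse is inferred from the data
    smooth = ProbabilityDistribution(type="KDE", data=sample, num_bins=50, name="example-kde")
    print(smooth.density(50.0))         # kernel density estimate at 50

Note that the CDF and quantile index are built lazily: the first call to cummulative or quantile pays the cost of build_cdf_qtl, and the cached structures are reused until the bins change (for example, differential_offset resets them).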