Source code for tramp.likelihoods.piecewise_linear_likelihood

import numpy as np
from scipy.special import logsumexp, softmax
from .base_likelihood import Likelihood
from ..utils.integration import gaussian_measure, gaussian_measure_2d
from ..beliefs import truncated


class LinearRegionLikelihood(Likelihood):
    "Inference knowing z in [zmin, zmax] and x = x0 + slope*z"

    def __init__(self, zmin, zmax, x0, slope):
        assert zmin < zmax
        self.zmin = zmin
        self.zmax = zmax
        self.x0 = x0
        self.slope = slope
        self.repr_init()

    def x(self, z):
        return self.x0 + self.slope*z

    def strict_indicator(self, z):
        return (self.zmin < z) * (z < self.zmax)

    def sample(self, Z):
        # zero outside of the region
        X = self.x(Z) * (self.zmin <= Z) * (Z < self.zmax)
        return X

    def backward_mean(self, az, bz, y):
        if self.slope == 0:
            rz = truncated.r(az, bz, self.zmin, self.zmax)
        else:
            rz = (y - self.x0) / self.slope
        rz = np.where(self.contains(y), rz, 0)
        return rz

    def backward_variance(self, az, bz, y):
        if self.slope == 0:
            vz = truncated.v(az, bz, self.zmin, self.zmax)
        else:
            vz = 0
        vz = np.where(self.contains(y), vz, 0)
        return vz

    def contains(self, y):
        if self.slope == 0:
            y_in_region = (y == self.x0)
        else:
            z = (y - self.x0) / self.slope
            y_in_region = self.strict_indicator(z)
        return y_in_region

    def log_partitions(self, az, bz, y):
        "Element-wise log_partition"
        if self.slope == 0:
            logZ = truncated.A(az, bz, self.zmin, self.zmax)
        else:
            z = (y - self.x0) / self.slope
            logZ = -0.5*az*(z**2) + bz*z
        logZ = np.where(self.contains(y), logZ, -np.inf)
        return logZ

    def b_measure(self, mz_hat, qz_hat, tz0_hat, f):
        if self.slope == 0:
            mz_star = mz_hat**2 / qz_hat
            az_star = mz_star + tz0_hat

            def p_times_f(bz):
                bz_star = (mz_hat / qz_hat) * bz
                p = truncated.p(az_star, bz_star, self.zmin, self.zmax)
                return p * f(bz, self.x0)
            tz0 = 1 / tz0_hat
            sz_eff = np.sqrt(qz_hat + (mz_hat**2) * tz0)
            mu = gaussian_measure(0, sz_eff, p_times_f)
        else:
            def integrand(z, xi_b):
                """
                The integrand must return 1_R(z)*f(bz, y) but f(bz, y) is slow to
                compute. If z is outside R we directly return 0 to bypass the
                computation of f(bz, y).
                """
                if not self.strict_indicator(z):
                    return 0
                bz = mz_hat * z + np.sqrt(qz_hat) * xi_b
                y = self.x(z)
                return f(bz, y)
            tz0 = 1 / tz0_hat
            mu = gaussian_measure_2d(0, np.sqrt(tz0), 0, 1, integrand)
        return mu

    def bz_measure(self, mz_hat, qz_hat, tz0_hat, f):
        if self.slope == 0:
            mz_star = mz_hat**2 / qz_hat
            az_star = mz_star + tz0_hat

            def rp_times_f(bz):
                bz_star = (mz_hat / qz_hat) * bz
                r = truncated.r(az_star, bz_star, self.zmin, self.zmax)
                p = truncated.p(az_star, bz_star, self.zmin, self.zmax)
                return r * p * f(bz, self.x0)
            tz0 = 1 / tz0_hat
            sz_eff = np.sqrt(qz_hat + (mz_hat**2) * tz0)
            mu = gaussian_measure(0, sz_eff, rp_times_f)
        else:
            def integrand(z, xi_b):
                """
                The integrand must return 1_R(z)*z*f(bz, y) but f(bz, y) is slow
                to compute. If z is outside R we directly return 0 to bypass the
                computation of f(bz, y).
                """
                if not self.strict_indicator(z):
                    return 0
                bz = mz_hat * z + np.sqrt(qz_hat) * xi_b
                y = self.x(z)
                return z*f(bz, y)
            tz0 = 1 / tz0_hat
            mu = gaussian_measure_2d(0, np.sqrt(tz0), 0, 1, integrand)
        return mu

    def beliefs_measure(self, az, tau_z, f):
        mz_hat = az - 1 / tau_z
        assert mz_hat > 0 , "az must be greater than 1/ tau_z"

        if self.slope == 0:
            def integrand(bz):
                p = truncated.p(az, bz, self.zmin, self.zmax)
                return p * f(bz, self.x0)
            sz_eff = np.sqrt(mz_hat + (mz_hat**2) * tau_z)
            mu = gaussian_measure(0, sz_eff, integrand)
        else:
            def integrand(z, xi_b):
                """
                The integrand must return 1_R(z)*f(bz, y) but f(bz, y) is slow to
                compute. If z is outside R we directly return 0 to bypass the
                computation of f(bz, y).
                """
                if not self.strict_indicator(z):
                    return 0
                bz = mz_hat * z + np.sqrt(mz_hat) * xi_b
                y = self.x(z)
                return f(bz, y)
            mu = gaussian_measure_2d(0, np.sqrt(tau_z), 0, 1, integrand)
        return mu

    def measure(self, y, f):
        if not self.contains(y):
            return 0
        if self.slope == 0:
            return quad(f, self.zmin, self.zmax)[0]
        else:
            z = (y - self.x0) / self.slope
            return f(z)


class PiecewiseLinearLikelihood(Likelihood):
    def __init__(self, name, regions, y, y_name, isotropic):
        self.y_name = y_name
        self.size = self.get_size(y)
        self.isotropic = isotropic
        self.repr_init()
        self.name = name
        self.y = y
        self.regions = [LinearRegionLikelihood(**region) for region in regions]
        self.n_regions = len(regions)

    def sample(self, Z):
        X = sum(region.sample(Z) for region in self.regions)
        return X

    def math(self):
        return r"$\textrm{" + self.name + r"}$"

    def scalar_backward_mean(self, az, bz, y):
        rs = [region.backward_mean(az, bz, y) for region in self.regions]
        As = [region.log_partitions(az, bz, y) for region in self.regions]
        ps = softmax(As, axis=0)
        rz = sum(p*r for p, r in zip(ps, rs))
        return rz

    def scalar_backward_variance(self, az, bz, y):
        rs = [region.backward_mean(az, bz, y) for region in self.regions]
        vs = [region.backward_variance(az, bz, y) for region in self.regions]
        As = [region.log_partitions(az, bz, y) for region in self.regions]
        ps = softmax(As, axis=0)
        Dr = sum(
            ps[i]*ps[j]*(rs[i] - rs[j])**2
            for i in range(self.n_regions)
            for j in range(i+1, self.n_regions)
        )
        vz = sum(p*v for p, v in zip(ps, vs)) + Dr
        return vz

    def scalar_log_partition(self, az, bz, y):
        As = [region.log_partitions(az, bz, y) for region in self.regions]
        A = logsumexp(As)
        return A

    def compute_backward_posterior(self, az, bz, y):
        rs = [region.backward_mean(az, bz, y) for region in self.regions]
        vs = [region.backward_variance(az, bz, y) for region in self.regions]
        As = [region.log_partitions(az, bz, y) for region in self.regions]
        ps = softmax(As, axis=0)
        rz = sum(p*r for p, r in zip(ps, rs))
        Dr = sum(
            ps[i]*ps[j]*(rs[i] - rs[j])**2
            for i in range(self.n_regions)
            for j in range(i+1, self.n_regions)
        )
        vz = sum(p*v for p, v in zip(ps, vs)) + Dr
        if self.isotropic:
            vz = vz.mean()
        return rz, vz

    def compute_log_partition(self, az, bz, y):
        As = [region.log_partitions(az, bz, y) for region in self.regions]
        A = logsumexp(As, axis=0)
        return A.mean()

    def b_measure(self, mz_hat, qz_hat, tz0_hat, f):
        mu = sum(
            region.b_measure(mz_hat, qz_hat, tz0_hat, f) for region in self.regions
        )
        return mu

    def bz_measure(self, mz_hat, qz_hat, tz0_hat, f):
        mu = sum(
            region.bz_measure(mz_hat, qz_hat, tz0_hat, f) for region in self.regions
        )
        return mu

    def beliefs_measure(self, az, tau_z, f):
        mu = sum(
            region.beliefs_measure(az, tau_z, f) for region in self.regions
        )
        return mu

    def measure(self, y, f):
        mu = sum(region.measure(y, f) for region in self.regions)
        return mu


class SgnLikelihood(PiecewiseLinearLikelihood):
    def __init__(self, y, y_name="y", isotropic=True):
        neg = dict(zmin=-np.inf, zmax=0, slope=0, x0=-1)
        pos = dict(zmin=0, zmax=+np.inf, slope=0, x0=+1)
        regions = [pos, neg]
        super().__init__("sgn", regions, y, y_name, isotropic)


class AbsLikelihood(PiecewiseLinearLikelihood):
    def __init__(self, y, y_name="y", isotropic=True):
        neg = dict(zmin=-np.inf, zmax=0, slope=-1, x0=0)
        pos = dict(zmin=0, zmax=+np.inf, slope=+1, x0=0)
        regions = [pos, neg]
        super().__init__("abs", regions, y, y_name, isotropic)


[docs]class AsymmetricAbsLikelihood(PiecewiseLinearLikelihood): def __init__(self, y, y_name="y", isotropic=True, shift=1e-4): neg = dict(zmin=-np.inf, zmax=shift, slope=-1, x0=0) pos = dict(zmin=shift, zmax=+np.inf, slope=+1, x0=0) regions = [pos, neg] super().__init__("a-abs", regions, y, y_name, isotropic)
[docs]class ReluLikelihood(PiecewiseLinearLikelihood): def __init__(self, y, y_name="y", isotropic=True): neg = dict(zmin=-np.inf, zmax=0, slope=0, x0=0) pos = dict(zmin=0, zmax=+np.inf, slope=1, x0=0) regions = [pos, neg] super().__init__("relu", regions, y, y_name, isotropic)
[docs]class LeakyReluLikelihood(PiecewiseLinearLikelihood): def __init__(self, slope, y, y_name="y", isotropic=True): self.slope = slope neg = dict(zmin=-np.inf, zmax=0, slope=slope, x0=0) pos = dict(zmin=0, zmax=np.inf, slope=1, x0=0) regions = [pos, neg] super().__init__("l-relu", regions, y, y_name, isotropic)
[docs]class HardTanhLikelihood(PiecewiseLinearLikelihood): def __init__(self, y, y_name="y", isotropic=True): neg = dict(zmin=-np.inf, zmax=-1, slope=0, x0=-1) mid = dict(zmin=-1, zmax=+1, slope=1, x0=0) pos = dict(zmin=+1, zmax=+np.inf, slope=0, x0=+1) regions = [pos, mid, neg] super().__init__("h-tanh", regions, y, y_name, isotropic)
[docs]class HardSigmoidLikelihood(PiecewiseLinearLikelihood): def __init__(self, y, y_name="y", isotropic=True): l = 2.5 neg = dict(zmin=-np.inf, zmax=-l, slope=0, x0=0) mid = dict(zmin=-l, zmax=+l, slope=1/(2*l), x0=0.5) pos = dict(zmin=l, zmax=np.inf, slope=0, x0=1) regions = [pos, mid, neg] super().__init__("h-sigm", regions, y, y_name, isotropic)
[docs]class SymmetricDoorLikelihood(PiecewiseLinearLikelihood): def __init__(self, width, y, y_name="y", isotropic=True): self.width = width neg = dict(zmin=-np.inf, zmax=-width, slope=0, x0=+1) mid = dict(zmin=-width, zmax=+width, slope=0, x0=-1) pos = dict(zmin=+width, zmax=+np.inf, slope=0, x0=+1) regions = [pos, mid, neg] super().__init__("door", regions, y, y_name, isotropic)