PCA probabilistico

Probabilistica analisi principale componenti (PCA) è una tecnica di riduzione dimensionalità che analizza i dati tramite uno spazio latente dimensionale inferiore ( ribaltamento e Bishop 1999 ). Viene spesso utilizzato quando mancano valori nei dati o per il ridimensionamento multidimensionale.

Importazioni

import functools
import warnings

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

import tensorflow.compat.v2 as tf
import tensorflow_probability as tfp

from tensorflow_probability import bijectors as tfb
from tensorflow_probability import distributions as tfd

tf
.enable_v2_behavior()

plt
.style.use("ggplot")
warnings
.filterwarnings('ignore')

Il modello

Si consideri un insieme di dati X={xn} di N punti di dati, in cui ogni punto dati è D-dimensionale,  mathbfxn in mathbbRD\(.Weaimtorepresenteach\) mathbfxN sotto una variabile latente znRK con dimensione inferiore, K<D\(.Thesetofprincipalaxes\) mathbfW riguarda le variabili latenti ai dati.

Nello specifico, assumiamo che ogni variabile latente sia distribuita normalmente,

znN(0,I).

Il punto dati corrispondente viene generato tramite una proiezione,

xnznN(Wzn,σ2I),

dove la matrice WRD×K sono noti come assi principali. In PCA probabilistico, stiamo tipicamente interessati a stimare gli assi principali W e il termine rumoreσ2.

La PCA probabilistica generalizza la PCA classica. Emarginando la variabile latente, la distribuzione di ciascun punto dati è

xnN(0,WW+σ2I).

PCA classica è il caso specifico di PCA probabilistica quando la covarianza del rumore diventa infinitamente piccolo, σ20.

Abbiamo impostato il nostro modello di seguito. Nella nostra analisi, assumiamo σ è noto, invece di punto stimare W come parametro modello, poniamo una prima su di esso al fine di dedurre una distribuzione su assi principali. Ci esprimiamo il modello come una TFP JointDistribution, in particolare, useremo JointDistributionCoroutineAutoBatched .

def probabilistic_pca(data_dim, latent_dim, num_datapoints, stddv_datapoints):
  w
= yield tfd.Normal(loc=tf.zeros([data_dim, latent_dim]),
                 scale
=2.0 * tf.ones([data_dim, latent_dim]),
                 name
="w")
  z
= yield tfd.Normal(loc=tf.zeros([latent_dim, num_datapoints]),
                 scale
=tf.ones([latent_dim, num_datapoints]),
                 name
="z")
  x
= yield tfd.Normal(loc=tf.matmul(w, z),
                       scale
=stddv_datapoints,
                       name
="x")
num_datapoints = 5000
data_dim
= 2
latent_dim
= 1
stddv_datapoints
= 0.5

concrete_ppca_model
= functools.partial(probabilistic_pca,
    data_dim
=data_dim,
    latent_dim
=latent_dim,
    num_datapoints
=num_datapoints,
    stddv_datapoints
=stddv_datapoints)

model
= tfd.JointDistributionCoroutineAutoBatched(concrete_ppca_model)

I dati

Possiamo usare il modello per generare dati campionando dalla distribuzione precedente congiunta.

actual_w, actual_z, x_train = model.sample()

print("Principal axes:")
print(actual_w)
Principal axes:
tf.Tensor(
[[ 2.2801023]
 [-1.1619819]], shape=(2, 1), dtype=float32)

Visualizziamo il dataset.

plt.scatter(x_train[0, :], x_train[1, :], color='blue', alpha=0.1)
plt
.axis([-20, 20, -20, 20])
plt
.title("Data set")
plt
.show()

png

Inferenza massima a posteriori

Per prima cosa cerchiamo la stima puntuale delle variabili latenti che massimizzi la densità di probabilità a posteriori. Questo è noto come massimo a posteriori (MAP) inferenza, e viene fatto calcolando i valori di W e Z che massimizzano la densità posteriori p(W,ZX)p(W,Z,X).

w = tf.Variable(tf.random.normal([data_dim, latent_dim]))
z
= tf.Variable(tf.random.normal([latent_dim, num_datapoints]))

target_log_prob_fn
= lambda w, z: model.log_prob((w, z, x_train))
losses
= tfp.math.minimize(
   
lambda: -target_log_prob_fn(w, z),
    optimizer
=tf.optimizers.Adam(learning_rate=0.05),
    num_steps
=200)
plt.plot(losses)
[<matplotlib.lines.Line2D at 0x7f19897a42e8>]

png

Possiamo utilizzare il modello di dati di esempio per i valori desunti per W e Z, e confrontare il set di dati effettivo che condizionati su.

print("MAP-estimated axes:")
print(w)

_
, _, x_generated = model.sample(value=(w, z, None))

plt
.scatter(x_train[0, :], x_train[1, :], color='blue', alpha=0.1, label='Actual data')
plt
.scatter(x_generated[0, :], x_generated[1, :], color='red', alpha=0.1, label='Simulated data (MAP)')
plt
.legend()
plt
.axis([-20, 20, -20, 20])
plt
.show()
MAP-estimated axes:
<tf.Variable 'Variable:0' shape=(2, 1) dtype=float32, numpy=
array([[ 2.9135954],
       [-1.4826864]], dtype=float32)>

png

Inferenza variazionale

MAP può essere utilizzato per trovare la modalità (o una delle modalità) della distribuzione a posteriori, ma non fornisce altri approfondimenti al riguardo. Abbiamo poi usiamo inferenza variazionale, dove il posteriore distribtion p(W,ZX) viene approssimata con una distribuzione variazionale q(W,Z) parametrizzato per λ. L'obiettivo è quello di trovare i parametri variazionali λ che minimizzano la divergenza KL tra q e posteriore, KL(q(W,Z)∣∣p(W,ZX)), o equivalentemente, che massimizza l'evidenza limite inferiore, Eq(W,Z;λ)[logp(W,Z,X)logq(W,Z;λ)].

qw_mean = tf.Variable(tf.random.normal([data_dim, latent_dim]))
qz_mean
= tf.Variable(tf.random.normal([latent_dim, num_datapoints]))
qw_stddv
= tfp.util.TransformedVariable(1e-4 * tf.ones([data_dim, latent_dim]),
                                        bijector
=tfb.Softplus())
qz_stddv
= tfp.util.TransformedVariable(
   
1e-4 * tf.ones([latent_dim, num_datapoints]),
    bijector
=tfb.Softplus())
def factored_normal_variational_model():
  qw
= yield tfd.Normal(loc=qw_mean, scale=qw_stddv, name="qw")
  qz
= yield tfd.Normal(loc=qz_mean, scale=qz_stddv, name="qz")

surrogate_posterior
= tfd.JointDistributionCoroutineAutoBatched(
    factored_normal_variational_model
)

losses
= tfp.vi.fit_surrogate_posterior(
    target_log_prob_fn
,
    surrogate_posterior
=surrogate_posterior,
    optimizer
=tf.optimizers.Adam(learning_rate=0.05),
    num_steps
=200)
print("Inferred axes:")
print(qw_mean)
print("Standard Deviation:")
print(qw_stddv)

plt
.plot(losses)
plt
.show()
Inferred axes:
<tf.Variable 'Variable:0' shape=(2, 1) dtype=float32, numpy=
array([[ 2.4168603],
       [-1.2236133]], dtype=float32)>
Standard Deviation:
<TransformedVariable: dtype=float32, shape=[2, 1], fn="softplus", numpy=
array([[0.0042499 ],
       [0.00598824]], dtype=float32)>

png

posterior_samples = surrogate_posterior.sample(50)
_
, _, x_generated = model.sample(value=(posterior_samples))

# It's a pain to plot all 5000 points for each of our 50 posterior samples, so
# let's subsample to get the gist of the distribution.
x_generated
= tf.reshape(tf.transpose(x_generated, [1, 0, 2]), (2, -1))[:, ::47]

plt
.scatter(x_train[0, :], x_train[1, :], color='blue', alpha=0.1, label='Actual data')
plt
.scatter(x_generated[0, :], x_generated[1, :], color='red', alpha=0.1, label='Simulated data (VI)')
plt
.legend()
plt
.axis([-20, 20, -20, 20])
plt
.show()

png

Ringraziamenti

Questo tutorial è stato originariamente scritto in Edward 1.0 ( fonte ). Ringraziamo tutti coloro che hanno contribuito alla stesura e alla revisione di tale versione.

Riferimenti

[1]: Michael E. Tipping e Christopher M. Bishop. Analisi probabilistica delle componenti principali. Ufficiale della Royal Statistical Society: Serie B (metodologia statistica), 61 (3): 611-622, 1999.