# Réseaux Génératifs Antagonistes

Dans ce TP nous allons mettre en place l'entraînement d'un réseau de neurone génératif, entraîné de manière antagoniste à l'aide d'un réseau discriminateur. 

<center> <img src="https://drive.google.com/uc?id=1_ADmA-Js37z6R-0o476dzX4jMG5WHLtr" width=600></center>
<caption><center> Schéma global de fonctionnement d'un GAN ([Goodfellow 2014]) </center></caption>

Dans un premier temps, nous allons illustrer le fonctionnement du GAN sur l'exemple simple, canonique, de la base de données MNIST. 
Votre objectif sera par la suite d'adapter cet exemple à la base de données *Labelled Faces in the Wild*, et éventuellement d'implémenter quelques astuces permettant d'améliorer l'entrainement.


In [1]:
import tensorflow as tf
from tensorflow import keras
from keras import layers
import numpy as np
import os
import matplotlib.pyplot as plt

On commence par définir les réseaux discriminateur et générateur, en suivant les recommandations de DCGAN (activation *LeakyReLU*, *stride*, *Batch Normalization*, activation de sortie *tanh* pour le générateur)

In [3]:
latent_dim = 128
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.BatchNormalization(momentum = 0.8),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.BatchNormalization(momentum = 0.8),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1, activation="sigmoid"),
    ],
    name="discriminator",
)
discriminator.summary()

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        layers.Dense(7 * 7 * 128),        
        layers.BatchNormalization(momentum = 0.8),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.BatchNormalization(momentum = 0.8),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.BatchNormalization(momentum = 0.8),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="tanh"),
    ],
    name="generator",
)
generator.summary()

Model: "discriminator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_3 (Conv2D)           (None, 14, 14, 64)        640       
                                                                 
 batch_normalization_5 (Batc  (None, 14, 14, 64)       256       
 hNormalization)                                                 
                                                                 
 leaky_re_lu_5 (LeakyReLU)   (None, 14, 14, 64)        0         
                                                                 
 conv2d_4 (Conv2D)           (None, 7, 7, 128)         73856     
                                                                 
 batch_normalization_6 (Batc  (None, 7, 7, 128)        512       
 hNormalization)                                                 
                                                                 
 leaky_re_lu_6 (LeakyReLU)   (None, 7, 7, 128)       

Le code suivant décrit ce qui se passe à chaque itération de l'algorithme, ce qui est également résumé dans le cours sur le slide suivant : 

<center> <img src="https://drive.google.com/uc?id=1I6KesJZeSN_p_mx5nkAsVUeMmUKfIYB_" width=600></center>


In [4]:
# Instanciation de deux optimiseurs, l'un pour le discrimnateur et l'autre pour le générateur
d_optimizer = keras.optimizers.Adam(learning_rate=0.0008)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instanciation d'une fonction de coût entropie croisée
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


# La fonction prend en entrée un mini-batch d'images réelles
@tf.function
def train_step(real_images):
    batch_size = tf.shape(real_images)[0]

    # ENTRAINEMENT DU DISCRIMINATEUR
    # Échantillonnage d’un mini-batch de bruit
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim,))
    # Création d'un mini-batch d'images générées à partir du bruit
    generated_images = generator(random_latent_vectors)
    # Échantillonnage d’un mini-batch de données combinant images générées et réelles
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Création des labels associés au mini-batch de données créé précédemment
    # Pour l'entraînement du discriminateur :
    #   - les données générées sont labellisées "0" 
    #   - les données réelles sont labellisées "1" 
    labels = tf.concat([tf.zeros((batch_size, 1)), tf.ones((batch_size, 1))], axis=0)

    # Entraînement du discriminateur
    with tf.GradientTape() as tape:
        # L'appel d'un modèle (ici discriminator) à l'intérieur de Tf.GradientTape
        # permet de récupérer les gradients pour faire la mise à jour

        # Prédiction du discriminateur sur notre batch d'images réelles et générées
        predictions = discriminator(combined_images)
        # Calcul de la fonction de coût
        d_loss = loss_fn(labels, predictions)

    # Récupération des gradients de la fonction de coût par rapport aux paramètres du discriminateur
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    # Mise à jour des paramètres par l'optimiseur grâce aux gradients de la fonction de coût
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))
    ### NOTE : ON N'ENTRAINE PAS LE GENERATEUR A CE MOMENT !

    # ENTRAINEMENT DU GENERATEUR
    # Échantillonnage d’un mini-batch de bruit
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim,))
    # Création des labels associés au mini-batch de données créé précédemment
    # Pour l'entraînement du générateur :
    #   - les données générées sont labellisées ici "1"  
    misleading_labels = tf.ones((batch_size, 1))

    # Entraînement du générateur sans toucher aux paramètres du discriminateur !
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
        
    # Récupération des gradients de la fonction de coût par rapport aux paramètres du générateur
    grads = tape.gradient(g_loss, generator.trainable_weights)
    # Mise à jour des paramètres par l'optimiseur grâce aux gradients de la fonction de coût
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))

    return d_loss, g_loss, generated_images

Il reste à écrire l'algorithme final qui va faire appel au code d'itération écrit précédemment

In [5]:
# Préparation de la base de données : on utilise toutes les images (entraînement + test) de MNIST
batch_size = 32
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = (all_digits.astype("float32")-127.5) / 127.5 # Images normalisées
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 20  # Une 20aine d'epochs est nécessaire pour voir des chiffres qui semblent réalistes

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Descente de gradient simultanée du discrimnateur et du générateur
        d_loss, g_loss, generated_images = train_step(real_images)

        # Affichage régulier d'images générées.
        if step % 200 == 0:
            # Métriques
            print("Perte du discriminateur à l'étape %d: %.2f" % (step, d_loss))
            print("Perte du générateur à l'étape %d: %.2f" % (step, g_loss))

            plt.figure(figsize=(20, 4))
            for i in range(10):
              plt.subplot(1,10, i+1)
              plt.imshow(generated_images[i, :, :, 0]*128+128, cmap='gray')
              
            plt.show()



Start epoch 0


TypeError: in user code:

    File "/tmp/ipykernel_11002/1607979120.py", line 26, in train_step  *
        labels = tf.concat(tf.zeros((batch_size, 1)), tf.ones((batch_size, 1)), axis=0)

    TypeError: Got multiple values for argument 'axis'


# Travail à faire :

Prenez le temps de lire, de comprendre et de compléter le code qui vous est fourni. Observez attentivement l'évolution des métriques ainsi que les images générées au cours de l'entraînement. L'objectif de ce TP est d'abord de vous fournir un exemple de code implémentant les GANs, mais surtout de vous faire sentir la difficulté d'entraîner ces modèles.

Dans la suite du TP, nous vous fournissons ci-dessous un code de chargement de la base de données de visages *Labelled Faces in the Wild*. Votre objectif est donc d'adapter le code précédent pour générer non plus des chiffres mais des visages.

Quelques précisions importantes, et indications : 


*   MNIST est une base de données d'images noir et blanc de dimension 28 $\times$ 28, LFW est une base de données d'images couleur de dimension 32 $\times$ 32 $\times$ 3
*   La diversité des visages est bien plus grande que celle des chiffres ; votre générateur doit donc être un peu plus complexe que celui utilisé ici (plus de couches, et/ou plus de filtres par exemple) 
*   Pour faire fonctionner ce second exemple, il pourrait être nécessaire de modifier quelques hyperparamètres (dimension de l'espace latent, taux d'apprentissage des générateur et discriminateur, etc.)




Le code suivant télécharge et prépare les données de la base LFW.

In [None]:
import pandas as pd
import tarfile, tqdm, cv2, os
from sklearn.model_selection import train_test_split
import numpy as np

# Télécharger les données de la base de données "Labelled Faces in the Wild"
!wget http://www.cs.columbia.edu/CAVE/databases/pubfig/download/lfw_attributes.txt
!wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz
!wget http://vis-www.cs.umass.edu/lfw/lfw.tgz
  
ATTRS_NAME = "lfw_attributes.txt"
IMAGES_NAME = "lfw-deepfunneled.tgz"
RAW_IMAGES_NAME = "lfw.tgz"

def decode_image_from_raw_bytes(raw_bytes):
    img = cv2.imdecode(np.asarray(bytearray(raw_bytes), dtype=np.uint8), 1)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    return img

def load_lfw_dataset(
        use_raw=False,
        dx=80, dy=80,
        dimx=45, dimy=45):

    # Read attrs
    df_attrs = pd.read_csv(ATTRS_NAME, sep='\t', skiprows=1)
    df_attrs = pd.DataFrame(df_attrs.iloc[:, :-1].values, columns=df_attrs.columns[1:])
    imgs_with_attrs = set(map(tuple, df_attrs[["person", "imagenum"]].values))

    # Read photos
    all_photos = []
    photo_ids = []

    # tqdm in used to show progress bar while reading the data in a notebook here, you can change
    # tqdm_notebook to use it outside a notebook
    with tarfile.open(RAW_IMAGES_NAME if use_raw else IMAGES_NAME) as f:
        for m in tqdm.tqdm_notebook(f.getmembers()):
            # Only process image files from the compressed data
            if m.isfile() and m.name.endswith(".jpg"):
                # Prepare image
                img = decode_image_from_raw_bytes(f.extractfile(m).read())

                # Crop only faces and resize it
                img = img[dy:-dy, dx:-dx]
                img = cv2.resize(img, (dimx, dimy))

                # Parse person and append it to the collected data
                fname = os.path.split(m.name)[-1]
                fname_splitted = fname[:-4].replace('_', ' ').split()
                person_id = ' '.join(fname_splitted[:-1])
                photo_number = int(fname_splitted[-1])
                if (person_id, photo_number) in imgs_with_attrs:
                    all_photos.append(img)
                    photo_ids.append({'person': person_id, 'imagenum': photo_number})

    photo_ids = pd.DataFrame(photo_ids)
    all_photos = np.stack(all_photos).astype('uint8')

    # Preserve photo_ids order!
    all_attrs = photo_ids.merge(df_attrs, on=('person', 'imagenum')).drop(["person", "imagenum"], axis=1)

    return all_photos, all_attrs

# Prépare le dataset et le charge dans la variable X
X, attr = load_lfw_dataset(use_raw=True, dimx=32, dimy=32)
# Normalise les images
X = (X.astype("float32")-127.5)/127.5
