Skip to content

valentinamilighetti/Distributed-image-search

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

38 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Ricerca di immagini simili con Hadoop, Spark, PyTorch e Milvus

Questo progetto realizza un motore di ricerca per immagini su un database distribuito, sfruttando tecniche di image embedding e un database vettoriale per l’indicizzazione e la ricerca per similarità.

L’obiettivo è fornire un’interfaccia semplice che permetta all’utente di caricare un’immagine e ottenere come risultato le k immagini più simili. Un sistema di questo tipo può risultare utile in molti ambiti, come ad esempio:

  • E-commerce: ricerca di prodotti visivamente simili
  • Copyright: individuazione di utilizzi non autorizzati di un’immagine
  • Social media: suggerimento di contenuti coerenti a quelli già apprezzati dall’utente
  • Arte e creatività: esplorazione di riferimenti visivi e fonti di ispirazione

Nello specifico, sono stati calcolati gli embedding, delle rappresentazioni vettoriali dense delle immagini provenienti da un dataset di dimensione circa 30000, tali per cui a immagini simili corrispondono vettori vicini. Una volta ricevuta un'immagine di query, viene calcolato il suo embedding e si ricercano le immagini con embedding più vicino, secondo la metrica coseno.

Il workflow appena descritto è sintetizzato dalla seguente immagine:

Processo ricerca per similarità

Fonte: Hugging Face

Per rendere il sistema scalabile ed efficiente, è stato utilizzato un cluster Hadoop con due nodi e Spark per il calcolo distribuito degli embedding. Le immagini sono state caricate su HDFS e gli embedding sul database vettoriale Milvus.


Tecnologie utilizzate

  • Hadoop su cluster a 2 VM (1 master namenode e 1 worker datanode1)
  • Spark 3.5.6 per il calcolo distribuito
  • PyTorch per il calcolo degli embedding delle immagini
  • Milvus come database vettoriale per l’indicizzazione e la ricerca
  • FastAPI come backend per l’esposizione delle API di ricerca

Configurazione Cluster

Di seguito sono descritti tutti i passaggi necessari alla configurazione delle due macchine per operare con le tecnologie sopra citate.

Hadoop

Come cluster Hadoop è stato realizzato un cluster minimale costituito da 2 VM VirtualBox:

  • 1 VM master chiamata namenode, che svolge sia il ruolo di NameNode che di DataNode. (NOTA: in un cluster reale è consigliato mantenere il namenode e i datanodes su macchine distinte per una maggiore stabilità e resilienza del cluster)
  • 1 VM worker chiamata datanode1, che svolge il ruolo di DataNode

Per l'installazione di Hadoop è stata seguita la seguente guida che spiega passo-passo l'installazione e la configurazione su 3 VM (1 NameNode e 2 DataNode) adattandola al caso specifico di 2 VM.

Di seguito sono descritti i passaggi chiave per l'installazione del cluster e, in particolare, sono riportati i comandi diversi rispetto alla guida:

Ambiente VirtualBox

  1. Creare in VirtualBox una nuova rete con NAT, a cui saranno connesse le 2 VM. Per questo esempio è stata creata una rete privata con indirizzo 192.168.100.1/24

VM Master NameNode

  1. Creare inizialmente 1 VM con una distribuzione Linux leggera basata su Ubuntu, ad esempio Lubuntu 24.04

    Si consiglia di assegnare alla VM che fungerà il ruolo di Master un minimo di 8 GB di RAM e 2 vCPU

  2. Installare e configurare SSH, necessario per la comunicazione tra i nodi del cluster

  3. Installare Java openjdk versione 11 (al posto della versione 8)

    sudo apt install openjdk-11-jdk
  4. Scaricare ed estrarre Hadoop versione 3.4.1 da apache.org

    # download archivio nella home
    sudo wget -P ~ https://dlcdn.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz
    # estrarre e rinominare l\'archivio
    tar xzf hadoop-3.4.1.tar.gz
    mv hadoop-3.4.1 hadoop
    # spostare la cartella hadoop in usr/local/
    sudo mv hadoop /usr/local/hadoop

    modificare il file di configurazione hadoop_env.sh

    nano ~/hadoop/etc/hadoop/hadoop-env.sh
    
    # aggiungere in fondo al file il riferimento a JAVA_HOME
    export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
  5. Modificare variabili di ambiente per hadoop

    #modificare le variabili di sistema con il comando
    sudo nano /etc/environment
    
    #aggiungere al file i seguenti valori
    PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/usr/local/hadoop/bin:/usr/local/hadoop/sbin"
    JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/"
  6. creare un nuovo utente hadoopuser nel sistema, che verrà usato dai nodi del cluster, e fornirgli tutti i permessi di root

    NOTA: Da questo punto tutti i passaggi seguenti vanno eseguiti accedendo con il nuovo utente hadoopuser

Ambiente VirtualBox

  1. Nell'ambiente VirtualBox clonare la VM appena creata per instanziare la VM worker Datanode

    Si consiglia di assegnare alla VM che svolgerà il ruolo di Worker un minimo di 6 GB di RAM e 2 vCPU

VM Master e Worker

  1. Modificare gli hostname di entrambe le VM usando il comando

    sudo nano /etc/hostname

    in questo caso, la VM Master è stata chiamata namenode, mentre la VM worker è stata chiamata datanode1

  2. Modificare in entrambe le VM il file /etc/hosts che associa gli indirizzi ip delle macchine con il loro hostname

    # ottenere indirizzo ip associato a entrambe le VM
    ip addr
    #modificare il file hosts in ciascuna VM
    sudo nano /etc/hosts

    ad esempio, il file hosts in questo caso è il seguente (da adattare alla configurazione specifica del cluster):

    192.168.100.4 namenode
    192.168.100.5 datanode1
  3. In ciascuna VM creare la chiave SSH e condividerla con l'altra VM

VM Master

  1. Configurare la porta di Hadoop modificando il file core-site.xml

    sudo nano /usr/local/hadoop/etc/hadoop/core-site.xml
    
    # modificare il file
    # aggiungendo la seguente <property> nel segmento <configuration>
    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://namenode:9000</value>
    </property>
  2. Configurare HDFS

    sudo nano /usr/local/hadoop/etc/hadoop/hdfs-site.xml
    
    # modificare il file
    # aggiungendo le seguenti <property> nel segmento <configuration>
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>/usr/local/hadoop/data/nameNode</value>
    </property>
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/usr/local/hadoop/data/dataNode</value>
    </property>
    <property>
      <name>dfs.replication</name>
      <value>2</value>
    </property>
  3. Definire i nodi worker del cluster nel file /hadoop/workers, in questa configurazione entrambi i nodi svolgono il ruolo di worker

    sudo nano /usr/local/hadoop/etc/hadoop/workers
    
    # all'interno del file scrivere gli hostname dei worker
    namenode
    datanode1
  4. copiare i file di configurazione di hadoop nel nodo worker datanode1

    scp /usr/local/hadoop/etc/hadoop/* datanode1:/usr/local/hadoop/etc/hadoop/
  5. salvare le configurazioni e formattare per poi avviare HDFS

    source /etc/environment
    hdfs namenode -format
    start-dfs.sh

VM Master e Worker

  1. configurare Yarn, il Resource Manager per il cluster Hadoop, su entrambi i nodi:

    sudo nano /usr/local/hadoop/etc/hadoop/yarn-site.xml
  • Configurazione per il nodo master namenode

    # aggiungere le seguenti <property> nel segmento <configuration>
    <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>namenode</value>
    </property>
    <property>
      <name>yarn.scheduler.maximum-allocation-mb</name>
      <value>6144</value>
    </property>
    <property>
      <name>yarn.scheduler.maximum-allocation-vcores</name>
      <value>2</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>2048</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>1</value>
    </property>
  • Configurazione per il nodo worker datanode1

    # aggiungere le seguenti <property> nel segmento <configuration>
    <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>namenode</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>4096</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>2</value>
    </property>

    NOTA: adattare i valori delle proprietà alle risorse disponibili nel proprio cluster. In particolare le proprietà yarn.scheduler definiscono le risorse massime allocabili da Yarn per l'intero cluster, mentre le proprietà yarn.nodemanager.resource definiscono le risorse allocabili per il singolo nodo. Per questa configurazione il nodo namenode funge sia da master che da worker Yarn quindi ha indicate entrambe le proprietà, mentre nel nodo worker sono indicate solo le proprietà yarn.nodemanager.resource

  • Per avviare Yarn eseguire dal nodo master il comando:

    # avviare Yarn
    start-yarn.sh

Spark versione 3.5.6

Fare riferimento alla seguente guida

  1. Scaricare l'archivio di Spark dal sito di apache.org nella versione 3.5.6

    wget https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
  2. Creare una directory dedicata dove estrarre il file tar

    mkdir ~/spark
    mv spark-3.5.6-bin-hadoop3.tgz spark/
    cd ~/spark
    tar -xvzf spark-3.5.6-bin-hadoop3.tgz
  3. Configurare il file spark-env.sh presente nella cartella conf di spark nel nodo master

    cd ~/spark/spark-3.5.6-bin-hadoop3/conf
    cp spark-env.sh.template spark-env.sh
    sudo nano spark-env.sh
    
    # aggiungere le seguenti variabili 
    # modificare l'indirizzo ip con quello del nodo master
    export SPARK_MASTER_HOST=192.168.0.4
    export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
  4. Creare il file slaves sul nodo master e inserire i nomi dei nodi esecutori di Spark, in questo configurazione a 2 VM sono namenode e datanode1 che eseguiranno entrambi i job Spark:

    namenode
    datanode1
  5. Per avviare master e slave di Spark usare il comando:

    start-all.sh

Python e pacchetti necessari

  • Creare il Virtual Environment con Python 3.11 su entrambi i nodi Master e Worker

    NOTA: Python 3.12 ha problemi di compatibilità con Spark e Pyarrow

    sudo apt install python3.11 python3.11-venv
    python3.11 -m venv pytorch_env
    source pytorch_env/bin/activate 
  • Installare i pacchetti python:

    # su entrambi i nodi
    pip install pyspark==3.5.6
    pip install pyarrow==12.0.1
    pip install numpy==1.26.4 pandas
    pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu
    pip install pymilvus
    
    # solo sul master
    pip install notebook
    pip install jupyter

Configurazione completa variabili d'ambiente

  • Modificare il file ~/.bashrc aggiungendo tutte le variabili d'ambiente necessarie per Hadoop, Spark e Jupyter

    # aprire il file bashrc
    nano ~/.bashrc
  • Aggiungere in fondo al file bashrc del nodo Master le seguenti variabili, adattandole alla propria configurazione:

    # variabili per hadoop e yarn
    export HADOOP_HOME="/usr/local/hadoop"
    export HADOOP_COMMON_HOME=$HADOOP_HOME
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export HADOOP_HDFS_HOME=$HADOOP_HOME
    export HADOOP_MAPRED_HOME=$HADOOP_HOME
    export HADOOP_YARN_HOME=$HADOOP_HOME
    # variabili per spark e pyspark
    # modificare i percorsi alle cartelle se differenti
    export SPARK_HOME=~/spark/spark-3.5.6-bin-hadoop3
    export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
    export PYSPARK_PYTHON=~/pytorch_env/bin/python3
    export PYSPARK_DRIVER_PYTHON=jupyter
    # modificare indirizzo ip del server jupyter con quello del nodo master
    export PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=192.168.100.4 --no-browser --port=8889'
    export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
    export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath --glob):$HADOOP_CONF_DIR
  • Aggiungere in fondo al file bashrc del nodo Worker le seguenti variabili:

    # variabili per spark e pyspark
    # modificare i percorsi alle cartelle se differenti
    export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
    export SPARK_HOME=~/spark/spark-3.5.6-bin-hadoop3
    export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
    export PYSPARK_PYTHON=~/pytorch_env/bin/python3
  • Aggiornare le variabili su entrambi i nodi con il comando:

    source ~/.bashrc

Milvus (nodo Master)

La versione utilizzata per questo progetto è Milus Standalone tramite Docker Compose, che utilizza 3 container attivi sul nodo master:

  • milvus-standalone, che contiene le funzionalità principali di Milvus
  • etcd, utilizzato per memorizzare metadati usati dagli altri componenti di Milvus
  • minio, un database ad oggetti usato per memorizzare i dati in modo persistente

1. Installazione di Docker Compose (sul nodo master)

sudo apt-get update
sudo apt-get install ca-certificates curl gnupg lsb-release
sudo mkdir -p /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
echo \  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
  (lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
  • Per avviare il servizio:
sudo systemctl start docker
# Abilita l'avvio di Docker all'avvio del sistema
sudo systemctl enable docker
# Verifica che il servizio Docker sia in esecuzione
sudo systemctl status docker
# Verifica l’istallazione di docker con l’immagine hello world:
sudo docker run hello-world
# Gestire Docker come utente non root: creazione del gruppo docker
sudo groupadd docker
# Aggiungi l’user al gruppo
sudo usermod -aG docker $USER
newgrp docker
# Verifica: esegui hello world senza sudo:
docker run hello-world
# Verifica l’istallazione del plugin docker compose
docker compose version

2. Download del file di configurazione Docker per Milvus

I seguenti comandi fanno riferimento alla guida per la configurazione di Milvus Standalone

# creare una cartella apposita per milvus
mkdir -p ~/milvus
cd ~/milvus
# scaricare il file docker dal repository di milvus
wget https://github.com/milvus-io/milvus/releases/download/v2.6.0/milvus-standalone-docker-compose.yml -O docker-compose.yml

Il file docker-compose.yml è stato modificato rispetto a quello predefinito, in modo da adattare i limiti di risorse utilizzate dai container in base a quelle del nodo master. È stata inoltre modificata la porta su cui è in ascolto minio, poichè già in uso da Hadoop.

  • Per avviare i container Milvus eseguire il seguente comando:

    docker compose up -d
    # per verificare lo stato dei container
    docker compose ps
  • Per terminare e chiudere i container:

    docker compose down

Backend API (nodo Master)

  • Il Backend API è realizzato in python con FastAPI ed eseguito in un web server Uvicorn

  • All'interno del virtual environment python sul nodo Master installare i seguenti pacchetti:

    pip install fastapi "uvicorn[standard]" python-multipart
  • Copiare i file necessari per il funzionamento del Server FastAPI presenti nella cartella /ServerFastAPI nel nodo master. In particolare:

    • il file main.py per il backend deve essere copiato nella directory home
    • i file index.html e style.css per il frontend devono essere copiati all'interno di una cartella /static in home
  • Per avviare il server basta portarsi nella directory home dove si trova il file main.py ed eseguire il comando:

    uvicorn main:app --reload
  • Il codice per il funzionamento del server è descritto in dettaglio nella sezione Applicazione web per la ricerca delle immagini

Dataset

Il progetto utilizza il dataset Flickr30k Images, che contiene circa 30000 immagini reali provenienti da Flickr.

Dopo il download del dataset è necessario il suo caricamento su HDFS, attraverso il comando

hadoop distcp file:///home/hadoopuser/flickr30k_images hdfs:///user/hadoopuser/flickr30k_images

Notebook: calcolo degli embedding e caricamento su Milvus

Il notebook Jupyter image_embedding_spark.ipynb ha il compito di leggere le immagini dal cluster HDFS, calcolare i vettori di embedding per ciascuna di esse in modo distribuito utilizzando Spark e, infine, indicizzare questi vettori nel database Milvus per renderli ricercabili.

Di seguito sono illustrati i passaggi chiave del processo.

1. Caricamento del modello ResNet50

Per il calcolo degli embedding dalle immagini è stato utilizzato il modello ResNet50, una rete convoluzionale molto utilizzata per questa tipologia di task. Il modello pre-addestrato è disponibile su torchvision come torchvision.models.resnet50 con pesi ResNet50_Weights.DEFAULT.

  • I pesi del modello vengono salvati e distribuiti agli esecutori Spark
    import torch
    import torchvision.models as models
    
    # Carica il modello sul nodo driver
    weights = models.ResNet50_Weights.DEFAULT
    model = models.resnet50(weights=weights)
    state_dict = model.state_dict()
    
    # Salva i pesi e li rende disponibili a tutti i nodi del cluster
    torch.save(state_dict, "/tmp/resnet50_statedict2.pth")
    sc.addFile("/tmp/resnet50_statedict2.pth")
    Il salvataggio dei pesi del modello in un file nello SparkContext permette di distribuire in modo efficiente il modello a tutti gli esecutori che ne avranno bisogno per il calcolo.

2. Lettura del dataset da HDFS

Le immagini vengono caricate da HDFS come file binari in un DataFrame Spark:

df = spark.read.format("binaryFile").load("hdfs:///user/hadoopuser/flickr30k_images/flickr30k_images/").select("path", "content")

3. Generazione distribuita degli Embedding

Questo è il passaggio computazionalmente più oneroso, per cui l'inferenza del modello viene eseguita in parallelo su tutti i nodi worker utilizzando una User Defined Function (UDF) ottimizzata per l'elaborazione in batch (predict_batch_udf). La logica è la seguente:

  • Inizializzazione per Worker: su ciascun esecutore Spark, una funzione (make_resnet_fn) carica il modello ResNet50 leggendo il file dei pesi distribuito in precedenza. Il modello viene preparato per l'estrazione delle feature rimuovendo l'ultimo layer di classificazione
  • Elaborazione in Batch: la UDF riceve in input un batch di immagini (rappresentate come byte). Per ogni immagine, esegue i passaggi di pre-processing necessari (ridimensionamento, normalizzazione) e la passa al modello per calcolare il vettore di embedding a 2048 dimensioni
  • Calcolo in parallelo: Spark gestisce automaticamente la distribuzione dei dati tra i vari esecutori
# Applica la UDF per calcolare gli embedding sulla colonna 'content'
df_with_emb = df.withColumn("embedding", resnet_udf(col("content")))

Il risultato è un DataFrame Spark con:

  • path: percorso dell'immagine su HDFS

  • embedding: vettore numerico

4. Salvataggio degli Embedding su HDFS (formato Parquet)

Una volta calcolati, gli embedding vengono salvati su HDFS in formato Parquet, un formato colonnare ottimizzato per letture veloci in Spark, indicato per dataset di grandi dimensioni.

df_embeddings.write.mode("overwrite").parquet(
    "hdfs:///user/hadoopuser/flickr_image_embeddings_parquet/"
)

5. Caricamento su Milvus

L'ultimo passaggio consiste nel caricare i dati ottenuti (percorso dell'immagine e relativo embedding) nel database vettoriale Milvus. Anche questa operazione viene parallelizzata per massimizzare l'efficienza e consiste nei seguenti passaggi:

  1. Creazione della Collezione: Dal nodo driver, viene stabilita una connessione a Milvus per creare una collezione (l'equivalente di una tabella in SQL) con uno schema ben definito: un ID univoco, il percorso dell'immagine e l'embedding

    import pymilvus
    
    connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
    # Definisce lo schema della collezione
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=65535), 
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)
    ]
    schema = CollectionSchema(fields, description="Image embeddings generated with Spark")
    # Crea la collezione
    collection = Collection(name=COLLECTION_NAME, schema=schema)
  2. Caricamento in Parallelo (foreachPartition): Ogni worker stabilisce la propria connessione a Milvus e inserisce autonomamente il proprio sottoinsieme di dati. Questo approccio distribuisce il carico di rete e di scrittura, accelerando il caricamento

    df_with_ids = df_from_parquet.withColumn("id", monotonically_increasing_id())
    df_with_ids.foreachPartition(upload_partition_to_milvus)

    ciascun worker eseguirà la funzione upload_partition_to_milvus al cui interno caricherà la propria partizione nel database con:

    collection.insert(data_to_insert)
  3. Creazione dell'indice e caricamento in memoria: Una volta che tutti i dati sono stati inseriti, dal driver vengono inviati a Milvus i comandi finali:

    # Per assicurare che tutti i dati siano stati scritti su disco:
    collection.flush()
    # Definisce i parametri per l'indice
    index_params = {
        "metric_type": "COSINE",       # Metrica di distanza (cosine)
        "index_type": "IVF_FLAT",  
        "params": {"nlist": 1024}  
    }
    collection.create_index(field_name="embedding", index_params=index_params)
    # Carica la collezione in memoria per renderla disponibile alle ricerche
    collection.load()

    La metrica di distanza utilizzata per calcolare le similarità tra immagini è COSINE.

Applicazione web per la ricerca delle immagini

L'interfaccia utente per la ricerca delle immagini è un'applicazione web costruita con FastAPI come backend e un frontend basato su un singolo file HTML. Il codice è disponibile nella cartella /ServerFastAPI e comprende:

  • Frontend(index.html): è realizzata con HTML, CSS e JavaScript e permette di caricare un'immagine tramite click o trascinamento, selezionare il numero di risultati desiderati e visualizzare le immagini simili restituite dal sistema.

  • Backend(main.py) un server web basato su FastAPI che espone le API necessarie per la ricerca. All'avvio, il server si connette a Milvus, carica in memoria la collezione di immagini e inizializza il modello ResNet50.

    Il backend gestisce due endpoint principali:

    • POST /search_similar: riceve l'immagine caricata dall'utente, esegue la ricerca sul database e restituisce i risultati con percorso delle immagini e valore di similarità.

        @app.post("/search_similar")
        async def search_similar_images_api(file: UploadFile = File(...), count: int=6):
            try:
                image_data = await file.read()
                query_img = Image.open(BytesIO(image_data)).convert("RGB")
                # Esegui la ricerca sul db
                results = search_similar_images(query_img, topk=count)
                # restituisce i risultati come JSON
                response_data = []
                for hit in results:
                    response_data.append({
                        "path": hit.entity.get("path"),
                        "similarity": hit.distance
                    })
                return JSONResponse(content=response_data)

      Il codice Python prevede le due seguenti funzioni, per il rispettivo calcolo dell'embedding dell'immagine di input e la ricerca su Milvus di k immagini di output:

      def get_embedding(image: Image.Image) -> np.ndarray:
          """Converte un'immagine in embedding 2048-dim con ResNet50"""
          tensor = transform(image).unsqueeze(0).to(device)
          with torch.no_grad():
              emb = feature_extractor(tensor).cpu().numpy().flatten()
          return emb / np.linalg.norm(emb) 
      
      def search_similar_images(query_img: Image.Image, topk=5):
          """Cerca immagini simili in Milvus data un'immagine di query"""
          emb = get_embedding(query_img)
          results = collection.search(
              data=[emb.tolist()],
              anns_field="embedding",
              param={"metric_type": "COSINE", "params": {"nprobe": 10}},
              limit=topk,
              output_fields=["path"]
          )
          return results[0]
    • GET /image/: recupera e trasmette un file immagine direttamente da HDFS, dato il suo percorso. Utilizzato per trasmettere le immagini di output al frontend.

Come Eseguire il Progetto

Di seguito sono illustrati i vari passaggi necessari per eseguire la ricerca per similarità delle immagini. I passaggi vanno eseguiti tutti nel nodo master.

  1. Come primo passaggio, avviare HDFS, Yarn, Spark e Milvus:

    start.dfs.sh
    start-yarn.sh
    start-all.sh
    cd ~/milvus
    docker compose up -d
  2. Attivare il Virtual Environment Python e avviare pyspark e Jupyter

    source pytorch_env/bin/activate
    pyspark
  3. Aprire Jupyter Notebook nel browser ed eseguire tutte le celle del notebook image_embedding_spark.ipynb

    NOTA: l'indirizzo del server Jupyter è quello inserito in fase di Configurazione completa variabili d'ambiente e corrisponde all'indirizzo ip del master alla porta 8889, per questo progetto l'indirizzo è: 192.168.100.4:8889

  4. Avviare il server FastAPI

    uvicorn main:app --reload --host 192.168.100.4
  5. Collegarsi a 192.168.100.4:8000 (è possibile collegarsi sia dal master che dal worker)

    landing page

  6. Caricare un'immagine e selezionare il numero di immagini da mostrare in output

  7. Visualizzare i risultati

    landing page

NOTA: il passaggio 2 viene eseguito solo la prima volta, perché successivamente gli embedding delle immagini del dataset saranno già presenti nel database Milvus e pronti per la ricerca.

Risorse Utili

About

A simple example of distributed system made with Hadoop-Spark technologies that enables clients to search similar images using a Vector Database

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors