Questo progetto realizza un motore di ricerca per immagini su un database distribuito, sfruttando tecniche di image embedding e un database vettoriale per l’indicizzazione e la ricerca per similarità.
L’obiettivo è fornire un’interfaccia semplice che permetta all’utente di caricare un’immagine e ottenere come risultato le k immagini più simili. Un sistema di questo tipo può risultare utile in molti ambiti, come ad esempio:
- E-commerce: ricerca di prodotti visivamente simili
- Copyright: individuazione di utilizzi non autorizzati di un’immagine
- Social media: suggerimento di contenuti coerenti a quelli già apprezzati dall’utente
- Arte e creatività: esplorazione di riferimenti visivi e fonti di ispirazione
Nello specifico, sono stati calcolati gli embedding, delle rappresentazioni vettoriali dense delle immagini provenienti da un dataset di dimensione circa 30000, tali per cui a immagini simili corrispondono vettori vicini. Una volta ricevuta un'immagine di query, viene calcolato il suo embedding e si ricercano le immagini con embedding più vicino, secondo la metrica coseno.
Il workflow appena descritto è sintetizzato dalla seguente immagine:
Fonte: Hugging FacePer rendere il sistema scalabile ed efficiente, è stato utilizzato un cluster Hadoop con due nodi e Spark per il calcolo distribuito degli embedding. Le immagini sono state caricate su HDFS e gli embedding sul database vettoriale Milvus.
- Hadoop su cluster a 2 VM (1 master
namenodee 1 workerdatanode1) - Spark 3.5.6 per il calcolo distribuito
- PyTorch per il calcolo degli embedding delle immagini
- Milvus come database vettoriale per l’indicizzazione e la ricerca
- FastAPI come backend per l’esposizione delle API di ricerca
Di seguito sono descritti tutti i passaggi necessari alla configurazione delle due macchine per operare con le tecnologie sopra citate.
Come cluster Hadoop è stato realizzato un cluster minimale costituito da 2 VM VirtualBox:
- 1 VM master chiamata
namenode, che svolge sia il ruolo di NameNode che di DataNode. (NOTA: in un cluster reale è consigliato mantenere il namenode e i datanodes su macchine distinte per una maggiore stabilità e resilienza del cluster) - 1 VM worker chiamata
datanode1, che svolge il ruolo di DataNode
Per l'installazione di Hadoop è stata seguita la seguente guida che spiega passo-passo l'installazione e la configurazione su 3 VM (1 NameNode e 2 DataNode) adattandola al caso specifico di 2 VM.
Di seguito sono descritti i passaggi chiave per l'installazione del cluster e, in particolare, sono riportati i comandi diversi rispetto alla guida:
- Creare in VirtualBox una nuova rete con NAT, a cui saranno connesse le 2 VM. Per questo esempio è stata creata una rete privata con indirizzo 192.168.100.1/24
-
Creare inizialmente 1 VM con una distribuzione Linux leggera basata su Ubuntu, ad esempio Lubuntu 24.04
Si consiglia di assegnare alla VM che fungerà il ruolo di Master un minimo di 8 GB di RAM e 2 vCPU
-
Installare e configurare SSH, necessario per la comunicazione tra i nodi del cluster
-
Installare Java openjdk versione 11 (al posto della versione 8)
sudo apt install openjdk-11-jdk
-
Scaricare ed estrarre Hadoop versione 3.4.1 da apache.org
# download archivio nella home sudo wget -P ~ https://dlcdn.apache.org/hadoop/common/hadoop-3.4.1/hadoop-3.4.1.tar.gz # estrarre e rinominare l\'archivio tar xzf hadoop-3.4.1.tar.gz mv hadoop-3.4.1 hadoop # spostare la cartella hadoop in usr/local/ sudo mv hadoop /usr/local/hadoop
modificare il file di configurazione
hadoop_env.shnano ~/hadoop/etc/hadoop/hadoop-env.sh # aggiungere in fondo al file il riferimento a JAVA_HOME export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
-
Modificare variabili di ambiente per hadoop
#modificare le variabili di sistema con il comando sudo nano /etc/environment #aggiungere al file i seguenti valori PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/usr/local/hadoop/bin:/usr/local/hadoop/sbin" JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64/"
-
creare un nuovo utente
hadoopusernel sistema, che verrà usato dai nodi del cluster, e fornirgli tutti i permessi di rootNOTA: Da questo punto tutti i passaggi seguenti vanno eseguiti accedendo con il nuovo utente
hadoopuser
-
Nell'ambiente VirtualBox clonare la VM appena creata per instanziare la VM worker
DatanodeSi consiglia di assegnare alla VM che svolgerà il ruolo di Worker un minimo di 6 GB di RAM e 2 vCPU
-
Modificare gli hostname di entrambe le VM usando il comando
sudo nano /etc/hostname
in questo caso, la VM Master è stata chiamata
namenode, mentre la VM worker è stata chiamatadatanode1 -
Modificare in entrambe le VM il file /etc/hosts che associa gli indirizzi ip delle macchine con il loro hostname
# ottenere indirizzo ip associato a entrambe le VM ip addr #modificare il file hosts in ciascuna VM sudo nano /etc/hosts
ad esempio, il file hosts in questo caso è il seguente (da adattare alla configurazione specifica del cluster):
192.168.100.4 namenode 192.168.100.5 datanode1
-
In ciascuna VM creare la chiave SSH e condividerla con l'altra VM
-
Configurare la porta di Hadoop modificando il file core-site.xml
sudo nano /usr/local/hadoop/etc/hadoop/core-site.xml # modificare il file # aggiungendo la seguente <property> nel segmento <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://namenode:9000</value> </property>
-
Configurare HDFS
sudo nano /usr/local/hadoop/etc/hadoop/hdfs-site.xml # modificare il file # aggiungendo le seguenti <property> nel segmento <configuration> <property> <name>dfs.namenode.name.dir</name> <value>/usr/local/hadoop/data/nameNode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/usr/local/hadoop/data/dataNode</value> </property> <property> <name>dfs.replication</name> <value>2</value> </property>
-
Definire i nodi worker del cluster nel file /hadoop/workers, in questa configurazione entrambi i nodi svolgono il ruolo di worker
sudo nano /usr/local/hadoop/etc/hadoop/workers # all'interno del file scrivere gli hostname dei worker namenode datanode1 -
copiare i file di configurazione di hadoop nel nodo worker
datanode1scp /usr/local/hadoop/etc/hadoop/* datanode1:/usr/local/hadoop/etc/hadoop/ -
salvare le configurazioni e formattare per poi avviare HDFS
source /etc/environment hdfs namenode -format start-dfs.sh
-
configurare Yarn, il Resource Manager per il cluster Hadoop, su entrambi i nodi:
sudo nano /usr/local/hadoop/etc/hadoop/yarn-site.xml
-
Configurazione per il nodo master
namenode# aggiungere le seguenti <property> nel segmento <configuration> <property> <name>yarn.resourcemanager.hostname</name> <value>namenode</value> </property> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>6144</value> </property> <property> <name>yarn.scheduler.maximum-allocation-vcores</name> <value>2</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>1</value> </property>
-
Configurazione per il nodo worker
datanode1# aggiungere le seguenti <property> nel segmento <configuration> <property> <name>yarn.resourcemanager.hostname</name> <value>namenode</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>4096</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>2</value> </property>
NOTA: adattare i valori delle proprietà alle risorse disponibili nel proprio cluster. In particolare le proprietà yarn.scheduler definiscono le risorse massime allocabili da Yarn per l'intero cluster, mentre le proprietà yarn.nodemanager.resource definiscono le risorse allocabili per il singolo nodo. Per questa configurazione il nodo namenode funge sia da master che da worker Yarn quindi ha indicate entrambe le proprietà, mentre nel nodo worker sono indicate solo le proprietà yarn.nodemanager.resource
-
Per avviare Yarn eseguire dal nodo master il comando:
# avviare Yarn start-yarn.sh
Fare riferimento alla seguente guida
-
Scaricare l'archivio di Spark dal sito di apache.org nella versione 3.5.6
wget https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
-
Creare una directory dedicata dove estrarre il file tar
mkdir ~/spark mv spark-3.5.6-bin-hadoop3.tgz spark/ cd ~/spark tar -xvzf spark-3.5.6-bin-hadoop3.tgz
-
Configurare il file
spark-env.shpresente nella cartella conf di spark nel nodomastercd ~/spark/spark-3.5.6-bin-hadoop3/conf cp spark-env.sh.template spark-env.sh sudo nano spark-env.sh # aggiungere le seguenti variabili # modificare l'indirizzo ip con quello del nodo master export SPARK_MASTER_HOST=192.168.0.4 export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
-
Creare il file
slavessul nodomastere inserire i nomi dei nodi esecutori di Spark, in questo configurazione a 2 VM sononamenodeedatanode1che eseguiranno entrambi i job Spark:namenode datanode1
-
Per avviare master e slave di Spark usare il comando:
start-all.sh
-
Creare il Virtual Environment con Python 3.11 su entrambi i nodi
MastereWorkerNOTA: Python 3.12 ha problemi di compatibilità con Spark e Pyarrow
sudo apt install python3.11 python3.11-venv python3.11 -m venv pytorch_env source pytorch_env/bin/activate -
Installare i pacchetti python:
# su entrambi i nodi pip install pyspark==3.5.6 pip install pyarrow==12.0.1 pip install numpy==1.26.4 pandas pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu pip install pymilvus # solo sul master pip install notebook pip install jupyter
-
Modificare il file ~/.bashrc aggiungendo tutte le variabili d'ambiente necessarie per Hadoop, Spark e Jupyter
# aprire il file bashrc nano ~/.bashrc
-
Aggiungere in fondo al file bashrc del nodo
Masterle seguenti variabili, adattandole alla propria configurazione:# variabili per hadoop e yarn export HADOOP_HOME="/usr/local/hadoop" export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export HADOOP_HDFS_HOME=$HADOOP_HOME export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_YARN_HOME=$HADOOP_HOME # variabili per spark e pyspark # modificare i percorsi alle cartelle se differenti export SPARK_HOME=~/spark/spark-3.5.6-bin-hadoop3 export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH export PYSPARK_PYTHON=~/pytorch_env/bin/python3 export PYSPARK_DRIVER_PYTHON=jupyter # modificare indirizzo ip del server jupyter con quello del nodo master export PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=192.168.100.4 --no-browser --port=8889' export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath --glob):$HADOOP_CONF_DIR
-
Aggiungere in fondo al file bashrc del nodo
Workerle seguenti variabili:# variabili per spark e pyspark # modificare i percorsi alle cartelle se differenti export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 export SPARK_HOME=~/spark/spark-3.5.6-bin-hadoop3 export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH export PYSPARK_PYTHON=~/pytorch_env/bin/python3
-
Aggiornare le variabili su entrambi i nodi con il comando:
source ~/.bashrc
La versione utilizzata per questo progetto è Milus Standalone tramite Docker Compose, che utilizza 3 container attivi sul nodo master:
milvus-standalone, che contiene le funzionalità principali di Milvusetcd, utilizzato per memorizzare metadati usati dagli altri componenti di Milvusminio, un database ad oggetti usato per memorizzare i dati in modo persistente
sudo apt-get update
sudo apt-get install ca-certificates curl gnupg lsb-release
sudo mkdir -p /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
echo \ "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin- Per avviare il servizio:
sudo systemctl start docker
# Abilita l'avvio di Docker all'avvio del sistema
sudo systemctl enable docker
# Verifica che il servizio Docker sia in esecuzione
sudo systemctl status docker
# Verifica l’istallazione di docker con l’immagine hello world:
sudo docker run hello-world
# Gestire Docker come utente non root: creazione del gruppo docker
sudo groupadd docker
# Aggiungi l’user al gruppo
sudo usermod -aG docker $USER
newgrp docker
# Verifica: esegui hello world senza sudo:
docker run hello-world
# Verifica l’istallazione del plugin docker compose
docker compose versionI seguenti comandi fanno riferimento alla guida per la configurazione di Milvus Standalone
# creare una cartella apposita per milvus
mkdir -p ~/milvus
cd ~/milvus
# scaricare il file docker dal repository di milvus
wget https://github.com/milvus-io/milvus/releases/download/v2.6.0/milvus-standalone-docker-compose.yml -O docker-compose.ymlIl file docker-compose.yml è stato modificato rispetto a quello predefinito, in modo da adattare i limiti di risorse utilizzate dai container in base a quelle del nodo master. È stata inoltre modificata la porta su cui è in ascolto
minio, poichè già in uso da Hadoop.
-
Per avviare i container Milvus eseguire il seguente comando:
docker compose up -d # per verificare lo stato dei container docker compose ps -
Per terminare e chiudere i container:
docker compose down
-
Il Backend API è realizzato in python con FastAPI ed eseguito in un web server Uvicorn
-
All'interno del virtual environment python sul nodo
Masterinstallare i seguenti pacchetti:pip install fastapi "uvicorn[standard]" python-multipart -
Copiare i file necessari per il funzionamento del Server FastAPI presenti nella cartella /ServerFastAPI nel nodo master. In particolare:
- il file main.py per il backend deve essere copiato nella directory home
- i file index.html e style.css per il frontend devono essere copiati all'interno di una cartella /static in home
-
Per avviare il server basta portarsi nella directory home dove si trova il file main.py ed eseguire il comando:
uvicorn main:app --reload
-
Il codice per il funzionamento del server è descritto in dettaglio nella sezione Applicazione web per la ricerca delle immagini
Il progetto utilizza il dataset Flickr30k Images, che contiene circa 30000 immagini reali provenienti da Flickr.
Dopo il download del dataset è necessario il suo caricamento su HDFS, attraverso il comando
hadoop distcp file:///home/hadoopuser/flickr30k_images hdfs:///user/hadoopuser/flickr30k_imagesIl notebook Jupyter image_embedding_spark.ipynb ha il compito di leggere le immagini dal cluster HDFS, calcolare i vettori di embedding per ciascuna di esse in modo distribuito utilizzando Spark e, infine, indicizzare questi vettori nel database Milvus per renderli ricercabili.
Di seguito sono illustrati i passaggi chiave del processo.
Per il calcolo degli embedding dalle immagini è stato utilizzato il modello ResNet50, una rete convoluzionale molto utilizzata per questa tipologia di task. Il modello pre-addestrato è disponibile su torchvision come torchvision.models.resnet50 con pesi ResNet50_Weights.DEFAULT.
- I pesi del modello vengono salvati e distribuiti agli esecutori Spark
Il salvataggio dei pesi del modello in un file nello SparkContext permette di distribuire in modo efficiente il modello a tutti gli esecutori che ne avranno bisogno per il calcolo.
import torch import torchvision.models as models # Carica il modello sul nodo driver weights = models.ResNet50_Weights.DEFAULT model = models.resnet50(weights=weights) state_dict = model.state_dict() # Salva i pesi e li rende disponibili a tutti i nodi del cluster torch.save(state_dict, "/tmp/resnet50_statedict2.pth") sc.addFile("/tmp/resnet50_statedict2.pth")
Le immagini vengono caricate da HDFS come file binari in un DataFrame Spark:
df = spark.read.format("binaryFile").load("hdfs:///user/hadoopuser/flickr30k_images/flickr30k_images/").select("path", "content")Questo è il passaggio computazionalmente più oneroso, per cui l'inferenza del modello viene eseguita in parallelo su tutti i nodi worker utilizzando una User Defined Function (UDF) ottimizzata per l'elaborazione in batch (predict_batch_udf). La logica è la seguente:
- Inizializzazione per Worker: su ciascun esecutore Spark, una funzione (make_resnet_fn) carica il modello ResNet50 leggendo il file dei pesi distribuito in precedenza. Il modello viene preparato per l'estrazione delle feature rimuovendo l'ultimo layer di classificazione
- Elaborazione in Batch: la UDF riceve in input un batch di immagini (rappresentate come byte). Per ogni immagine, esegue i passaggi di pre-processing necessari (ridimensionamento, normalizzazione) e la passa al modello per calcolare il vettore di embedding a 2048 dimensioni
- Calcolo in parallelo: Spark gestisce automaticamente la distribuzione dei dati tra i vari esecutori
# Applica la UDF per calcolare gli embedding sulla colonna 'content'
df_with_emb = df.withColumn("embedding", resnet_udf(col("content")))Il risultato è un DataFrame Spark con:
-
path: percorso dell'immagine su HDFS
-
embedding: vettore numerico
Una volta calcolati, gli embedding vengono salvati su HDFS in formato Parquet, un formato colonnare ottimizzato per letture veloci in Spark, indicato per dataset di grandi dimensioni.
df_embeddings.write.mode("overwrite").parquet(
"hdfs:///user/hadoopuser/flickr_image_embeddings_parquet/"
)L'ultimo passaggio consiste nel caricare i dati ottenuti (percorso dell'immagine e relativo embedding) nel database vettoriale Milvus. Anche questa operazione viene parallelizzata per massimizzare l'efficienza e consiste nei seguenti passaggi:
-
Creazione della Collezione: Dal nodo driver, viene stabilita una connessione a Milvus per creare una collezione (l'equivalente di una tabella in SQL) con uno schema ben definito: un ID univoco, il percorso dell'immagine e l'embedding
import pymilvus connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT) # Definisce lo schema della collezione fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False), FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION) ] schema = CollectionSchema(fields, description="Image embeddings generated with Spark") # Crea la collezione collection = Collection(name=COLLECTION_NAME, schema=schema)
-
Caricamento in Parallelo (
foreachPartition): Ogni worker stabilisce la propria connessione a Milvus e inserisce autonomamente il proprio sottoinsieme di dati. Questo approccio distribuisce il carico di rete e di scrittura, accelerando il caricamentodf_with_ids = df_from_parquet.withColumn("id", monotonically_increasing_id()) df_with_ids.foreachPartition(upload_partition_to_milvus)
ciascun worker eseguirà la funzione upload_partition_to_milvus al cui interno caricherà la propria partizione nel database con:
collection.insert(data_to_insert)
-
Creazione dell'indice e caricamento in memoria: Una volta che tutti i dati sono stati inseriti, dal driver vengono inviati a Milvus i comandi finali:
# Per assicurare che tutti i dati siano stati scritti su disco: collection.flush() # Definisce i parametri per l'indice index_params = { "metric_type": "COSINE", # Metrica di distanza (cosine) "index_type": "IVF_FLAT", "params": {"nlist": 1024} } collection.create_index(field_name="embedding", index_params=index_params) # Carica la collezione in memoria per renderla disponibile alle ricerche collection.load()
La metrica di distanza utilizzata per calcolare le similarità tra immagini è
COSINE.
L'interfaccia utente per la ricerca delle immagini è un'applicazione web costruita con FastAPI come backend e un frontend basato su un singolo file HTML. Il codice è disponibile nella cartella /ServerFastAPI e comprende:
-
Frontend(index.html): è realizzata con HTML, CSS e JavaScript e permette di caricare un'immagine tramite click o trascinamento, selezionare il numero di risultati desiderati e visualizzare le immagini simili restituite dal sistema.
-
Backend(main.py) un server web basato su FastAPI che espone le API necessarie per la ricerca. All'avvio, il server si connette a Milvus, carica in memoria la collezione di immagini e inizializza il modello ResNet50.
Il backend gestisce due endpoint principali:
-
POST /search_similar: riceve l'immagine caricata dall'utente, esegue la ricerca sul database e restituisce i risultati con percorso delle immagini e valore di similarità.@app.post("/search_similar") async def search_similar_images_api(file: UploadFile = File(...), count: int=6): try: image_data = await file.read() query_img = Image.open(BytesIO(image_data)).convert("RGB") # Esegui la ricerca sul db results = search_similar_images(query_img, topk=count) # restituisce i risultati come JSON response_data = [] for hit in results: response_data.append({ "path": hit.entity.get("path"), "similarity": hit.distance }) return JSONResponse(content=response_data)
Il codice Python prevede le due seguenti funzioni, per il rispettivo calcolo dell'embedding dell'immagine di input e la ricerca su Milvus di k immagini di output:
def get_embedding(image: Image.Image) -> np.ndarray: """Converte un'immagine in embedding 2048-dim con ResNet50""" tensor = transform(image).unsqueeze(0).to(device) with torch.no_grad(): emb = feature_extractor(tensor).cpu().numpy().flatten() return emb / np.linalg.norm(emb) def search_similar_images(query_img: Image.Image, topk=5): """Cerca immagini simili in Milvus data un'immagine di query""" emb = get_embedding(query_img) results = collection.search( data=[emb.tolist()], anns_field="embedding", param={"metric_type": "COSINE", "params": {"nprobe": 10}}, limit=topk, output_fields=["path"] ) return results[0]
-
GET /image/: recupera e trasmette un file immagine direttamente da HDFS, dato il suo percorso. Utilizzato per trasmettere le immagini di output al frontend.
-
Di seguito sono illustrati i vari passaggi necessari per eseguire la ricerca per similarità delle immagini.
I passaggi vanno eseguiti tutti nel nodo master.
-
Come primo passaggio, avviare HDFS, Yarn, Spark e Milvus:
start.dfs.sh start-yarn.sh start-all.sh cd ~/milvus docker compose up -d
-
Attivare il Virtual Environment Python e avviare pyspark e Jupyter
source pytorch_env/bin/activate pyspark -
Aprire Jupyter Notebook nel browser ed eseguire tutte le celle del notebook image_embedding_spark.ipynb
NOTA: l'indirizzo del server Jupyter è quello inserito in fase di Configurazione completa variabili d'ambiente e corrisponde all'indirizzo ip del master alla porta 8889, per questo progetto l'indirizzo è: 192.168.100.4:8889
-
Avviare il server FastAPI
uvicorn main:app --reload --host 192.168.100.4
-
Collegarsi a 192.168.100.4:8000 (è possibile collegarsi sia dal master che dal worker)
-
Caricare un'immagine e selezionare il numero di immagini da mostrare in output
-
Visualizzare i risultati
NOTA: il passaggio 2 viene eseguito solo la prima volta, perché successivamente gli embedding delle immagini del dataset saranno già presenti nel database Milvus e pronti per la ricerca.


