
A Lambda-architecture pipeline that ingests the CMU ARCTIC speech dataset, extracts multi‑modal features (text, phonemes, pitch, MFCC), detects sustainability keywords, streams Avro‑serialized utterances to Kafka, processes real‑time and batch analytics with Spark, and stores results in Cassandra for a live dashboard


AyoubToueti/lambda-speech-analytics


Speech Analytics Lambda Architecture

Multi-Modal Data Processing for Sustainable Development

Kafka Spark Cassandra Spring Boot


📋 Project Overview

This project implements a Lambda Architecture for processing multi-modal speech data from the CMU Arctic dataset, framed as a Voice-Enabled Environmental Reporting System aligned with UN Sustainable Development Goals (particularly SDG 10: Reduced Inequalities).

Key Features

Kafka in KRaft mode (no Zookeeper dependency)
Spark Structured Streaming (modern API, not DStream)
Avro serialization with Schema Registry
Speed Layer: Real-time processing → Cassandra
Batch Layer: Historical data → HDFS → Aggregated analytics
Real-time Dashboard: WebSocket-based visualization
Multi-modal features: Text, audio (phonemes, pitch, MFCC)
Sustainability keyword extraction


Core Infrastructure (Modern Stack)

  • Kafka 3.6.0 in KRaft mode (NO Zookeeper!)
  • Schema Registry 7.5.0 for Avro schema management
  • Apache Spark 3.5.0 with Structured Streaming (NO DStream!)
  • Cassandra 4.1 for speed layer storage
  • Hadoop HDFS 3.2.1 for batch layer storage
  • Spring Boot 3.2.0 for real-time dashboard

Applications Developed

  1. Kafka Producer (kafka-producer/)

    • Parses CMU Arctic dataset (1,132 utterances)
    • Extracts multi-modal features (text, phonemes, pitch, MFCC)
    • Detects sustainability keywords
    • Serializes to Avro format
    • Streams continuously with 2.5s delay
  2. Spark Streaming Processor (spark-processor/)

    • Speed Layer: Real-time processing → Cassandra
    • Batch Layer: Historical data → HDFS (Parquet)
    • Processes 10-second micro-batches
    • Deserializes Avro from Kafka
  3. Batch Processing Job (spark-processor/)

    • Reads HDFS Parquet files
    • Computes aggregate statistics
    • Generates keyword frequencies
    • Duration distributions
    • Writes to Cassandra
  4. Real-Time Dashboard (dashboard/)

    • Spring Boot 3.2 with WebSocket support
    • Live visualization with Chart.js
    • Displays:
      • Duration distribution
      • Speaking rate over time
      • Pitch statistics
      • Sustainability keyword cloud
      • Latest utterances
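The producer's first step is parsing the Arctic `.lab` phoneme files. A minimal sketch of that step, assuming the usual Arctic `.lab` layout (a header terminated by a lone `#`, then one `end_time score phoneme` line per segment) — this is an illustration, not the project's actual ArcticDataParser:

```java
import java.util.ArrayList;
import java.util.List;

public class LabLineParser {
    // One phoneme segment: its label and the time (seconds) at which it ends.
    record Phoneme(String label, double endTime) {}

    // Parses .lab content: skip everything up to the "#" header terminator,
    // then read "end_time score phoneme" from each remaining line.
    static List<Phoneme> parse(String labContent) {
        List<Phoneme> result = new ArrayList<>();
        boolean inBody = false;
        for (String line : labContent.split("\n")) {
            line = line.trim();
            if (!inBody) {
                if (line.equals("#")) inBody = true; // header/body separator
                continue;
            }
            String[] parts = line.split("\\s+");
            if (parts.length < 3) continue; // skip malformed lines
            result.add(new Phoneme(parts[2], Double.parseDouble(parts[0])));
        }
        return result;
    }

    public static void main(String[] args) {
        String lab = "#\n0.0700 125 pau\n0.1500 125 ih\n0.2400 125 l\n";
        List<Phoneme> phonemes = parse(lab);
        System.out.println(phonemes.size() + " phonemes, utterance ends at "
                + phonemes.get(phonemes.size() - 1).endTime() + "s");
    }
}
```

From segments like these, the producer can derive the utterance duration (last end time) and the phoneme count that feed the speaking-rate metric.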

🏗️ Architecture Diagram

%%{init: {"theme":"default"}}%%
graph TD
    subgraph DATA_SOURCE[" DATA SOURCE"]
        DS[CMU Arctic Dataset<br/>Speech Data]
        DSF["• lab/ (phonemes & timing)<br/>• wav/ (audio files)<br/>• pm/ (pitch marks)<br/>• mcep/ (MFCC features)<br/>• etc/ (transcriptions)"]
    end

    subgraph KAFKA_PRODUCER["KAFKA PRODUCER"]
        KP["• Parses multi-modal features<br/>• Extracts sustainability keywords<br/>• Serializes to Avro format<br/>• Streams continuously (2.5s delay)"]
    end

    subgraph KAFKA[" KAFKA (KRaft Mode)"]
        KT[Topic: speech-utterances]
        SR[Schema Registry<br/>Port 8081]
        AVRO["Avro Schemas<br/>• SpeechUtterance<br/>• AggregatedMetrics"]
        SR -.->|manages| AVRO
    end

    subgraph SPARK["SPARK STRUCTURED STREAMING"]
        SS["readStream()<br/>.format('kafka')<br/>.load()<br/>from_avro() deserialization"]
        SPEED[Speed Layer]
        BATCH[Batch Layer]
        SS --> SPEED
        SS --> BATCH
    end

    subgraph CASSANDRA_SPEED[" CASSANDRA (Speed Layer)"]
        CS1[utterances]
        CS2[speech_metrics]
    end

    subgraph HDFS_LAYER[" HDFS (Batch Layer)"]
        HDFS["Parquet Files<br/>Partitioned by speaker<br/>All features preserved"]
    end

    subgraph BATCH_JOB[" Batch Processing Job"]
        BJ["• Compute aggregates<br/>• Keyword frequency<br/>• Duration distribution<br/>• Phoneme statistics"]
    end

    subgraph CASSANDRA_BATCH[" CASSANDRA (Batch Layer)"]
        CB1[aggregated_stats]
        CB2[sustainability_keywords]
        CB3[phoneme_stats]
        CB4[phoneme_durations]
        CB5[speaker_analytics]
        CB6[speaker_metrics]
    end

    subgraph DASHBOARD[" SPRING BOOT DASHBOARD"]
        SB["• WebSocket (STOMP)<br/>• Real-time charts<br/>• Keyword cloud<br/>• Metrics gauges"]
    end

    subgraph WEB_UI[" WEB UI (Port 8090)"]
        UI["Chart.js + WebSocket"]
    end

    DS --> KP
    KP --> KT
    KT --> SS
    SPEED --> CS1
    SPEED --> CS2
    BATCH --> HDFS
    HDFS --> BJ
    BJ --> CB1
    BJ --> CB2
    BJ --> CB3
    BJ --> CB4
    BJ --> CB5
    BJ --> CB6
    CS1 --> SB
    CS2 --> SB
    CB1 --> SB
    CB2 --> SB
    CB3 --> SB
    CB4 --> SB
    CB5 --> SB
    CB6 --> SB
    SB --> UI

    style DATA_SOURCE fill:#e1f5ff
    style KAFKA_PRODUCER fill:#fff4e1
    style KAFKA fill:#f0e1ff
    style SPARK fill:#ffe1e1
    style CASSANDRA_SPEED fill:#e1ffe1
    style HDFS_LAYER fill:#ffe1f0
    style BATCH_JOB fill:#fff9e1
    style CASSANDRA_BATCH fill:#e1ffe1
    style DASHBOARD fill:#e1f0ff
    style WEB_UI fill:#f0ffe1

📊 Data Description

CMU Arctic Dataset

The CMU ARCTIC databases are phonetically balanced, single-speaker US English corpora designed for speech synthesis research. This project uses the CLB (female speaker) variant, containing ~1,132 utterances.

Data Processing

  • Multi-Modal Features: Text, phonemes (timing), pitch (Hz), MFCC (13 coefficients)
  • Sustainability Context: Keyword extraction (climate, energy, nature, pollution, etc.)
  • Schema Evolution: Avro schemas with backward compatibility
  • Data Volume: ~1,132 utterances, ~197MB dataset

Multi-Modal Features Extracted

| Feature Type | Source File | Description |
|---|---|---|
| Text | etc/txt.done.data | Transcription of utterances |
| Phonemes | lab/*.lab | Phoneme labels with precise timing (timestamp, duration) |
| Pitch | pm/*.pm | Pitch marks for prosody analysis (mean, std, min, max) |
| MFCC | mcep/*.mcep | Mel-frequency cepstral coefficients (audio features) |
| Audio | wav/*.wav | WAV files (not streamed, referenced by path) |
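Pitch marks are time points placed roughly once per glottal period, so instantaneous pitch in Hz can be derived as the inverse of the interval between successive marks. A minimal sketch under that assumption (real `.pm` files also carry a header and voicing information, which this ignores):

```java
import java.util.Arrays;

public class PitchStats {
    // Derives per-period F0 values (Hz) from sorted pitch-mark times (seconds):
    // each F0 sample is 1 / (gap between consecutive marks).
    static double[] f0FromMarks(double[] markTimes) {
        double[] f0 = new double[markTimes.length - 1];
        for (int i = 1; i < markTimes.length; i++) {
            f0[i - 1] = 1.0 / (markTimes[i] - markTimes[i - 1]);
        }
        return f0;
    }

    static double mean(double[] xs) {
        return Arrays.stream(xs).average().orElse(0);
    }

    public static void main(String[] args) {
        // Marks spaced 8 ms apart correspond to a 125 Hz pitch.
        double[] marks = {0.000, 0.008, 0.016, 0.024};
        System.out.printf("mean F0 = %.1f Hz%n", mean(f0FromMarks(marks)));
    }
}
```

The mean/std/min/max fields in the `pitchStats` payload below are summaries over exactly this kind of per-period F0 series.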

Sustainability Context

To align with the Sustainable Development theme, we frame this system as an accessible environmental communication platform:

  • Sustainability Keyword Detection: Extracts environmental terms (nature, climate, energy, water, pollution, etc.) from transcriptions
  • SDG 10 Alignment: Voice-enabled interfaces reduce inequalities in accessing environmental data
  • Use Case: Processing spoken environmental reports, climate announcements, sustainability audiobooks
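The keyword detection amounts to tokenizing the transcription and intersecting it with a fixed vocabulary. A sketch of that idea (the keyword list here is illustrative; the real list and logic live in ArcticDataParser.java):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class KeywordDetector {
    // Illustrative subset of the sustainability vocabulary.
    static final Set<String> SUSTAINABILITY_KEYWORDS = Set.of(
            "nature", "climate", "energy", "water", "pollution", "oil");

    // Lowercase, split on non-letters, keep tokens found in the keyword set.
    static List<String> detect(String transcription) {
        return Arrays.stream(transcription.toLowerCase().split("[^a-z]+"))
                .filter(SUSTAINABILITY_KEYWORDS::contains)
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(detect("Illuminating oil was becoming all profit."));
        // -> [oil]
    }
}
```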

Sample Data

{
  "utteranceId": "arctic_a0518",
  "text": "Illuminating oil was becoming all profit.",
  "speaker": "CLB",
  "durationSeconds": 3.45,
  "numPhonemes": 42,
  "speakingRate": 12.17,
  "sustainabilityKeywords": ["oil", "profit"],
  "pitchStats": {
    "mean": 187.5,
    "std": 25.3,
    "min": 150.2,
    "max": 220.8,
    "numPitchMarks": 345
  }
}
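The `speakingRate` field is simply phonemes per second: 42 phonemes over 3.45 s ≈ 12.17 ph/s. A one-line check of that arithmetic:

```java
public class SpeakingRate {
    // speakingRate = numPhonemes / durationSeconds, rounded to 2 decimals.
    static double speakingRate(int numPhonemes, double durationSeconds) {
        return Math.round(numPhonemes / durationSeconds * 100.0) / 100.0;
    }

    public static void main(String[] args) {
        System.out.println(speakingRate(42, 3.45)); // -> 12.17
    }
}
```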

🚀 Quick Start

Prerequisites

  • Docker & Docker Compose
  • Java 11+ (for Maven builds)
  • Maven 3.6+
  • 8GB RAM minimum

Setup and Launch

# Clone the repository
git clone <your-repo>
cd lambda-speech-analytics

# Start all services
./scripts/start.sh

The script will:

  1. Start Docker infrastructure (Kafka, Cassandra, HDFS, Spark)
  2. Create Cassandra schema
  3. Build Maven projects
  4. Start Kafka producer
  5. Submit Spark Streaming job

Manual Setup

# 1. Start Docker services
docker-compose up -d

# Wait for services to be healthy
sleep 30

# 2. Create Cassandra schema
docker exec -i cassandra cqlsh -f /schema.cql

# 3. Build projects
cd kafka-producer && mvn clean package -DskipTests && cd ..
cd spark-processor && mvn clean package -DskipTests && cd ..
docker cp ./spark-processor/target spark-master:/app
cd dashboard && mvn clean package -DskipTests && cd ..

# 4. Run Kafka Producer
cd kafka-producer
java -jar target/kafka-producer-1.0.0.jar ../data/cmu_us_clb_arctic &

# 5. Submit Spark job
docker exec spark-master /spark/bin/spark-submit \
  --class com.speech.streaming.SpeechStreamingProcessor \
  --master spark://spark-master:7077 \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-avro_2.12:3.3.0,io.confluent:kafka-avro-serializer:7.5.0 \
  --repositories https://packages.confluent.io/maven/ \
  --conf spark.cassandra.connection.host=cassandra \
  /app/spark-processor-1.0.0.jar 

# 6. Run Dashboard
cd dashboard
java -jar target/dashboard-1.0.0.jar

🖥️ Access Points

| Service | URL | Description |
|---|---|---|
| Dashboard | http://localhost:8090 | Real-time visualization |
| Spark Master UI | http://localhost:8080 | Spark cluster monitoring |
| HDFS NameNode | http://localhost:9870 | HDFS storage browser |
| Schema Registry | http://localhost:8081 | Avro schema management |
| Cassandra | localhost:9042 | Database (use cqlsh) |

📁 Project Structure Overview

lambda-speech-analytics/          (197MB)
├── docker-compose.yml            7 services (NO Zookeeper!)
├── README.md                     Comprehensive documentation
├── SETUP.md                      Step-by-step setup guide
│
├── kafka-producer/               Modern Kafka Producer
│   ├── pom.xml                  → Kafka Clients 3.6.0
│   └── src/main/
│       ├── avro/                → SpeechUtterance.avsc, AggregatedMetrics.avsc
│       └── java/
│           ├── producer/        → SpeechDataProducer.java
│           └── parser/          → ArcticDataParser.java (multi-modal extraction)
│
├── spark-processor/              Structured Streaming
│   ├── pom.xml                  → Spark 3.5.0, Cassandra Connector
│   └── src/main/java/
│       ├── streaming/           → SpeechStreamingProcessor.java (Speed + Batch)
│       └── batch/               → BatchAggregationJob.java (HDFS → Cassandra)
│
├── dashboard/                    Spring Boot 3.2 + WebSocket
│   ├── pom.xml                  → Spring Boot 3.2.0
│   ├── Dockerfile               → Multi-stage build
│   └── src/main/
│       ├── java/                → DashboardApplication.java
│       └── resources/
│           ├── application.properties
│           └── static/          → index.html (Chart.js + WebSocket)
│
├── docker/
│   └── schema.cql                Cassandra schema (7 tables)
│
├── scripts/
│   ├── start.sh                  One-command startup
│   └── stop.sh                   Graceful shutdown
│
└── data/
    └── cmu_us_clb_arctic/        1,132 speech utterances
        ├── lab/                 → Phoneme timing data
        ├── wav/                 → Audio files
        ├── pm/                  → Pitch marks
        ├── mcep/                → MFCC features
        └── etc/                 → Transcriptions

Total: 5 Java files + Avro schemas + Docker config + Documentation


📈 Main Functionalities

1. Real-Time Speech Processing (Speed Layer)

  • Processes utterances within 10 seconds
  • Writes to Cassandra utterances table
  • Metrics: duration, speaking rate, pitch statistics, sustainability keywords

2. Historical Batch Processing (Batch Layer)

  • Stores all data in HDFS (Parquet format)
  • Batch job computes:
    • Aggregate statistics (avg duration, speaking rate, pitch)
    • Top sustainability keywords
    • Duration distribution
    • Phoneme frequency
  • Updates Cassandra aggregated_stats table
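The keyword-frequency aggregate boils down to flattening the per-utterance keyword lists and counting occurrences. In plain Java it looks like this (the real batch job does the equivalent with Spark over the HDFS Parquet files):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class KeywordFrequency {
    // Flattens keyword lists from many utterances into a sorted frequency map.
    static Map<String, Long> frequencies(List<List<String>> keywordsPerUtterance) {
        return keywordsPerUtterance.stream()
                .flatMap(List::stream)
                .collect(Collectors.groupingBy(
                        k -> k, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<List<String>> batch = List.of(
                List.of("oil", "climate"), List.of("climate"), List.of("water"));
        System.out.println(frequencies(batch));
        // -> {climate=2, oil=1, water=1}
    }
}
```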

3. Real-Time Dashboard

  • WebSocket-based live updates
  • Visualizations:
    • Duration Gauge: Current utterance duration
    • Speaking Rate Chart: Phonemes per second
    • Keyword Cloud: Sustainability terms
    • Pitch Histogram: Pitch distribution
    • Timeline: Utterances over time

4. Multi-Modal Feature Extraction

  • Text Analysis: Transcription, keyword extraction
  • Phonetic Analysis: 40+ phoneme types with timing
  • Prosodic Analysis: Pitch contours, intonation
  • Acoustic Analysis: MFCC features (13 coefficients)

🌍 Sustainable Development Alignment

SDG 10: Reduced Inequalities

This system demonstrates how voice-enabled interfaces can:

  • Make environmental data accessible to people with visual impairments
  • Enable information access in low-literacy contexts
  • Provide multilingual environmental reporting capabilities

Environmental Use Cases

  1. Spoken Climate Reports: Process audio announcements from meteorological stations
  2. Environmental Audiobooks: Analyze sustainability literature in audio format
  3. Accessibility: Convert written environmental data to speech and track delivery metrics

🔍 Data Flow Example

1. CMU Arctic File: arctic_a0518.lab
   └─> Parser extracts: "Illuminating oil was becoming all profit."
       ├─> 42 phonemes with timing
       ├─> Duration: 3.45s
       ├─> Speaking rate: 12.17 ph/s
       ├─> Pitch: mean=187.5Hz, std=25.3Hz
       └─> Keywords: ["oil", "profit"]

2. Kafka Producer
   └─> Serializes to Avro
   └─> Sends to topic "speech-utterances"

3. Spark Structured Streaming
   ├─> Speed Layer: Writes to Cassandra.utterances (10s latency)
   └─> Batch Layer: Writes to HDFS Parquet (30s batches)

4. Batch Job (periodic)
   └─> Reads HDFS → Computes aggregates → Writes to Cassandra.aggregated_stats

5. Dashboard
   ├─> Queries Cassandra every 10s
   ├─> Pushes via WebSocket
   └─> Browser updates charts in real-time


🛠️ Technologies Used

Modern Stack (No Legacy Components)

| Component | Technology | Version | Role |
|---|---|---|---|
| Message Broker | Apache Kafka (KRaft) | 3.6.0 | Streaming platform |
| Schema Registry | Confluent Schema Registry | 7.5.0 | Avro schema management |
| Stream Processing | Spark Structured Streaming | 3.5.0 | Real-time processing |
| Speed Layer Storage | Apache Cassandra | 4.1 | NoSQL database |
| Batch Layer Storage | Hadoop HDFS | 3.2.1 | Distributed file system |
| Serialization | Apache Avro | 1.11.3 | Schema evolution |
| Dashboard | Spring Boot | 3.2.0 | Web application |
| WebSocket | STOMP | - | Real-time communication |
| Visualization | Chart.js | 4.x | Charts and graphs |

Key Improvements Over Legacy Systems

OLD: Zookeeper dependency
NEW: Kafka KRaft mode (self-managed metadata)

OLD: DStream API (legacy)
NEW: Structured Streaming (DataFrame/Dataset API)

OLD: Manual JSON serialization
NEW: Avro with Schema Registry (schema evolution)

OLD: Spring Boot 2.x (EOL)
NEW: Spring Boot 3.2 (Jakarta EE, modern)


📝 Configuration

Environment Variables

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
SCHEMA_REGISTRY_URL=http://localhost:8081

# Cassandra
CASSANDRA_HOST=localhost
CASSANDRA_PORT=9042
CASSANDRA_KEYSPACE=speech_analytics

# HDFS
HDFS_NAMENODE=hdfs://localhost:9000

# Spark
SPARK_MASTER=spark://localhost:7077

Customization

Change Streaming Delay

Edit kafka-producer/src/main/java/com/speech/producer/SpeechDataProducer.java:

private static final long DELAY_MS = 2500; // Change to desired delay

Add Custom Keywords

Edit kafka-producer/src/main/java/com/speech/parser/ArcticDataParser.java:

private static final Set<String> SUSTAINABILITY_KEYWORDS = new HashSet<>(Arrays.asList(
    "your", "custom", "keywords"
));

🧪 Testing

Verify Kafka Messages

docker exec -it kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic speech-utterances \
  --from-beginning

Query Cassandra

docker exec -it cassandra cqlsh
cqlsh> USE speech_analytics;
cqlsh:speech_analytics> SELECT * FROM utterances LIMIT 10;

Or simply run:

./scripts/query-data.sh

🐛 Troubleshooting

Issue: Kafka not starting

# Check logs
docker logs kafka

# Restart with fresh data
docker-compose down -v
docker-compose up -d

Issue: Spark job fails

# Check Spark logs
docker logs spark-master
docker logs spark-worker

# Verify Cassandra connection
docker exec -it cassandra cqlsh -e "DESCRIBE KEYSPACE speech_analytics;"

Issue: Schema Registry errors

# Verify Schema Registry
curl http://localhost:8081/subjects

# Re-register schemas if needed
curl -X DELETE http://localhost:8081/subjects/speech-utterances-value

👥 Team

  • Ayoub Toueti
  • Khalil Elahyani
  • Aymen Regaeg
