Distributed Key-Value Store (DKVS)

A scalable, sharded storage system that implements core distributed systems concepts including consistent hashing, dynamic resharding, and smart client routing.

The Distributed Key-Value Store partitions data across multiple Java-based storage nodes using a consistent hashing ring. A central Python Cluster Manager orchestrates the topology, while smart Python clients route requests directly to the correct storage node. The system supports live node addition and automatic data migration.


Architecture

  1. StorageNode.java - High-performance Java server that manages in-memory data storage and handles partition migration
  2. ClusterManager.py - Python-based control plane (orchestrates the hash ring topology and controls data rebalancing)
  3. SmartClient.py - Python library that retrieves topology metadata and routes PUT/GET operations to the responsible node
  4. hasher.py - Shared MD5-based hashing logic that keeps clients and servers in agreement on key-to-node mappings
  5. WriteAheadLog.java - Durable append-only log that lets a node recover its data after a crash

Key Techniques

Consistent Hashing

The system uses a hash ring to decide which node stores each piece of data:

  • Keys and nodes are hashed into the range 0 to 2^32-1 (the first four bytes of an MD5 digest, as described under Hash Synchronization)
  • Each node is placed on the ring based on a hash of its address (host:port)
  • Each key is assigned to the first node found when moving clockwise around the ring
  • When you add or remove a node, only about 1/N of the keys need to move (where N is the number of nodes)
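
A minimal Python sketch of the ring lookup described above, assuming the MD5-based hashing detailed later under Hash Synchronization (the class, addresses, and key names are illustrative, not the repository's actual API):

import bisect
import hashlib

def ring_hash(value: str) -> int:
    # First four bytes of the MD5 digest, big-endian -> an integer in 0 .. 2^32 - 1
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:4], "big")

class HashRing:
    def __init__(self, nodes):
        # Place each node on the ring by hashing its "host:port" address
        placed = sorted((ring_hash(n), n) for n in nodes)
        self._positions = [pos for pos, _ in placed]
        self._nodes = [node for _, node in placed]

    def node_for(self, key: str) -> str:
        # Clockwise walk: the first node at or past the key's hash owns it,
        # wrapping around to the lowest position if we fall off the end
        idx = bisect.bisect_left(self._positions, ring_hash(key))
        return self._nodes[idx % len(self._nodes)]

ring = HashRing(["localhost:7001", "localhost:7002", "localhost:7003"])
print(ring.node_for("user:42"))  # every component computes the same owner for this key

Adding or removing a node only changes ownership of the arc between that node and its predecessor, which is why roughly 1/N of the keys move.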

Dynamic Resharding

You can add new nodes while the system is running:

  • When a new node registers, the Cluster Manager calculates the specific hash range that must be offloaded from an existing successor node
  • Keys are copied to the new node before being removed from the original node, preventing data loss during migration
  • Data is transferred over a separate connection while the original node continues serving read requests from clients
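
A hedged sketch of how the Manager can derive that range from the ring positions, assuming the ring_hash positions from the previous sketch (the function name is illustrative):

import bisect

def plan_migration(ring_positions, new_node_pos):
    # ring_positions: sorted hash positions of nodes already on the ring (at least one).
    # The new node takes over the arc (predecessor, new_node_pos], which is currently
    # owned by its clockwise successor; if predecessor > new_node_pos the arc wraps
    # through zero.
    idx = bisect.bisect_left(ring_positions, new_node_pos)
    predecessor = ring_positions[idx - 1]                  # idx == 0 wraps to the last node
    successor = ring_positions[idx % len(ring_positions)]  # node that must hand keys over
    return successor, (predecessor, new_node_pos)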

Smart Client Routing

To maximize throughput and reduce latency, the system uses a "Smart Client" architecture:

  • Clients cache the hash ring topology locally
  • Operations are routed directly to the specific Java process holding the data
  • Clients can manually query the Manager to update their view of the cluster state using refresh_topology()
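
A sketch of the client-side routing path, reusing the HashRing from the Consistent Hashing sketch above; the PUT/GET wire format is an assumption for illustration, not necessarily the repository's exact protocol:

import socket

class SmartClient:
    def __init__(self, topology):
        # topology: list of "host:port" strings cached from the Cluster Manager;
        # HashRing is the class from the Consistent Hashing sketch above
        self.ring = HashRing(topology)

    def _send(self, node, message):
        # Line-delimited text over TCP, as described under Migration Protocol
        host, port = node.split(":")
        with socket.create_connection((host, int(port))) as conn:
            conn.sendall((message + "\n").encode())
            return conn.makefile().readline().strip()

    def put(self, key, value):
        # Route straight to the node that owns the key -- no proxy hop in between
        return self._send(self.ring.node_for(key), f"PUT {key} {value}")

    def get(self, key):
        return self._send(self.ring.node_for(key), f"GET {key}")

If the cached ring goes stale after a reshard, refresh_topology() updates the client's view from the Manager before further requests are routed.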

Concurrency & Threading

The storage engine uses Java's built-in thread-safe data structures to handle multiple requests simultaneously:

  • Concurrent Access: Uses ConcurrentHashMap so multiple threads can read and write data safely without contending on a single global lock
  • Connection Management: A thread pool handles incoming client connections, automatically scaling up during traffic spikes and scaling down when idle
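
The node itself is Java; purely to illustrate the same pattern (a thread-safe map plus a pool of connection-handling workers), here is a minimal Python analog using a lock-guarded dict and ThreadPoolExecutor in place of ConcurrentHashMap and the JVM thread pool:

import socket
import threading
from concurrent.futures import ThreadPoolExecutor

store = {}                       # stand-in for the Java node's ConcurrentHashMap
store_lock = threading.Lock()

def handle_connection(conn):
    # One worker thread per in-flight connection, drawn from the pool below
    with conn:
        request = conn.recv(4096).decode().strip()         # e.g. "PUT user:42 alice"
        cmd, _, rest = request.partition(" ")
        if cmd == "PUT":
            key, _, value = rest.partition(" ")
            with store_lock:
                store[key] = value
            conn.sendall(b"OK\n")
        elif cmd == "GET":
            with store_lock:
                value = store.get(rest, "NOT_FOUND")
            conn.sendall((value + "\n").encode())

def serve(port):
    pool = ThreadPoolExecutor(max_workers=16)              # bounded pool of worker threads
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("0.0.0.0", port))
        server.listen()
        while True:
            conn, _ = server.accept()
            pool.submit(handle_connection, conn)           # keep the accept loop unblocked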

Building and Running

Prerequisites

  • Java: JDK 11+ (for Storage Nodes)
  • Python: Python 3.6+ (for Manager and Client)

Compilation

Compile the Java Storage Node:

javac StorageNode.java

Execution

Run the full integration test which compiles code, launches the cluster, simulates traffic, and demonstrates live resharding:

python3 run_system.py

Performance

  • Scalability: Storage capacity and throughput scale linearly as new nodes are added
  • Latency: Single-hop routing ensures minimal network overhead
  • Efficiency: Migration transfers only the subset of keys required by the new node, conserving network bandwidth

Technical Details

Hash Synchronization

Both Java and Python components must implement identical hashing logic to prevent routing errors. This implementation takes the first 4 bytes from the MD5 hash result and converts them to a number using big-endian byte order, ensuring both languages produce exactly the same hash value.
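
On the Python side this is a few lines over hashlib (it is the same ring_hash used in the sketches above); the Java node would perform the equivalent MessageDigest computation and big-endian conversion:

import hashlib

def ring_hash(value: str) -> int:
    # MD5 digest -> first 4 bytes -> big-endian unsigned integer in 0 .. 2^32 - 1.
    # hasher.py and StorageNode.java must both produce exactly this value for a key,
    # otherwise clients and servers would disagree on key placement.
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], byteorder="big")

assert 0 <= ring_hash("user:42") < 2**32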

Migration Protocol

The migration process is strictly ordered to prevent data loss (all components communicate via TCP sockets using a simple line-delimited text protocol):

  1. Manager calculates (start_hash, end_hash] for the new node
  2. Manager sends MIGRATE_RANGE to the successor node
  3. Successor node iterates its store, identifies keys in range, and pushes them to the new node
  4. Successor deletes local data only after receiving explicit acknowledgment from the target node
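
A hedged sketch of the Manager's side of step 2, together with the wrap-around-aware range check the successor needs for step 3; the exact message strings are assumptions for illustration, not taken from the repository:

import socket

def in_range(h, start, end):
    # Membership in (start, end] on the ring; start > end means the arc wraps through zero
    if start < end:
        return start < h <= end
    return h > start or h <= end

def trigger_migration(successor_addr, new_node_addr, start_hash, end_hash):
    # Manager -> successor over the same line-delimited TCP protocol.
    # The successor replies only after the new node has acknowledged the pushed keys,
    # at which point deleting the migrated range locally is safe (step 4).
    host, port = successor_addr.split(":")
    command = f"MIGRATE_RANGE {start_hash} {end_hash} {new_node_addr}\n"
    with socket.create_connection((host, int(port))) as conn:
        conn.sendall(command.encode())
        return conn.makefile().readline().strip()   # acknowledgment line (format illustrative)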
