Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import heapq

class Node:
    """One vertex of the coding tree.

    A leaf stores the symbol it stands for in ``char``; an internal node
    is created with ``char=None`` and gets its children assigned after
    construction. ``freq`` is the weight used to order nodes.
    """

    def __init__(self, char, freq):
        self.char, self.freq = char, freq
        # Children are attached later by whoever builds the tree.
        self.left = self.right = None

    def __lt__(self, other):
        # heapq only needs "<" to order items; nodes compare by weight alone.
        return self.freq < other.freq

def calc_freq(data):
    """Count how often each item occurs in *data*.

    Args:
        data: Any iterable of hashable items (typically a string).

    Returns:
        A plain ``dict`` mapping each distinct item to its number of
        occurrences, with keys in order of first appearance.
    """
    from collections import Counter

    # iter() keeps the "count whatever iteration yields" contract: without
    # it, Counter would treat a mapping argument as ready-made counts and
    # accept None as "empty" instead of raising TypeError.
    return dict(Counter(iter(data)))

def build_tree(freq_dict):
    """Build a Huffman tree from a symbol -> frequency mapping.

    The two lowest-weight nodes are repeatedly merged under a new internal
    node (``char=None``) whose weight is the sum of its children, until a
    single root remains.

    Args:
        freq_dict: Mapping of symbol to its occurrence count.

    Returns:
        The root ``Node`` of the tree, or ``None`` when *freq_dict* is
        empty. The root is always an internal node: when there is only one
        distinct symbol its leaf is hung as the left child of a synthetic
        root, so the symbol receives the one-bit code ``'0'`` rather than
        an empty code (an empty code would make every encoded message the
        empty string and lose the data).
    """
    heap = [Node(char, freq) for char, freq in freq_dict.items()]
    if not heap:
        return None

    if len(heap) == 1:
        # A lone leaf as root would be assigned the empty code; give it a
        # parent so it ends up one edge below the root.
        only_leaf = heap[0]
        root = Node(None, only_leaf.freq)
        root.left = only_leaf
        return root

    heapq.heapify(heap)
    while len(heap) > 1:
        left_node = heapq.heappop(heap)
        right_node = heapq.heappop(heap)
        parent_node = Node(None, left_node.freq + right_node.freq)
        parent_node.left = left_node
        parent_node.right = right_node
        heapq.heappush(heap, parent_node)

    return heap[0]

def generate_codes(node, code, code_dict):
    """Fill *code_dict* with the bit string leading to every leaf.

    Walks the tree rooted at *node* depth-first, left branch before right.
    Going left appends ``'0'`` to the path and going right appends ``'1'``;
    when a node carrying a symbol (``char is not None``) is reached, the
    path so far is stored under that symbol and the walk does not descend
    further.

    Args:
        node: Root of the (sub)tree to walk; ``None`` is accepted and
            contributes nothing.
        code: Bit-string prefix that leads to *node*.
        code_dict: Dictionary updated in place with ``symbol -> code``.
    """
    pending = [(node, code)]
    while pending:
        current, prefix = pending.pop()
        if current is None:
            continue
        if current.char is not None:
            code_dict[current.char] = prefix
            continue
        # Push right first so the left subtree is popped (visited) first.
        pending.append((current.right, prefix + '1'))
        pending.append((current.left, prefix + '0'))

def compress(data):
    """Encode *data* as a string of ``'0'``/``'1'`` characters.

    Args:
        data: Sequence of symbols to encode (typically a string).

    Returns:
        A ``(bits, tree_root)`` tuple: the concatenated codes of every
        symbol in *data*, and the root of the coding tree that
        ``decompress`` needs to reverse the encoding.
    """
    root = build_tree(calc_freq(data))

    codes = {}
    generate_codes(root, '', codes)

    bits = ''.join(codes[symbol] for symbol in data)
    return bits, root

def decompress(compressed_data, tree_root):
    """Decode a bit string by walking the coding tree.

    Starting at *tree_root*, each ``'0'`` moves to the left child and any
    other character moves to the right child. Whenever a node carrying a
    symbol is reached, that symbol is emitted and the walk restarts at the
    root.

    Args:
        compressed_data: Iterable of bit characters, as produced by
            ``compress``.
        tree_root: Root node of the tree that was used for encoding. It is
            only dereferenced when *compressed_data* is non-empty.

    Returns:
        The decoded string. Trailing bits that do not reach a leaf are
        ignored.
    """
    # Collect pieces and join once; repeated ``str +=`` is quadratic on
    # interpreters without CPython's in-place concatenation optimisation.
    decoded_chars = []
    current_node = tree_root

    for bit in compressed_data:
        current_node = current_node.left if bit == '0' else current_node.right

        if current_node.char is not None:
            decoded_chars.append(current_node.char)
            current_node = tree_root

    return ''.join(decoded_chars)
74 changes: 74 additions & 0 deletions test_compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import heapq
from collections import Counter
from typing import Dict, Iterator, List, Optional, Tuple

class Node:
    """Vertex of the coding tree, ordered by its ``freq`` weight.

    Leaves hold their symbol in ``char``. Internal nodes are constructed
    with ``char=None`` elsewhere in this file and have ``left``/``right``
    assigned after construction.
    """

    def __init__(self, char: str, freq: int):
        self.char = char
        self.freq = freq
        # No children until the tree builder links them in.
        self.left = self.right = None

    def __lt__(self, other):
        # Weight-only comparison: this is all heapq requires of its items.
        return self.freq < other.freq

def build_tree(data: str) -> Optional[Node]:
    """Build a Huffman tree for the symbols occurring in *data*.

    Symbol frequencies are counted in one pass, then the two lowest-weight
    nodes are repeatedly merged under a new internal node (``char=None``)
    until a single root remains.

    Args:
        data: The text whose symbols should be encoded.

    Returns:
        The root node of the tree, or ``None`` when *data* is empty. The
        root is always an internal node: with only one distinct symbol the
        leaf is hung as the left child of a synthetic root so the symbol
        receives the one-bit code ``'0'`` instead of an empty code.
    """
    # Counter counts in a single pass and iterates in first-seen order, so
    # the resulting tree is the same on every run (iterating a set of
    # strings is not, because of hash randomisation).
    freq: Dict[str, int] = Counter(data)
    heap: List[Node] = [Node(char, count) for char, count in freq.items()]
    if not heap:
        return None

    if len(heap) == 1:
        # A lone leaf as root would be assigned the empty code.
        root = Node(None, heap[0].freq)
        root.left = heap[0]
        return root

    # heappop only returns the minimum if the list satisfies the heap
    # invariant, which an arbitrary list does not.
    heapq.heapify(heap)
    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        parent = Node(None, left.freq + right.freq)
        parent.left = left
        parent.right = right
        heapq.heappush(heap, parent)

    return heap[0]

def generate_codes(node: Node, code: str, codebook: Dict[str, str]) -> None:
    """Record in *codebook* the bit string leading to each leaf under *node*.

    The left edge contributes ``'0'`` and the right edge ``'1'``; the left
    subtree is processed before the right one. A node whose ``char`` is
    not ``None`` is treated as a leaf: *code* is stored for it and its
    children are not visited. A ``None`` node contributes nothing.

    Args:
        node: Root of the (sub)tree to process, or ``None``.
        code: Bit-string prefix that leads to *node*.
        codebook: Mapping updated in place with ``symbol -> code``.
    """
    if node is None:
        return

    symbol = node.char
    if symbol is None:
        for bit, child in (('0', node.left), ('1', node.right)):
            generate_codes(child, code + bit, codebook)
    else:
        codebook[symbol] = code

def compress(data: str) -> Tuple[str, Dict[str, str]]:
    """Encode *data* as a string of ``'0'``/``'1'`` characters.

    Args:
        data: The text to encode.

    Returns:
        A ``(bits, codebook)`` tuple: the concatenated codes of every
        character of *data*, and the ``symbol -> code`` mapping that was
        used to produce them.
    """
    codebook: Dict[str, str] = {}
    generate_codes(build_tree(data), '', codebook)

    encoded_symbols = [codebook[symbol] for symbol in data]
    return ''.join(encoded_symbols), codebook

def decompress(compressed_data: str, codebook: Dict[str, str]) -> Iterator[str]:
    """Lazily decode a bit string using a ``symbol -> code`` mapping.

    Bits are accumulated until they match one of the codes in *codebook*;
    the matching symbol is then yielded and accumulation starts over. This
    decodes unambiguously as long as no code is a prefix of another, which
    holds for codes read off the leaves of a binary tree.

    Args:
        compressed_data: String of ``'0'``/``'1'`` characters.
        codebook: Mapping of each symbol to its bit-string code.

    Yields:
        The decoded symbols, one at a time, in order. Join them with
        ``''.join(...)`` to obtain the full text.

    Raises:
        ValueError: If the input ends in the middle of a code, i.e. the
            trailing bits do not match any entry of *codebook*.
    """
    # Invert once so each candidate prefix is an O(1) lookup.
    symbol_for_code = {code: symbol for symbol, code in codebook.items()}

    pending_bits: List[str] = []
    for bit in compressed_data:
        pending_bits.append(bit)
        candidate = ''.join(pending_bits)
        if candidate in symbol_for_code:
            yield symbol_for_code[candidate]
            pending_bits.clear()

    if pending_bits:
        raise ValueError(
            "compressed data ends with bits that match no code: "
            + ''.join(pending_bits)
        )

def build_tree_from_code(node: Node, code: str) -> Node:
    """Step into *node* along the branch selected by the last bit of *code*.

    * If *node* is ``None`` or carries a symbol (``char is not None``), a
      fresh ``Node(None, 0)`` is returned. The last character of *code* is
      still read in this case, so an empty *code* raises ``IndexError``.
    * Otherwise the child selected by ``code[-1]`` (``'0'`` -> left, any
      other character -> right) is replaced by the result of a recursive
      call on that child with ``code[:-1]``, and *node* itself is returned.

    NOTE(review): no symbol is ever stored on the nodes created here, and
    recursing into a missing child with a one-character *code* reaches the
    empty-code ``IndexError`` described above. The intended contract is
    unclear from this file; confirm it before relying on this helper.
    """
    replace_node = node is None or node.char is not None
    last_bit = code[-1]
    if replace_node:
        # Either value of the last bit produces the same placeholder node.
        return Node(None, 0)

    if last_bit == '0':
        node.left = build_tree_from_code(node.left, code[:-1])
    else:
        node.right = build_tree_from_code(node.right, code[:-1])
    return node

def test_compression() -> None:
    """Check that a sample sentence survives a compress/decompress round trip."""
    original = "This is a test for Shannon-Fano compression."
    encoded, codebook = compress(original)

    # decompress() is consumed as an iterable of decoded pieces.
    restored = ''.join(decompress(encoded, codebook))

    assert original == restored

# Run the round-trip self-test when this file is executed as a script
# (it is skipped when the module is merely imported).
if __name__ == "__main__":
    test_compression()