Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 264: Update qviz for multiblock files #437

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
42e11d6
initial commit
jorgeMarin1 Sep 6, 2024
b0fda5d
Merge branch 'Qbeast-io:main' into qviz-bug
jorgeMarin1 Sep 10, 2024
0fe8b08
initial commit
jorgeMarin1 Sep 6, 2024
5fb0956
added process table function using delta tables
jorgeMarin1 Sep 10, 2024
5f60426
create qviz using Delta tables
jorgeMarin1 Sep 10, 2024
0ee8232
added custom table
jorgeMarin1 Sep 10, 2024
30256f8
added custom table
jorgeMarin1 Sep 10, 2024
cf9cfc3
removed ecommerce300k_2019
jorgeMarin1 Sep 10, 2024
cbcbaed
fixed visualization
jorgeMarin1 Sep 12, 2024
92cf732
deleted parquet files and added a new folder for table test
jorgeMarin1 Sep 12, 2024
e9559f7
added comments on the code
jorgeMarin1 Sep 12, 2024
ee95b87
added comments to the code
jorgeMarin1 Sep 12, 2024
2f0aea8
Merge branch 'main' into qviz-bug
jorgeMarin1 Sep 12, 2024
a267f87
deleted code and files that won't be used
jorgeMarin1 Sep 12, 2024
032077c
added unit test
jorgeMarin1 Sep 13, 2024
398494d
addressed changes
jorgeMarin1 Sep 17, 2024
353011f
addressed changes
jorgeMarin1 Sep 18, 2024
13e95a7
snake case
jorgeMarin1 Sep 18, 2024
31ac8e1
edited example of json log file
jorgeMarin1 Sep 19, 2024
8ecfdae
raised exception when failed to create delta table
jorgeMarin1 Sep 19, 2024
6f824d4
edited name of the method that extracted cubes from delta tables
jorgeMarin1 Sep 19, 2024
cb0f1a5
made sure blocks_str is indeed a string
jorgeMarin1 Sep 19, 2024
5f06c45
put definition of cube_string before if statement
jorgeMarin1 Sep 19, 2024
d565fff
added else statement for block_string
jorgeMarin1 Sep 25, 2024
0178d1e
deleted non-necessary comments
jorgeMarin1 Sep 25, 2024
a1d75a7
changed path to test table in README
jorgeMarin1 Sep 25, 2024
512dd67
default value of sampling fraction is 0.02 and that value can be modi…
jorgeMarin1 Sep 25, 2024
22ff6b5
deleted unnecessary files
jorgeMarin1 Sep 25, 2024
1faa7c2
switched to version 3.2.0 of Delta Lake, which is compatible with 3.5…
jorgeMarin1 Sep 25, 2024
cda8c6d
update size of cubes using path of blocks
jorgeMarin1 Sep 26, 2024
b407ada
WIP
Jiaweihu08 Oct 14, 2024
1b5ee6b
Remove large file
Jiaweihu08 Oct 14, 2024
aa486ca
WPI, first working version
Jiaweihu08 Oct 16, 2024
4bce299
Organize code, add tests
Jiaweihu08 Oct 16, 2024
b6855b0
Update README
Jiaweihu08 Oct 17, 2024
db3ebe9
Add comments
Jiaweihu08 Oct 17, 2024
a6b88a7
Update poetry lock and toml
Jiaweihu08 Oct 17, 2024
bc4ef3c
Update README
Jiaweihu08 Oct 17, 2024
2727e64
Correct stats computation
Jiaweihu08 Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
Jiaweihu08 committed Oct 14, 2024
commit b407ada4aa04c19a699d3dd36a601cb8d130a9bd
20 changes: 0 additions & 20 deletions utils/visualizer/qviz/block.py
Original file line number Diff line number Diff line change
@@ -1,20 +0,0 @@
from qviz.cube import normalize_weight

class Block:
def __init__(
self,
cube_id: str,
element_count: int,
min_weight: int,
max_weight: int,
file: str,
size: int,
):
self.cube_id = cube_id
self.file = file
self.min_weight = normalize_weight(min_weight)
self.max_weight = normalize_weight(max_weight)
self.element_count = element_count
self.size = size
self.children = []
self.parent = None
261 changes: 80 additions & 181 deletions utils/visualizer/qviz/content_loader.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import json
import os
import glob
from typing import Optional
import pandas as pd
from deltalake import DeltaTable
import pyarrow.parquet as pa
from collections import defaultdict
from qviz.cube import Cube, SamplingInfo, normalize_weight
from qviz.block import Block

from deltalake import DeltaTable

from qviz.cube import Cube, Block, SamplingInfo


# Returns metadata (dict), cubes(dict), root(cube), nodes & edges (list[dict])
def process_table(
table_path: str, revision_id: str
table_path: str, revision_id: int
) -> tuple[dict, dict, Cube, list[dict]]:
"""
Process a delta table given a path to a Qbeast table and a revision ID.
Expand All @@ -38,209 +33,113 @@ def process_table(
:param revision_id: Configuration entry for the target RevisionID. e.g. 1
:return: a dictionary with the metadata of the table, a dictionary with all the created cubes, the root node (a cube) of the viusalization and a list of dictionaries with the nodes and edges of the visualization.
"""

# Create the Delta table
delta_table = create_delta_table(table_path)

# Extract metadata
metadata, symbol_count = extract_metadata_from_delta_table(delta_table, revision_id)

# Extract cubes
cubes = extract_cubes_from_delta_table(delta_table, symbol_count)

# Build Tree
delta_table = DeltaTable(table_path)
revision = load_revision(delta_table, revision_id)
cubes = load_revision_cubes(delta_table, revision)
root = populate_tree(cubes)

# Build Tree
elements = delta_nodes_and_edges(cubes)

return metadata, cubes, root, elements
elements = get_nodes_and_edges(cubes)
return revision, cubes, root, elements


# 1. create delta table
def create_delta_table(table_path: str) -> DeltaTable:
"""
Creates delta table given a path where a Qbeast table is stored.
:param table_path: local path to your Qbeast table
:return: a Delta table
"""
def load_revision(delta_table: DeltaTable, revision_id: int) -> (dict, int):
config = delta_table.metadata().configuration
revision_key = f"qbeast.revision.{revision_id}"
try:
# Try to find the table in the provided path
delta_table = DeltaTable(table_path)
return delta_table

except Exception as e:
# If the table doesn't exist, throw an exception
print(f"Failed when creating the delta table: {e}\n")
revision_str = config[revision_key]
revision = json.loads(revision_str)
return revision
except KeyError as e:
available_revisions = [k for k in config.keys() if k.startswith("qbeast.revision.")]
print(f"No metadata found for the given RevisionID: {revision_id}. Available revisions: {available_revisions}")
raise e


# 2. extract metadata from delta table
# Returns metadata (dict), dimension_count (int) and symbol_count (int)
def extract_metadata_from_delta_table(
delta_table: DeltaTable, revision_id: str
) -> (dict, int):
"""
Extract metadata from a Delta table by checking if the revision ID is in that table.
If that ID exists in the table, a revision key (e.g. qbeast.revision.1) is created and the metadata is returned.
It also calculates the number of symbols used to representate the visualization.
:param delta_table: a created delta table.
:param revision_id: Configuration entry for the target RevisionID. e.g. 1
:return: A dictionary with the metadata related to the revisionId, an integer
representing the number of simbols used for the visualization representation.
"""
def load_revision_cubes(delta_table: DeltaTable, revision: dict) -> dict:
revision_id_str = str(revision["revisionID"])
dimension_count = len(revision["columnTransformers"])
symbol_count = (dimension_count + 5) // 6
all_cubes = dict()
add_files = delta_table.get_add_actions(True).to_pylist()
for add_file in add_files:
if add_file.get("tags.revision", '-1') != revision_id_str:
continue
file_bytes = add_file["size_bytes"]
file_path = add_file["path"]
blocks = json.loads(add_file["tags.blocks"])
for block_dict in blocks:
cube_id = block_dict["cubeId"]
if cube_id not in all_cubes:
depth = len(cube_id) // symbol_count
cube = Cube(cube_id, depth=depth)
all_cubes[cube_id] = cube
cube = all_cubes[cube_id]
block = Block.from_dict(block_dict, file_path, file_bytes)
cube.add(block)
return all_cubes

# List with all the revision ids
metadata = delta_table.metadata().configuration #This returns a dictionary with the metadata
revision_key = "qbeast.revision." + str(revision_id)

if revision_key in metadata.keys():
metadata_key_str = metadata[revision_key]
metadata_key = json.loads(metadata_key_str)
dimension_count = len(metadata_key["columnTransformers"])
symbol_count = (dimension_count + 5) // 6
return metadata_key, symbol_count
else:
print(f"No metadata found for the given RevisionID. There is no Qbeast table.")
return None

# 3. Returns a dictionary, the keys are the cubeids and the values are the actual cubes
def extract_cubes_from_delta_table(delta_table: DeltaTable, symbol_count: int) -> dict:
"""
Takes all the blocks stored in the table and creates the cubes.
:param delta_table: list of json log files
:param symbol_count: Integer representing the number of symbols used for the representation
:return: A dictionary with all the cubes stored in the table.
"""

df = delta_table.get_add_actions(True).to_pandas()

df_filtered = df[["size_bytes", "tags.blocks", "path"]]
# In this dictionary we store a cube for every cubeId. Each cube will be associated to his cube ID
cubes_dict = dict()

# Iterate throughout each row of the dataframe
for index, row in df_filtered.iterrows():
blocks_str = row[
"tags.blocks"
]
blocks_str = str(blocks_str)
if isinstance(blocks_str, str):
try:
blocks = json.loads(
blocks_str
) # Convert the string JSON into a list of dictionaries
for block in blocks:
# Check if we already have a cube in the dictionary with the cube ID of this block
# If we already have a cube with this ID, we update the atributes of the cube
# Since objects are passed by reference, they can be modified in situ
cube_string = block["cubeId"]
if cube_string in cubes_dict.keys():

element_count_block = block["elementCount"]
min_weight_block = block["minWeight"]
max_weight_block = normalize_weight( int(block["maxWeight"]) )
file = row["path"]
size_block = int(row["size_bytes"])
# We create a new block
new_block = Block(cube_string, element_count_block, min_weight_block, max_weight_block, file, size_block)

# Get the cube from the dictionary
dict_cube = cubes_dict[cube_string]
normalized_max_weight = normalize_weight(
int(block["maxWeight"])
)
dict_cube.max_weight = min(max_weight, normalized_max_weight)
dict_cube.element_count += int(block["elementCount"])

# Let's search if we have a block for that file
repeated = False
for cube_block in dict_cube.blocks:
if cube_block.file == file:
repeated = True
if repeated == False:
dict_cube.size += int(row["size_bytes"])

# Append the new cube to the blocks list
dict_cube.blocks.append(new_block)

# If we don't have this cube in the dictionary, a new cube is created
else:
# Calculate depth of the cube
depth = len(cube_string) // symbol_count
# Assign max_weight, min_weight, elemnt_count a value for the new cube
element_count = block["elementCount"]
min_weight = block["minWeight"]
max_weight = normalize_weight( int(block["maxWeight"]) )
file = row["path"]
size = int(row["size_bytes"])
# We create the new block
new_block = Block(cube_string, element_count, min_weight, max_weight, file, size)
# Create cube and add the block the the cube's list
cubes_dict[cube_string] = Cube(
cube_string, max_weight, element_count, size, depth
)
cubes_dict[cube_string].blocks.append(new_block)

except json.JSONDecodeError:
print("Error decoding JSON:", blocks_str)
raise
else:
print("The format of the block_string is not a string")

return cubes_dict


# 4. build index from the list of cube blocks
# Returns the root of the tree (cube)
def populate_tree(cubes: dict) -> Cube:
"""
Assigns a depth (in terms if a tree viusalization) to each cube and finds the root of the tree.
It also creates the father - child relationships between these cubes.
:param cubes: dictionary with the cubes stored in a delta table.
:return: A cube that represents the root of the visualization tree.
"""

# Initialize variables. level_cubes is a dictionary of lists, being the key each level of representation and the matching value a list of the cubes in that level.
max_level = 0
level_cubes = defaultdict(list)

# Get the cubes from the values of the dictionary and assign a level (key) depending of the depth of each cube
cubes = cubes.values()
for cube in cubes:
level_cubes[cube.depth].append(cube)
max_level = max(max_level, cube.depth)

# Create father - child relationships between the cubes
for level in range(max_level):
for cube in level_cubes[level]:
for child in level_cubes[level + 1]:
child.link(cube)

root = level_cubes[0][0]
return root


# Populate Tree: Nodes & Edges (list[dict])
def delta_nodes_and_edges(cubes: dict, fraction: float = -1.0) -> list[dict]:
"""
Sampling function.
:param cubes: dictionary with the cubes stored in a delta table.
:param fraction: float between 0 and 1, used to select cubes based on their normalized maximum weight
:return: A list of dictionaries that contains the nodes and their connections.
"""

def get_nodes_and_edges(cubes: dict, fraction: float = -1.0) -> list[dict]:
nodes = []
edges = []
sampling_info = SamplingInfo(fraction)
for cube in cubes.values():
print("Cube N&E: ", cube)
node, connections = cube.get_elements_for_sampling(fraction)
nodes.append(node)
edges.extend(connections)
sampling_info.update(cube, node["selected"])

if fraction > 0:
print(sampling_info)

return nodes + edges


# def get_node_and_edges_from_cube(cube: Cube, fraction: float) -> tuple[dict, list[dict]]:
# node, edges = cube.get_elements_for_sampling(fraction)
# return node, edges


def get_node_and_edges_from_cube(cube: Cube, fraction: float) -> (dict, list[dict]):
"""
Return cube and edges for drawing if the cube or any of its children are selected
for the given sampling fraction.

:param cube: cube to be drawn
:param fraction: sampling fraction between [0, 1]
:return: styled node and edges for drawing depending on if they are selected for
sampling
"""
selected = any([b.is_sampled(fraction) for b in cube.blocks])
name = cube.cube_id or "root"
label = (name + " " if name == "root" else "") + str(cube.max_weight)
node = {
"data": {"id": name, "label": label},
"selected": selected,
"classes": "sampled" if selected else "",
}

edges = []
for child in cube.children:
selected_child = child.is_sampled(fraction)
edges.append(
{
"data": {"source": name, "target": child.cube_id},
"selected": selected_child,
"classes": "sampled" if selected_child else "",
}
)

return node, edges
Loading