@@ -57,7 +57,9 @@ const uint32_t EMAIL_OFFSET = USERNAME_OFFSET + USERNAME_SIZE;
5757const uint32_t ROW_SIZE = ID_SIZE + USERNAME_SIZE + EMAIL_SIZE ;
5858
5959const uint32_t PAGE_SIZE = 4096 ;
60- #define TABLE_MAX_PAGES 100
60+ #define TABLE_MAX_PAGES 400
61+
62+ #define INVALID_PAGE_NUM UINT32_MAX
6163
6264typedef struct {
6365 int file_descriptor ;
@@ -116,7 +118,7 @@ const uint32_t INTERNAL_NODE_CHILD_SIZE = sizeof(uint32_t);
116118const uint32_t INTERNAL_NODE_CELL_SIZE =
117119 INTERNAL_NODE_CHILD_SIZE + INTERNAL_NODE_KEY_SIZE ;
118120/* Keep this small for testing */
119- const uint32_t INTERNAL_NODE_MAX_CELLS = 3 ;
121+ const uint32_t INTERNAL_NODE_MAX_KEYS = 3 ;
120122
121123/*
122124 * Leaf Node Header Layout
@@ -186,9 +188,19 @@ uint32_t* internal_node_child(void* node, uint32_t child_num) {
186188 printf ("Tried to access child_num %d > num_keys %d\n" , child_num , num_keys );
187189 exit (EXIT_FAILURE );
188190 } else if (child_num == num_keys ) {
189- return internal_node_right_child (node );
191+ uint32_t * right_child = internal_node_right_child (node );
192+ if (* right_child == INVALID_PAGE_NUM ) {
193+ printf ("Tried to access right child of node, but was invalid page\n" );
194+ exit (EXIT_FAILURE );
195+ }
196+ return right_child ;
190197 } else {
191- return internal_node_cell (node , child_num );
198+ uint32_t * child = internal_node_cell (node , child_num );
199+ if (* child == INVALID_PAGE_NUM ) {
200+ printf ("Tried to access child %d of node, but was invalid page\n" , child_num );
201+ exit (EXIT_FAILURE );
202+ }
203+ return child ;
192204 }
193205}
194206
@@ -216,24 +228,6 @@ void* leaf_node_value(void* node, uint32_t cell_num) {
216228 return leaf_node_cell (node , cell_num ) + LEAF_NODE_KEY_SIZE ;
217229}
218230
219- uint32_t get_node_max_key (void * node ) {
220- switch (get_node_type (node )) {
221- case NODE_INTERNAL :
222- return * internal_node_key (node , * internal_node_num_keys (node ) - 1 );
223- case NODE_LEAF :
224- return * leaf_node_key (node , * leaf_node_num_cells (node ) - 1 );
225- }
226- }
227-
228- void print_constants () {
229- printf ("ROW_SIZE: %d\n" , ROW_SIZE );
230- printf ("COMMON_NODE_HEADER_SIZE: %d\n" , COMMON_NODE_HEADER_SIZE );
231- printf ("LEAF_NODE_HEADER_SIZE: %d\n" , LEAF_NODE_HEADER_SIZE );
232- printf ("LEAF_NODE_CELL_SIZE: %d\n" , LEAF_NODE_CELL_SIZE );
233- printf ("LEAF_NODE_SPACE_FOR_CELLS: %d\n" , LEAF_NODE_SPACE_FOR_CELLS );
234- printf ("LEAF_NODE_MAX_CELLS: %d\n" , LEAF_NODE_MAX_CELLS );
235- }
236-
237231void * get_page (Pager * pager , uint32_t page_num ) {
238232 if (page_num > TABLE_MAX_PAGES ) {
239233 printf ("Tried to fetch page number out of bounds. %d > %d\n" , page_num ,
@@ -270,6 +264,23 @@ void* get_page(Pager* pager, uint32_t page_num) {
270264 return pager -> pages [page_num ];
271265}
272266
267+ uint32_t get_node_max_key (Pager * pager , void * node ) {
268+ if (get_node_type (node ) == NODE_LEAF ) {
269+ return * leaf_node_key (node , * leaf_node_num_cells (node ) - 1 );
270+ }
271+ void * right_child = get_page (pager ,* internal_node_right_child (node ));
272+ return get_node_max_key (pager , right_child );
273+ }
274+
275+ void print_constants () {
276+ printf ("ROW_SIZE: %d\n" , ROW_SIZE );
277+ printf ("COMMON_NODE_HEADER_SIZE: %d\n" , COMMON_NODE_HEADER_SIZE );
278+ printf ("LEAF_NODE_HEADER_SIZE: %d\n" , LEAF_NODE_HEADER_SIZE );
279+ printf ("LEAF_NODE_CELL_SIZE: %d\n" , LEAF_NODE_CELL_SIZE );
280+ printf ("LEAF_NODE_SPACE_FOR_CELLS: %d\n" , LEAF_NODE_SPACE_FOR_CELLS );
281+ printf ("LEAF_NODE_MAX_CELLS: %d\n" , LEAF_NODE_MAX_CELLS );
282+ }
283+
273284void indent (uint32_t level ) {
274285 for (uint32_t i = 0 ; i < level ; i ++ ) {
275286 printf (" " );
@@ -294,15 +305,17 @@ void print_tree(Pager* pager, uint32_t page_num, uint32_t indentation_level) {
294305 num_keys = * internal_node_num_keys (node );
295306 indent (indentation_level );
296307 printf ("- internal (size %d)\n" , num_keys );
297- for (uint32_t i = 0 ; i < num_keys ; i ++ ) {
298- child = * internal_node_child (node , i );
308+ if (num_keys > 0 ) {
309+ for (uint32_t i = 0 ; i < num_keys ; i ++ ) {
310+ child = * internal_node_child (node , i );
311+ print_tree (pager , child , indentation_level + 1 );
312+
313+ indent (indentation_level + 1 );
314+ printf ("- key %d\n" , * internal_node_key (node , i ));
315+ }
316+ child = * internal_node_right_child (node );
299317 print_tree (pager , child , indentation_level + 1 );
300-
301- indent (indentation_level + 1 );
302- printf ("- key %d\n" , * internal_node_key (node , i ));
303318 }
304- child = * internal_node_right_child (node );
305- print_tree (pager , child , indentation_level + 1 );
306319 break ;
307320 }
308321}
@@ -330,6 +343,12 @@ void initialize_internal_node(void* node) {
330343 set_node_type (node , NODE_INTERNAL );
331344 set_node_root (node , false);
332345 * internal_node_num_keys (node ) = 0 ;
346+ /*
347+ Necessary because the root page number is 0; by not initializing an internal
348+ node's right child to an invalid page number when initializing the node, we may
349+ end up with 0 as the node's right child, which makes the node a parent of the root
350+ */
351+ * internal_node_right_child (node ) = INVALID_PAGE_NUM ;
333352}
334353
335354Cursor * leaf_node_find (Table * table , uint32_t page_num , uint32_t key ) {
@@ -661,22 +680,40 @@ void create_new_root(Table* table, uint32_t right_child_page_num) {
661680 uint32_t left_child_page_num = get_unused_page_num (table -> pager );
662681 void * left_child = get_page (table -> pager , left_child_page_num );
663682
683+ if (get_node_type (root ) == NODE_INTERNAL ) {
684+ initialize_internal_node (right_child );
685+ initialize_internal_node (left_child );
686+ }
687+
664688 /* Left child has data copied from old root */
665689 memcpy (left_child , root , PAGE_SIZE );
666690 set_node_root (left_child , false);
667691
692+ if (get_node_type (left_child ) == NODE_INTERNAL ) {
693+ void * child ;
694+ for (int i = 0 ; i < * internal_node_num_keys (left_child ); i ++ ) {
695+ child = get_page (table -> pager , * internal_node_child (left_child ,i ));
696+ * node_parent (child ) = left_child_page_num ;
697+ }
698+ child = get_page (table -> pager , * internal_node_right_child (left_child ));
699+ * node_parent (child ) = left_child_page_num ;
700+ }
701+
668702 /* Root node is a new internal node with one key and two children */
669703 initialize_internal_node (root );
670704 set_node_root (root , true);
671705 * internal_node_num_keys (root ) = 1 ;
672706 * internal_node_child (root , 0 ) = left_child_page_num ;
673- uint32_t left_child_max_key = get_node_max_key (left_child );
707+ uint32_t left_child_max_key = get_node_max_key (table -> pager , left_child );
674708 * internal_node_key (root , 0 ) = left_child_max_key ;
675709 * internal_node_right_child (root ) = right_child_page_num ;
676710 * node_parent (left_child ) = table -> root_page_num ;
677711 * node_parent (right_child ) = table -> root_page_num ;
678712}
679713
714+ void internal_node_split_and_insert (Table * table , uint32_t parent_page_num ,
715+ uint32_t child_page_num );
716+
680717void internal_node_insert (Table * table , uint32_t parent_page_num ,
681718 uint32_t child_page_num ) {
682719 /*
@@ -685,25 +722,39 @@ void internal_node_insert(Table* table, uint32_t parent_page_num,
685722
686723 void * parent = get_page (table -> pager , parent_page_num );
687724 void * child = get_page (table -> pager , child_page_num );
688- uint32_t child_max_key = get_node_max_key (child );
725+ uint32_t child_max_key = get_node_max_key (table -> pager , child );
689726 uint32_t index = internal_node_find_child (parent , child_max_key );
690727
691728 uint32_t original_num_keys = * internal_node_num_keys (parent );
692- * internal_node_num_keys (parent ) = original_num_keys + 1 ;
693729
694- if (original_num_keys >= INTERNAL_NODE_MAX_CELLS ) {
695- printf ( "Need to implement splitting internal node\n" );
696- exit ( EXIT_FAILURE ) ;
730+ if (original_num_keys >= INTERNAL_NODE_MAX_KEYS ) {
731+ internal_node_split_and_insert ( table , parent_page_num , child_page_num );
732+ return ;
697733 }
698734
699735 uint32_t right_child_page_num = * internal_node_right_child (parent );
736+ /*
737+ An internal node with a right child of INVALID_PAGE_NUM is empty
738+ */
739+ if (right_child_page_num == INVALID_PAGE_NUM ) {
740+ * internal_node_right_child (parent ) = child_page_num ;
741+ return ;
742+ }
743+
700744 void * right_child = get_page (table -> pager , right_child_page_num );
745+ /*
746+ If we are already at the max number of cells for a node, we cannot increment
747+ before splitting. Incrementing without inserting a new key/child pair
748+ and immediately calling internal_node_split_and_insert has the effect
749+ of creating a new key at (max_cells + 1) with an uninitialized value
750+ */
751+ * internal_node_num_keys (parent ) = original_num_keys + 1 ;
701752
702- if (child_max_key > get_node_max_key (right_child )) {
753+ if (child_max_key > get_node_max_key (table -> pager , right_child )) {
703754 /* Replace right child */
704755 * internal_node_child (parent , original_num_keys ) = right_child_page_num ;
705756 * internal_node_key (parent , original_num_keys ) =
706- get_node_max_key (right_child );
757+ get_node_max_key (table -> pager , right_child );
707758 * internal_node_right_child (parent ) = child_page_num ;
708759 } else {
709760 /* Make room for the new cell */
@@ -722,6 +773,100 @@ void update_internal_node_key(void* node, uint32_t old_key, uint32_t new_key) {
722773 * internal_node_key (node , old_child_index ) = new_key ;
723774}
724775
776+ void internal_node_split_and_insert (Table * table , uint32_t parent_page_num ,
777+ uint32_t child_page_num ) {
778+ uint32_t old_page_num = parent_page_num ;
779+ void * old_node = get_page (table -> pager ,parent_page_num );
780+ uint32_t old_max = get_node_max_key (table -> pager , old_node );
781+
782+ void * child = get_page (table -> pager , child_page_num );
783+ uint32_t child_max = get_node_max_key (table -> pager , child );
784+
785+ uint32_t new_page_num = get_unused_page_num (table -> pager );
786+
787+ /*
788+ Declaring a flag before updating pointers which
789+ records whether this operation involves splitting the root -
790+ if it does, we will insert our newly created node during
791+ the step where the table's new root is created. If it does
792+ not, we have to insert the newly created node into its parent
793+ after the old node's keys have been transferred over. We are not
794+ able to do this if the newly created node's parent is not a newly
795+ initialized root node, because in that case its parent may have existing
796+ keys aside from our old node which we are splitting. If that is true, we
797+ need to find a place for our newly created node in its parent, and we
798+ cannot insert it at the correct index if it does not yet have any keys
799+ */
800+ uint32_t splitting_root = is_node_root (old_node );
801+
802+ void * parent ;
803+ void * new_node ;
804+ if (splitting_root ) {
805+ create_new_root (table , new_page_num );
806+ parent = get_page (table -> pager ,table -> root_page_num );
807+ /*
808+ If we are splitting the root, we need to update old_node to point
809+ to the new root's left child, new_page_num will already point to
810+ the new root's right child
811+ */
812+ old_page_num = * internal_node_child (parent ,0 );
813+ old_node = get_page (table -> pager , old_page_num );
814+ } else {
815+ parent = get_page (table -> pager ,* node_parent (old_node ));
816+ new_node = get_page (table -> pager , new_page_num );
817+ initialize_internal_node (new_node );
818+ }
819+
820+ uint32_t * old_num_keys = internal_node_num_keys (old_node );
821+
822+ uint32_t cur_page_num = * internal_node_right_child (old_node );
823+ void * cur = get_page (table -> pager , cur_page_num );
824+
825+ /*
826+ First put right child into new node and set right child of old node to invalid page number
827+ */
828+ internal_node_insert (table , new_page_num , cur_page_num );
829+ * node_parent (cur ) = new_page_num ;
830+ * internal_node_right_child (old_node ) = INVALID_PAGE_NUM ;
831+ /*
832+ For each key until you get to the middle key, move the key and the child to the new node
833+ */
834+ for (int i = INTERNAL_NODE_MAX_KEYS - 1 ; i > INTERNAL_NODE_MAX_KEYS / 2 ; i -- ) {
835+ cur_page_num = * internal_node_child (old_node , i );
836+ cur = get_page (table -> pager , cur_page_num );
837+
838+ internal_node_insert (table , new_page_num , cur_page_num );
839+ * node_parent (cur ) = new_page_num ;
840+
841+ (* old_num_keys )-- ;
842+ }
843+
844+ /*
845+ Set child before middle key, which is now the highest key, to be node's right child,
846+ and decrement number of keys
847+ */
848+ * internal_node_right_child (old_node ) = * internal_node_child (old_node ,* old_num_keys - 1 );
849+ (* old_num_keys )-- ;
850+
851+ /*
852+ Determine which of the two nodes after the split should contain the child to be inserted,
853+ and insert the child
854+ */
855+ uint32_t max_after_split = get_node_max_key (table -> pager , old_node );
856+
857+ uint32_t destination_page_num = child_max < max_after_split ? old_page_num : new_page_num ;
858+
859+ internal_node_insert (table , destination_page_num , child_page_num );
860+ * node_parent (child ) = destination_page_num ;
861+
862+ update_internal_node_key (parent , old_max , get_node_max_key (table -> pager , old_node ));
863+
864+ if (!splitting_root ) {
865+ internal_node_insert (table ,* node_parent (old_node ),new_page_num );
866+ * node_parent (new_node ) = * node_parent (old_node );
867+ }
868+ }
869+
725870void leaf_node_split_and_insert (Cursor * cursor , uint32_t key , Row * value ) {
726871 /*
727872 Create a new node and move half the cells over.
@@ -730,7 +875,7 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
730875 */
731876
732877 void * old_node = get_page (cursor -> table -> pager , cursor -> page_num );
733- uint32_t old_max = get_node_max_key (old_node );
878+ uint32_t old_max = get_node_max_key (cursor -> table -> pager , old_node );
734879 uint32_t new_page_num = get_unused_page_num (cursor -> table -> pager );
735880 void * new_node = get_page (cursor -> table -> pager , new_page_num );
736881 initialize_leaf_node (new_node );
@@ -772,7 +917,7 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
772917 return create_new_root (cursor -> table , new_page_num );
773918 } else {
774919 uint32_t parent_page_num = * node_parent (old_node );
775- uint32_t new_max = get_node_max_key (old_node );
920+ uint32_t new_max = get_node_max_key (cursor -> table -> pager , old_node );
776921 void * parent = get_page (cursor -> table -> pager , parent_page_num );
777922
778923 update_internal_node_key (parent , old_max , new_max );
0 commit comments