@@ -1,4 +1,8 @@
-use std::{cmp::max, collections::VecDeque, num::NonZeroU32};
+use std::{
+    cmp::{max, Ordering},
+    collections::VecDeque,
+    num::NonZeroU32,
+};
 
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
@@ -541,37 +545,44 @@ impl AggregationUpdateQueue {
         } else if should_be_follower {
             // Remove the upper edge
             let count = remove!(task, Upper { task: upper_id }).unwrap_or_default();
-            if count > 0 {
-                let upper_ids: Vec<_> = get_uppers(&upper);
+            match count.cmp(&0) {
+                Ordering::Less => task.add_new(CachedDataItem::Upper {
+                    task: upper_id,
+                    value: count,
+                }),
+                Ordering::Greater => {
+                    let upper_ids: Vec<_> = get_uppers(&upper);
+
+                    // Add the same amount of follower edges
+                    if update_count!(upper, Follower { task: task_id }, count) {
+                        // notify uppers about new follower
+                        if !upper_ids.is_empty() {
+                            self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
+                                upper_ids: upper_ids.clone(),
+                                new_follower_id: task_id,
+                            });
+                        }
+                    }
 
-                // Add the same amount of follower edges
-                if update_count!(upper, Follower { task: task_id }, count) {
-                    // notify uppers about new follower
-                    if !upper_ids.is_empty() {
-                        self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
+                    // Since this is no longer an inner node, update the aggregated data and
+                    // followers
+                    let data = AggregatedDataUpdate::from_task(&mut task).invert();
+                    let followers = get_followers(&task);
+                    let diff = data.apply(&mut upper, ctx.session_id(), self);
+                    if !upper_ids.is_empty() && !diff.is_empty() {
+                        self.push(AggregationUpdateJob::AggregatedDataUpdate {
                             upper_ids: upper_ids.clone(),
-                            new_follower_id: task_id,
+                            update: diff,
+                        });
+                    }
+                    if !followers.is_empty() {
+                        self.push(AggregationUpdateJob::InnerLostFollowers {
+                            upper_ids: vec![upper_id],
+                            lost_follower_ids: followers,
                         });
                     }
                 }
-
-                // Since this is no longer an inner node, update the aggregated data and
-                // followers
-                let data = AggregatedDataUpdate::from_task(&mut task).invert();
-                let followers = get_followers(&task);
-                let diff = data.apply(&mut upper, ctx.session_id(), self);
-                if !upper_ids.is_empty() && !diff.is_empty() {
-                    self.push(AggregationUpdateJob::AggregatedDataUpdate {
-                        upper_ids: upper_ids.clone(),
-                        update: diff,
-                    });
-                }
-                if !followers.is_empty() {
-                    self.push(AggregationUpdateJob::InnerLostFollowers {
-                        upper_ids: vec![upper_id],
-                        lost_follower_ids: followers,
-                    });
-                }
+                Ordering::Equal => {}
             }
         } else {
             // both nodes have the same aggregation number
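For readers skimming the hunk above: the substantive change replaces the one-sided check `if count > 0` with an exhaustive `match` on `count.cmp(&0)`. The positive arm keeps the old behavior (converting the remaining upper-edge count into follower edges and moving the aggregated data), the new negative arm re-adds the upper edge with its negative count instead of silently dropping it, and the zero arm is an explicit no-op. Below is a minimal standalone sketch of that pattern; the function name and the returned strings are invented for illustration and are not part of the diff.

use std::cmp::Ordering;

// Illustrative stand-in for the edge bookkeeping above: `count` plays the
// role of the removed `Upper` edge count, and the strings stand in for the
// work performed in each arm of the real code.
fn handle_removed_count(count: i32) -> &'static str {
    match count.cmp(&0) {
        // Negative leftover count: restore it instead of ignoring it.
        Ordering::Less => "re-add the upper edge with the negative count",
        // Positive count: turn it into the same number of follower edges.
        Ordering::Greater => "add follower edges and move aggregated data",
        // Nothing left to do.
        Ordering::Equal => "no-op",
    }
}

fn main() {
    for count in [-2, 0, 3] {
        println!("count = {count}: {}", handle_removed_count(count));
    }
}

Unlike the original `if`, the `match` is exhaustive over `Ordering`, so the compiler guarantees every sign case is spelled out rather than falling through silently.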