Skip to content

Commit

Permalink
[BugFix] Fix rows and bytes rate statistic in routine load (backport #…
Browse files Browse the repository at this point in the history
…52151) (#52169)

Co-authored-by: wyb <wybb86@gmail.com>
  • Loading branch information
mergify[bot] and wyb authored Oct 22, 2024
1 parent fef8aa7 commit 663d26b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ protected String getStatistic() {
summary.put("unselectedRows", Long.valueOf(unselectedRows));
summary.put("receivedBytes", Long.valueOf(receivedBytes));
summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
summary.put("receivedBytesRate", Long.valueOf(receivedBytes * 1000 / totalTaskExcutionTimeMs));
summary.put("loadRowsRate",
Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
Long.valueOf((totalRows - errorRows - unselectedRows) * 1000 / totalTaskExcutionTimeMs));
summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,9 @@ protected String getStatistic() {
summary.put("unselectedRows", unselectedRows);
summary.put("receivedBytes", receivedBytes);
summary.put("taskExecuteTimeMs", totalTaskExcutionTimeMs);
summary.put("receivedBytesRate", receivedBytes / totalTaskExcutionTimeMs * 1000);
summary.put("receivedBytesRate", receivedBytes * 1000 / totalTaskExcutionTimeMs);
summary.put("loadRowsRate",
(totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000);
(totalRows - errorRows - unselectedRows) * 1000 / totalTaskExcutionTimeMs);
summary.put("committedTaskNum", committedTaskNum);
summary.put("abortedTaskNum", abortedTaskNum);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,17 @@ public List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Assert.assertEquals("['$.category','$.price','$.author']", newJob.getJsonPaths());
Assert.assertEquals("", newJob.getJsonRoot());
}

@Test
public void testGetStatistic() {
RoutineLoadJob job = new KafkaRoutineLoadJob(1L, "routine_load", 1L, 1L, "127.0.0.1:9020", "topic1");
Deencapsulation.setField(job, "receivedBytes", 10);
Deencapsulation.setField(job, "totalRows", 20);
Deencapsulation.setField(job, "errorRows", 2);
Deencapsulation.setField(job, "unselectedRows", 2);
Deencapsulation.setField(job, "totalTaskExcutionTimeMs", 1000);
String statistic = job.getStatistic();
Assert.assertTrue(statistic.contains("\"receivedBytesRate\":10"));
Assert.assertTrue(statistic.contains("\"loadRowsRate\":16"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load.routineload;

import com.starrocks.common.jmockit.Deencapsulation;
import org.junit.Assert;
import org.junit.Test;

public class PulsarRoutineLoadJobTest {

@Test
public void testGetStatistic() {
RoutineLoadJob job = new PulsarRoutineLoadJob(1L, "routine_load", 1L, 1L, "127.0.0.1:9020", "topic1", "");
Deencapsulation.setField(job, "receivedBytes", 10);
Deencapsulation.setField(job, "totalRows", 20);
Deencapsulation.setField(job, "errorRows", 2);
Deencapsulation.setField(job, "unselectedRows", 2);
Deencapsulation.setField(job, "totalTaskExcutionTimeMs", 1000);
String statistic = job.getStatistic();
Assert.assertTrue(statistic.contains("\"receivedBytesRate\":10"));
Assert.assertTrue(statistic.contains("\"loadRowsRate\":16"));
}
}

0 comments on commit 663d26b

Please sign in to comment.