Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iotdb.confignode.manager.pipe.metric.overview;

import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
Expand Down Expand Up @@ -56,7 +56,7 @@ class PipeConfigNodeRemainingTimeOperator extends PipeRemainingOperator {
* @return The estimated remaining time
*/
double getRemainingTime() {
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();

// Do not calculate heartbeat event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iotdb.db.pipe.metric.overview;

import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.metric.PipeRemainingOperator;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
Expand Down Expand Up @@ -106,7 +106,7 @@ void decreaseHeartbeatEventCount() {
lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
}
return PipeConfig.getInstance()
.getPipeRemainingTimeCommitRateAverageTime()
.getPipeRemainingInsertNodeCountAverage()
.getMeterRate(insertNodeEventCountMeter);
}

Expand Down Expand Up @@ -135,7 +135,7 @@ long getRemainingEvents() {
* @return The estimated remaining time
*/
double getRemainingTime() {
final PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
final PipeRateAverage pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();

final double invocationValue = collectInvocationHistogram.getMean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.HandleSystemErrorStrategy;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.enums.PipeRateAverage;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -280,7 +280,7 @@ public class CommonConfig {
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
private int pipeRemainingEventCountSmoothingIntervalSeconds = 15;
private int pipeRemainingEventCountSmoothingIntervalSeconds = 10;

private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
Expand All @@ -301,8 +301,8 @@ public class CommonConfig {
private long pipeListeningQueueTransferSnapshotThreshold = 1000;
private int pipeSnapshotExecutionMaxBatchSize = 1000;
private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
private PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime =
PipeRemainingTimeRateAverageTime.MEAN;
private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = PipeRateAverage.FIVE_MINUTES;
private PipeRateAverage pipeRemainingInsertNodeCountAverage = PipeRateAverage.ONE_MINUTE;
private double pipeTsFileScanParsingThreshold = 0.05;
private double pipeDynamicMemoryHistoryWeight = 0.5;
private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
Expand Down Expand Up @@ -1831,12 +1831,12 @@ public void setPipeRemainingTimeCommitRateAutoSwitchSeconds(
pipeRemainingTimeCommitRateAutoSwitchSeconds);
}

public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
return pipeRemainingTimeCommitRateAverageTime;
}

public void setPipeRemainingTimeCommitRateAverageTime(
PipeRemainingTimeRateAverageTime pipeRemainingTimeCommitRateAverageTime) {
PipeRateAverage pipeRemainingTimeCommitRateAverageTime) {
if (Objects.equals(
this.pipeRemainingTimeCommitRateAverageTime, pipeRemainingTimeCommitRateAverageTime)) {
return;
Expand All @@ -1847,6 +1847,21 @@ public void setPipeRemainingTimeCommitRateAverageTime(
pipeRemainingTimeCommitRateAverageTime);
}

public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
return pipeRemainingInsertNodeCountAverage;
}

public void setPipeRemainingInsertNodeCountAverage(
PipeRateAverage pipeRemainingInsertNodeCountAverage) {
if (Objects.equals(
this.pipeRemainingInsertNodeCountAverage, pipeRemainingInsertNodeCountAverage)) {
return;
}
this.pipeRemainingInsertNodeCountAverage = pipeRemainingInsertNodeCountAverage;
logger.info(
"pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountAverage);
}

public double getPipeTsFileScanParsingThreshold() {
return pipeTsFileScanParsingThreshold;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.apache.iotdb.commons.enums;

import org.apache.iotdb.commons.pipe.config.PipeConfig;

import com.codahale.metrics.Meter;

public enum PipeRemainingTimeRateAverageTime {
public enum PipeRateAverage {
ONE_MINUTE,
FIVE_MINUTES,
FIFTEEN_MINUTES,
Expand All @@ -41,9 +39,7 @@ public double getMeterRate(final Meter meter) {
return meter.getMeanRate();
default:
throw new UnsupportedOperationException(
String.format(
"The type %s is not supported in average time of pipe remaining time.",
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime()));
String.format("The type %s is not supported in pipe rate average.", this));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.enums.PipeRateAverage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -223,10 +223,14 @@ public long getPipeRemainingTimeCommitAutoSwitchSeconds() {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAutoSwitchSeconds();
}

public PipeRemainingTimeRateAverageTime getPipeRemainingTimeCommitRateAverageTime() {
public PipeRateAverage getPipeRemainingTimeCommitRateAverageTime() {
return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
}

public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage();
}

public double getPipeTsFileScanParsingThreshold() {
return COMMON_CONFIG.getPipeTsFileScanParsingThreshold();
}
Expand Down Expand Up @@ -513,6 +517,8 @@ public void printAllConfigs() {
getPipeRemainingTimeCommitAutoSwitchSeconds());
LOGGER.info(
"PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime());
LOGGER.info(
"PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountAverage());
LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold());

LOGGER.info("PipeDynamicMemoryHistoryWeight: {}", getPipeDynamicMemoryHistoryWeight());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
import org.apache.iotdb.commons.conf.TrimProperties;
import org.apache.iotdb.commons.enums.PipeRemainingTimeRateAverageTime;
import org.apache.iotdb.commons.enums.PipeRateAverage;

import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -184,12 +184,19 @@ public static void loadPipeStaticConfig(CommonConfig config, TrimProperties prop
"pipe_snapshot_execution_max_batch_size",
String.valueOf(config.getPipeSnapshotExecutionMaxBatchSize()))));
config.setPipeRemainingTimeCommitRateAverageTime(
PipeRemainingTimeRateAverageTime.valueOf(
PipeRateAverage.valueOf(
properties
.getProperty(
"pipe_remaining_time_commit_rate_average_time",
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
.trim()));
config.setPipeRemainingInsertNodeCountAverage(
PipeRateAverage.valueOf(
properties
.getProperty(
"pipe_remaining_insert_node_count_average",
String.valueOf(config.getPipeRemainingInsertNodeCountAverage()))
.trim()));
}

public static void loadPipeInternalConfig(CommonConfig config, TrimProperties properties)
Expand Down
Loading