Skip to content

Commit

Permalink
[BugFix] Fix potential huge leader fe memory occupation when load pro…
Browse files Browse the repository at this point in the history
…file is enabled (StarRocks#43717)

Signed-off-by: drake_wang <wxl250059@alibaba-inc.com>
  • Loading branch information
wxl24life authored Apr 9, 2024
1 parent 88ff601 commit 22c2e88
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1001,10 +1001,9 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
return;
}

// sync stream load collect profile
// sync stream load collect profile, here we collect profile only when be has reported
if (isSyncStreamLoad() && coord.isProfileAlreadyReported()) {
collectProfile();
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}

writeLock();
Expand All @@ -1017,6 +1016,8 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
endTimeMs = System.currentTimeMillis();
} finally {
writeUnlock();
// sync stream load related query info should unregister here
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}
}

Expand Down Expand Up @@ -1109,10 +1110,6 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
return;
}

if (isSyncStreamLoad && coord.isEnableLoadProfile()) {
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}

writeLock();
try {
if (isFinalState()) {
Expand All @@ -1132,6 +1129,8 @@ public void afterAborted(TransactionState txnState, boolean txnOperated, String
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
} finally {
writeUnlock();
// sync stream load related query info should unregister here
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}
}

Expand Down Expand Up @@ -1294,6 +1293,10 @@ public String getStateName() {
return state.name();
}

public TUniqueId getTUniqueId() {
return this.loadId;
}

public void setTUniqueId(TUniqueId loadId) {
this.loadId = loadId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.load.streamload;

import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.UserException;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.TransactionState;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamLoadTaskTest {

@Mocked
private DefaultCoordinator coord;

private StreamLoadTask streamLoadTask;

@Before
public void setUp() {
long id = 123L;
String label = "label_abc";
long timeoutMs = 10000L;
long createTimeMs = 0L;
boolean isRoutineLoad = false;
long warehouseId = 0L;
streamLoadTask =
new StreamLoadTask(id, new Database(), new OlapTable(), label, timeoutMs, createTimeMs, isRoutineLoad,
warehouseId);
}

@Test
public void testAfterCommitted() throws UserException {
streamLoadTask.setCoordinator(coord);
new Expectations() {
{
coord.isProfileAlreadyReported();
result = false;
}
};
TUniqueId labelId = new TUniqueId(2, 3);
streamLoadTask.setTUniqueId(labelId);
QeProcessorImpl.INSTANCE.registerQuery(streamLoadTask.getTUniqueId(), coord);
Assert.assertEquals(1, QeProcessorImpl.INSTANCE.getCoordinatorCount());

TransactionState txnState = new TransactionState();
boolean txnOperated = true;
streamLoadTask.afterCommitted(txnState, txnOperated);
Assert.assertEquals(0, QeProcessorImpl.INSTANCE.getCoordinatorCount());
}

@Test
public void testAfterAborted() throws UserException {
TransactionState txnState = new TransactionState();
boolean txnOperated = true;

TUniqueId labelId = new TUniqueId(2, 3);
streamLoadTask.setTUniqueId(labelId);
QeProcessorImpl.INSTANCE.registerQuery(streamLoadTask.getTUniqueId(), coord);
Assert.assertEquals(1, QeProcessorImpl.INSTANCE.getCoordinatorCount());

streamLoadTask.afterAborted(txnState, txnOperated, "");
Assert.assertEquals(0, QeProcessorImpl.INSTANCE.getCoordinatorCount());
}
}

0 comments on commit 22c2e88

Please sign in to comment.