[feature](docker suite) Docker suite use independent executor (apache#40259)

If many docker suites run in parallel, their docker containers consume a lot of
memory, so run them on an independent executor to control the memory usage.

Use the command-line option or the config entry dockerSuiteParallel to set the
docker suite parallelism; the default is 1.
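
For example, to allow two docker suites at once, one could pass `-dockerSuiteParallel 2` on the command line, or set it in the regression config file (a minimal sketch; the key comes from this patch, the value and file name are illustrative):

```
// regression-conf.groovy (illustrative): cap concurrent docker suites at 2
dockerSuiteParallel = 2
```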
yujun777 authored Sep 10, 2024
1 parent a8d8798 commit 4238199
Showing 51 changed files with 195 additions and 87 deletions.
39 changes: 31 additions & 8 deletions docker/runtime/doris-compose/Readme.md
@@ -23,7 +23,16 @@ Use doris compose to create doris docker compose clusters.

## Requirements

##### 1. Make sure you have docker permissions

Run:
```
docker run hello-world
```

If you get a permission denied error, follow [add-docker-permission](https://docs.docker.com/engine/install/linux-postinstall/).

##### 2. The doris image should contain

```
/opt/apache-doris/{fe, be, cloud}
@@ -32,23 +41,35 @@ Use doris compose to create doris docker compose clusters.
If you don't create a cloud cluster, the image doesn't need to contain the cloud package.


If doris is built with `sh build.sh --fe --be --cloud`, its output satisfies all of the above; running the following command in the doris root directory will then generate such an image.

```
docker build -f docker/runtime/doris-compose/Dockerfile -t <image> .
```

##### 3. Install the dependent python libraries in 'docker/runtime/doris-compose/requirements.txt'

```
python -m pip install --user -r docker/runtime/doris-compose/requirements.txt
```

## Usage

### Notice

Each cluster has its own directory under '/tmp/doris/{cluster-name}'; users can set the env var LOCAL_DORIS_PATH to change the parent directory.

For example, if a user exports LOCAL_DORIS_PATH=/mydoris, then the cluster's directory is '/mydoris/{cluster-name}'.

A cluster's directory contains all of its containers' logs and data, e.g. fe-1, fe-2, be-1, etc.

If multiple users run doris-compose on the same machine, they should either all leave LOCAL_DORIS_PATH unchanged or all export the same value.

When creating a new cluster, doris-compose scans the local doris path and picks a docker network that differs from those of the clusters already under that path.

So if users set different LOCAL_DORIS_PATH values, their clusters may end up with conflicting docker networks!

### Create a cluster or recreate its containers

```
@@ -65,9 +86,11 @@ add fe/be nodes with the specific image, or update existing nodes with `--fe-id`


To create a cloud cluster, follow the steps below:

1. Write the cloud s3 store config file; its default path is '/tmp/doris/cloud.ini'.
   The path is defined by the environment variable DORIS_CLOUD_CFG_FILE; change this env var to change the path.
   An example file is located at 'docker/runtime/doris-compose/resource/cloud.ini.example'.

2. Use the doris compose up command with the option '--cloud' to create a new cloud cluster.

The simplest way to create a cloud cluster:
@@ -127,7 +150,7 @@ Generate regression-conf-custom.groovy to connect to the specific docker cluster

Steps:

1. Create a new cluster: `python docker/runtime/doris-compose/doris-compose.py up my-cluster my-image --add-fe-num 2 --add-be-num 4 --cloud`
2. Generate regression-conf-custom.groovy: `python docker/runtime/doris-compose/doris-compose.py config my-cluster <doris-root-path> --connect-follow-fe`
3. Run regression test: `bash run-regression-test.sh --run -times 1 -parallel 1 -suiteParallel 1 -d cloud/multi_cluster`

26 changes: 26 additions & 0 deletions docker/runtime/doris-compose/command.py
@@ -826,7 +826,16 @@ def run(self, args):
print("\nNo write regression custom file.")
return

annotation_start = "//---------- Start auto generate by doris-compose.py---------"
annotation_end = "//---------- End auto generate by doris-compose.py---------"

old_contents = []
if os.path.exists(regression_conf_custom):
with open(regression_conf_custom, "r") as f:
old_contents = f.readlines()
with open(regression_conf_custom, "w") as f:
# write auto gen config
f.write(annotation_start)
f.write(base_conf.format(fe_ip=fe_ip))
if cluster.is_cloud:
multi_cluster_bes = ",".join([
@@ -845,6 +854,23 @@ def run(self, args):
                    multi_cluster_bes=multi_cluster_bes,
                    fe_cloud_unique_id=cluster.get_node(
                        CLUSTER.Node.TYPE_FE, 1).cloud_unique_id()))
            f.write(annotation_end + "\n\n")

            # write the non-generated (user) config back, skipping the old
            # auto-generated block and the blank line that followed it
            in_annotation = False
            annotation_end_line_idx = -100
            for line_idx, line in enumerate(old_contents):
                line = line.rstrip()
                if line == annotation_start:
                    in_annotation = True
                elif line == annotation_end:
                    in_annotation = False
                    annotation_end_line_idx = line_idx
                elif not in_annotation:
                    if line or line_idx != annotation_end_line_idx + 1:
                        f.write(line + "\n")

        print("\nWrite succ: " + regression_conf_custom)


2 changes: 1 addition & 1 deletion docker/runtime/doris-compose/resource/common.sh
@@ -23,7 +23,7 @@ export LOG_FILE=$DORIS_HOME/log/health.out
export LOCK_FILE=$DORIS_HOME/status/token

health_log() {
    echo "$(date +'%Y-%m-%d %H:%M:%S') $@" >>$LOG_FILE
    echo "$(date +'%Y-%m-%d %H:%M:%S') $@" | tee -a $LOG_FILE
}

# concurrent writes to the meta service server will fail due to fdb txn conflicts.
@@ -117,6 +117,7 @@ class Config {
    public InetSocketAddress recycleServiceHttpInetSocketAddress
    public Integer parallel
    public Integer suiteParallel
    public Integer dockerSuiteParallel
    public Integer actionParallel
    public Integer times
    public boolean withOutLoadData
@@ -467,6 +468,7 @@ class Config {
        config.forceGenerateOutputFile = cmd.hasOption(forceGenOutOpt)
        config.parallel = Integer.parseInt(cmd.getOptionValue(parallelOpt, "10"))
        config.suiteParallel = Integer.parseInt(cmd.getOptionValue(suiteParallelOpt, "10"))
        config.dockerSuiteParallel = Integer.parseInt(cmd.getOptionValue(dockerSuiteParallelOpt, "1"))
        config.actionParallel = Integer.parseInt(cmd.getOptionValue(actionParallelOpt, "10"))
        config.times = Integer.parseInt(cmd.getOptionValue(timesOpt, "1"))
        config.randomOrder = cmd.hasOption(randomOrderOpt)
@@ -888,6 +890,11 @@ class Config {
log.info("Set suiteParallel to 1 because not specify.".toString())
}

if (config.dockerSuiteParallel == null) {
config.dockerSuiteParallel = 1
log.info("Set dockerSuiteParallel to 1 because not specify.".toString())
}

if (config.actionParallel == null) {
config.actionParallel = 10
log.info("Set actionParallel to 10 because not specify.".toString())
@@ -67,6 +67,7 @@ class ConfigOptions {
    static Option forceGenOutOpt
    static Option parallelOpt
    static Option suiteParallelOpt
    static Option dockerSuiteParallelOpt
    static Option actionParallelOpt
    static Option randomOrderOpt
    static Option stopWhenFailOpt
@@ -425,6 +426,14 @@ class ConfigOptions {
.longOpt("suiteParallel")
.desc("the num of threads running for suites")
.build()
dockerSuiteParallelOpt = Option.builder("dockerSuiteParallel")
.argName("dockerSuiteParallel")
.required(false)
.hasArg(true)
.type(String.class)
.longOpt("dockerSuiteParallel")
.desc("the num of threads running for docker suites")
.build()
actionParallelOpt = Option.builder("actionParallel")
.argName("parallel")
.required(false)
@@ -607,6 +616,7 @@ class ConfigOptions {
                .addOption(forceGenOutOpt)
                .addOption(parallelOpt)
                .addOption(suiteParallelOpt)
                .addOption(dockerSuiteParallelOpt)
                .addOption(actionParallelOpt)
                .addOption(randomOrderOpt)
                .addOption(stopWhenFailOpt)
@@ -52,16 +52,22 @@ import java.util.function.Predicate
@CompileStatic
class RegressionTest {

    static enum GroupExecType {
        NORMAL,
        SINGLE, // group contains nonConcurrent
        DOCKER, // group contains docker
    }

    static ClassLoader classloader
    static CompilerConfiguration compileConfig
    static GroovyShell shell
    static ExecutorService scriptExecutors
    static ExecutorService suiteExecutors
    static ExecutorService singleSuiteExecutors
    static ExecutorService actionExecutors
    static Map<GroupExecType, ExecutorService> suiteExecutors
    static ThreadLocal<Integer> threadLoadedClassNum = new ThreadLocal<>()
    static final int cleanLoadedClassesThreshold = 20
    static String nonConcurrentTestGroup = "nonConcurrent"
    static String dockerTestGroup = "docker"

    static {
        ch.qos.logback.classic.Logger loggerOfSuite =
@@ -113,8 +119,9 @@ class RegressionTest {
            }
        }
        actionExecutors.shutdown()
        suiteExecutors.shutdown()
        singleSuiteExecutors.shutdown()
        for (ExecutorService suiteExecutor : suiteExecutors.values()) {
            suiteExecutor.shutdown()
        }
        scriptExecutors.shutdown()
        log.info("Test finished")
        if (!success) {
@@ -135,17 +142,24 @@ class RegressionTest {
                .build();
        scriptExecutors = Executors.newFixedThreadPool(config.parallel, scriptFactory)

        suiteExecutors = [:]
        BasicThreadFactory suiteFactory = new BasicThreadFactory.Builder()
                .namingPattern("suite-thread-%d")
                .priority(Thread.MAX_PRIORITY)
                .build();
        suiteExecutors = Executors.newFixedThreadPool(config.suiteParallel, suiteFactory)
        suiteExecutors[GroupExecType.NORMAL] = Executors.newFixedThreadPool(config.suiteParallel, suiteFactory)

        BasicThreadFactory singleSuiteFactory = new BasicThreadFactory.Builder()
                .namingPattern("non-concurrent-thread-%d")
                .priority(Thread.MAX_PRIORITY)
                .build();
        singleSuiteExecutors = Executors.newFixedThreadPool(1, singleSuiteFactory)
        suiteExecutors[GroupExecType.SINGLE] = Executors.newFixedThreadPool(1, singleSuiteFactory)

        BasicThreadFactory dockerSuiteFactory = new BasicThreadFactory.Builder()
                .namingPattern("docker-suite-thread-%d")
                .priority(Thread.MAX_PRIORITY)
                .build();
        suiteExecutors[GroupExecType.DOCKER] = Executors.newFixedThreadPool(config.dockerSuiteParallel, dockerSuiteFactory)

        BasicThreadFactory actionFactory = new BasicThreadFactory.Builder()
                .namingPattern("action-thread-%d")
@@ -198,9 +212,9 @@ class RegressionTest {
        return sources
    }

    static void runScript(Config config, ScriptSource source, Recorder recorder, boolean isSingleThreadScript) {
    static void runScript(Config config, ScriptSource source, Recorder recorder, GroupExecType grpExecType) {
        def suiteFilter = { String suiteName, String groupName ->
            canRun(config, suiteName, groupName, grpExecType)
        }
        def file = source.getFile()
        int failureLimit = Integer.valueOf(config.otherConfigs.getOrDefault("max_failure_num", "-1").toString());
@@ -211,12 +225,7 @@ class RegressionTest {
            return;
        }
        def eventListeners = getEventListeners(config, recorder)
        ExecutorService executors = null
        if (isSingleThreadScript) {
            executors = singleSuiteExecutors
        } else {
            executors = suiteExecutors
        }
        ExecutorService executors = suiteExecutors[grpExecType]

        new ScriptContext(file, executors, actionExecutors,
                config, eventListeners, suiteFilter).start { scriptContext ->
@@ -242,11 +251,20 @@ class RegressionTest {
        scriptSources.eachWithIndex { source, i ->
            // log.info("Prepare scripts [${i + 1}/${totalFile}]".toString())
            def future = scriptExecutors.submit {
                runScript(config, source, recorder, false)
                runScript(config, source, recorder, GroupExecType.NORMAL)
            }
            futures.add(future)
        }

        List<Future> dockerFutures = Lists.newArrayList()
        scriptSources.eachWithIndex { source, i ->
            // log.info("Prepare scripts [${i + 1}/${totalFile}]".toString())
            def future = scriptExecutors.submit {
                runScript(config, source, recorder, GroupExecType.DOCKER)
            }
            dockerFutures.add(future)
        }

        // wait all scripts
        for (Future future : futures) {
            try {
@@ -261,12 +279,20 @@ class RegressionTest {
        scriptSources.eachWithIndex { source, i ->
            // log.info("Prepare scripts [${i + 1}/${totalFile}]".toString())
            def future = scriptExecutors.submit {
                runScript(config, source, recorder, true)
                runScript(config, source, recorder, GroupExecType.SINGLE)
            }
            futures.add(future)
        }

        // wait all docker scripts
        for (Future future : dockerFutures) {
            try {
                future.get()
            } catch (Throwable t) {
                // do nothing, because already save to Recorder
            }
        }

        for (Future future : futures) {
            try {
                future.get()
@@ -323,19 +349,19 @@ class RegressionTest {
        return true
    }

    static boolean canRun(Config config, String suiteName, String group, boolean isSingleThreadScript) {
    static boolean canRun(Config config, String suiteName, String group, GroupExecType grpExecType) {
        return getGroupExecType(group) == grpExecType && filterGroups(config, group) && filterSuites(config, suiteName)
    }

    static GroupExecType getGroupExecType(String group) {
        Set<String> suiteGroups = group.split(',').collect { g -> g.trim() }.toSet();
        if (suiteGroups.contains(nonConcurrentTestGroup)) {
            return GroupExecType.SINGLE
        } else if (suiteGroups.contains(dockerTestGroup)) {
            return GroupExecType.DOCKER
        } else {
            return GroupExecType.NORMAL
        }
    }
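
Under this mapping, nonConcurrent takes precedence over docker. A quick illustration against the code above:

```
assert RegressionTest.getGroupExecType("p0,docker") == RegressionTest.GroupExecType.DOCKER
assert RegressionTest.getGroupExecType("nonConcurrent,docker") == RegressionTest.GroupExecType.SINGLE
assert RegressionTest.getGroupExecType("p0") == RegressionTest.GroupExecType.NORMAL
```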

static List<EventListener> getEventListeners(Config config, Recorder recorder) {
@@ -421,7 +447,7 @@ class RegressionTest {
        }
        pluginPath.eachFileRecurse({ it ->
            if (it.name.endsWith(".groovy")) {
                ScriptContext context = new ScriptContext(it, suiteExecutors, actionExecutors,
                ScriptContext context = new ScriptContext(it, suiteExecutors[GroupExecType.NORMAL], actionExecutors,
                        config, [], { name -> true })
                File pluginFile = it
                context.start({
@@ -454,7 +480,7 @@ class RegressionTest {
+ "output: ${sout.toString()}, error: ${serr.toString()}")
}

def pipList = 'pip list'.execute().text
def pipList = 'python -m pip list'.execute().text
log.info("python library: ${pipList}")
}

@@ -29,6 +29,7 @@ import groovy.json.JsonSlurper
import com.google.common.collect.ImmutableList
import org.apache.commons.lang3.ObjectUtils
import org.apache.doris.regression.Config
import org.apache.doris.regression.RegressionTest
import org.apache.doris.regression.action.BenchmarkAction
import org.apache.doris.regression.action.ProfileAction
import org.apache.doris.regression.action.WaitForAction
@@ -276,6 +277,11 @@ class Suite implements GroovyInterceptable {
            return
        }

        if (RegressionTest.getGroupExecType(group) != RegressionTest.GroupExecType.DOCKER) {
            throw new Exception("Need to add 'docker' to docker suite's belong groups, "
                    + "see example demo_p0/docker_action.groovy")
        }

        boolean pipelineIsCloud = isCloudMode()
        boolean dockerIsCloud = false
        if (options.cloudMode == null) {
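A docker suite therefore has to list 'docker' among its groups. A minimal sketch modeled on the referenced demo_p0/docker_action.groovy (suite name, options, and body are illustrative, not the demo's actual contents):

```
import org.apache.doris.regression.suite.ClusterOptions

// the second argument is the suite's group list; 'docker' routes it
// to the DOCKER executor and past the check above
suite('my_docker_action', 'docker') {
    def options = new ClusterOptions()
    docker(options) {
        sql 'SELECT 1'   // runs against a throwaway docker cluster
    }
}
```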
@@ -40,6 +40,7 @@ class ClusterOptions {
    ]

    List<String> beConfigs = [
        'max_sys_mem_available_low_water_mark_bytes=0', // do not check available memory
        'report_disk_state_interval_seconds=2',
        'report_random_wait=false',
    ]
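
A suite can tweak these defaults on its ClusterOptions instance before calling docker(); for example (an illustrative sketch using plain Groovy list concatenation):

```
def options = new ClusterOptions()
// shorten disk-state reporting for a faster-converging test cluster
options.beConfigs += ['report_disk_state_interval_seconds=1']
```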
