Skip to content

Commit 9e73508

Browse files
author
James DeFelice
committed
revive offers upon successful task completion; allocate memory for executors to prevent abnormal executor death
1 parent 6957b12 commit 9e73508

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

examples/scheduler/main.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,10 @@ import (
4141
)
4242

4343
const (
44+
CPUS_PER_EXECUTOR = 0.01
4445
CPUS_PER_TASK = 1
45-
MEM_PER_TASK = 128
46+
MEM_PER_EXECUTOR = 64
47+
MEM_PER_TASK = 64
4648
defaultArtifactPort = 12345
4749
)
4850

@@ -132,6 +134,12 @@ func (sched *ExampleScheduler) ResourceOffers(driver sched.SchedulerDriver, offe
132134
remainingCpus := cpus
133135
remainingMems := mems
134136

137+
// account for executor resources if there's not an executor already running on the slave
138+
if len(offer.ExecutorIds) == 0 {
139+
remainingCpus -= CPUS_PER_EXECUTOR
140+
remainingMems -= MEM_PER_EXECUTOR
141+
}
142+
135143
var tasks []*mesos.TaskInfo
136144
for sched.tasksLaunched < sched.totalTasks &&
137145
CPUS_PER_TASK <= remainingCpus &&
@@ -168,6 +176,7 @@ func (sched *ExampleScheduler) StatusUpdate(driver sched.SchedulerDriver, status
168176
log.Infoln("Status update: task", status.TaskId.GetValue(), " is in state ", status.State.Enum().String())
169177
if status.GetState() == mesos.TaskState_TASK_FINISHED {
170178
sched.tasksFinished++
179+
driver.ReviveOffers() // TODO(jdef) rate-limit this
171180
}
172181

173182
if sched.tasksFinished >= sched.totalTasks {
@@ -263,6 +272,10 @@ func prepareExecutorInfo() *mesos.ExecutorInfo {
263272
Value: proto.String(executorCommand),
264273
Uris: executorUris,
265274
},
275+
Resources: []*mesos.Resource{
276+
util.NewScalarResource("cpus", CPUS_PER_EXECUTOR),
277+
util.NewScalarResource("mem", MEM_PER_EXECUTOR),
278+
},
266279
}
267280
}
268281

0 commit comments

Comments
 (0)