Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand All @@ -40,9 +42,12 @@ public SingularityDeployHistoryPersister(
DeployManager deployManager,
HistoryManager historyManager,
SingularitySchedulerLock schedulerLock,
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
@Named(
SingularityHistoryModule.LAST_DEPLOY_PERSISTER_SUCCESS
) AtomicLong lastPersisterSuccess
) {
super(configuration, persisterLock);
super(configuration, persisterLock, lastPersisterSuccess);
this.schedulerLock = schedulerLock;
this.deployManager = deployManager;
this.historyManager = historyManager;
Expand All @@ -52,6 +57,7 @@ public SingularityDeployHistoryPersister(
public void runActionOnPoll() {
LOG.info("Attempting to grab persister lock");
persisterLock.lock();
AtomicBoolean persisterSuccess = new AtomicBoolean(true);
try {
LOG.info("Acquired persister lock");
LOG.info("Checking inactive deploys for deploy history persistence");
Expand Down Expand Up @@ -123,6 +129,8 @@ public void runActionOnPoll() {
);
if (moveToHistoryOrCheckForPurge(deployHistory, i++)) {
numTransferred.increment();
} else {
persisterSuccess.getAndSet(false);
}
}
},
Expand All @@ -138,6 +146,13 @@ public void runActionOnPoll() {
JavaUtils.duration(start)
);
} finally {
if (persisterSuccess.get()) {
lastPersisterSuccess.set(System.currentTimeMillis());
LOG.info(
"Finished run on deploy history persister at {}",
lastPersisterSuccess.get()
);
}
persisterLock.unlock();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@
import com.google.inject.name.Named;
import com.hubspot.singularity.data.history.SingularityMappers.SingularityIdMapper;
import com.hubspot.singularity.data.history.SingularityMappers.SingularityJsonStringMapper;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.jdbi.v3.core.mapper.ColumnMapper;
import org.jdbi.v3.core.mapper.RowMapper;

public class SingularityHistoryModule extends AbstractModule {
public static final String PERSISTER_LOCK = "history.persister.lock";

public static final String LAST_TASK_PERSISTER_SUCCESS =
"last-task-history-persister-success";
public static final String LAST_REQUEST_PERSISTER_SUCCESS =
"last-request-history-persister-success";
public static final String LAST_DEPLOY_PERSISTER_SUCCESS =
"last-deploy-history-persister-success";

public SingularityHistoryModule() {}

@Override
Expand Down Expand Up @@ -83,4 +91,25 @@ public void configure() {
public ReentrantLock providePersisterLock() {
return new ReentrantLock();
}

@Provides
@Singleton
@Named(LAST_TASK_PERSISTER_SUCCESS)
public AtomicLong lastTaskPersisterSuccess() {
return new AtomicLong();
}

@Provides
@Singleton
@Named(LAST_REQUEST_PERSISTER_SUCCESS)
public AtomicLong lastRequestPersisterSuccess() {
return new AtomicLong();
}

@Provides
@Singleton
@Named(LAST_DEPLOY_PERSISTER_SUCCESS)
public AtomicLong lastDeployPersisterSuccess() {
return new AtomicLong();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.hubspot.singularity.scheduler.SingularityLeaderOnlyPoller;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,13 +21,17 @@ public abstract class SingularityHistoryPersister<T extends SingularityHistoryIt
protected final SingularityConfiguration configuration;
protected final ReentrantLock persisterLock;

protected final AtomicLong lastPersisterSuccess;

public SingularityHistoryPersister(
SingularityConfiguration configuration,
ReentrantLock persisterLock
ReentrantLock persisterLock,
AtomicLong lastPersisterSuccess
) {
super(configuration.getPersistHistoryEverySeconds(), TimeUnit.SECONDS);
this.configuration = configuration;
this.persisterLock = persisterLock;
this.lastPersisterSuccess = lastPersisterSuccess;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Singleton;
import org.slf4j.Logger;
Expand All @@ -40,9 +42,12 @@ public SingularityRequestHistoryPersister(
RequestManager requestManager,
HistoryManager historyManager,
SingularitySchedulerLock lock,
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
@Named(
SingularityHistoryModule.LAST_REQUEST_PERSISTER_SUCCESS
) AtomicLong lastPersisterSuccess
) {
super(configuration, persisterLock);
super(configuration, persisterLock, lastPersisterSuccess);
this.requestManager = requestManager;
this.historyManager = historyManager;
this.lock = lock;
Expand Down Expand Up @@ -130,6 +135,7 @@ public String toString() {
public void runActionOnPoll() {
LOG.info("Attempting to grab persister lock");
persisterLock.lock();
AtomicBoolean persisterSuccess = new AtomicBoolean(true);
try {
LOG.info("Checking request history for persistence");

Expand All @@ -156,6 +162,8 @@ public void runActionOnPoll() {
() -> {
if (moveToHistoryOrCheckForPurge(requestHistoryParent, i.getAndIncrement())) {
numHistoryTransferred.getAndAdd(requestHistoryParent.history.size());
} else {
persisterSuccess.getAndSet(false);
}
},
requestHistoryParent.requestId,
Expand All @@ -170,6 +178,14 @@ public void runActionOnPoll() {
JavaUtils.duration(start)
);
} finally {
if (persisterSuccess.get()) {
lastPersisterSuccess.set(System.currentTimeMillis());
LOG.info(
"Finished run on request history persister at {}",
lastPersisterSuccess.get()
);
}

persisterLock.unlock();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.inject.Singleton;
Expand All @@ -41,9 +42,12 @@ public SingularityTaskHistoryPersister(
TaskManager taskManager,
DeployManager deployManager,
HistoryManager historyManager,
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
@Named(
SingularityHistoryModule.LAST_TASK_PERSISTER_SUCCESS
) AtomicLong lastPersisterSuccess
) {
super(configuration, persisterLock);
super(configuration, persisterLock, lastPersisterSuccess);
this.taskManager = taskManager;
this.historyManager = historyManager;
this.deployManager = deployManager;
Expand All @@ -55,6 +59,7 @@ public SingularityTaskHistoryPersister(
public void runActionOnPoll() {
LOG.info("Attempting to grab persister lock");
persisterLock.lock();
boolean persisterSuccess = true;
try {
LOG.info("Checking inactive task ids for task history persistence");

Expand Down Expand Up @@ -86,6 +91,8 @@ public void runActionOnPoll() {
if (moveToHistoryOrCheckForPurge(taskId, forRequest)) {
LOG.debug("Transferred task {}", taskId);
transferred++;
} else {
persisterSuccess = false;
}

forRequest++;
Expand All @@ -101,6 +108,14 @@ public void runActionOnPoll() {
}
}
} finally {
if (persisterSuccess) {
lastPersisterSuccess.set(System.currentTimeMillis());
LOG.info(
"Finished run on task history persister at {}",
lastPersisterSuccess.get()
);
}

persisterLock.unlock();
}
}
Expand Down