Skip to content

Commit

Permalink
Audit START/END log events for CoordinatorEvent operations (#936)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehoner authored May 30, 2023
1 parent 7199427 commit c449757
Showing 1 changed file with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
boolean isLeader = _adapter.isLeader();
if (!isLeader && isLeaderEvent(event.getType())) {
_log.info("Skipping event {} isLeader: false", event.getType());
_log.info("END: Handle event " + event);
return;
}
try {
Expand Down Expand Up @@ -1249,6 +1250,7 @@ private boolean isDeletingOrExpired(Datastream stream) {
*/
private void handleDatastreamAddOrDelete() {
boolean shouldRetry = false;
_log.info("START: Coordinator::handleDatastreamAddOrDelete.");

// Get the list of all datastreams
List<Datastream> allStreams = _datastreamCache.getAllDatastreams(true);
Expand All @@ -1259,6 +1261,7 @@ private void handleDatastreamAddOrDelete() {
// do nothing if there are zero datastreams
if (allStreams.isEmpty()) {
_log.warn("Received a new datastream event, but there were no datastreams");
_log.info("END: Coordinator::handleDatastreamAddOrDelete.");
return;
}

Expand Down Expand Up @@ -1307,6 +1310,7 @@ private void handleDatastreamAddOrDelete() {
}

_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(false));
_log.info("END: Coordinator::handleDatastreamAddOrDelete.");
}

private void hardDeleteDatastream(Datastream ds, List<Datastream> activeStreams) {
Expand Down Expand Up @@ -1435,6 +1439,7 @@ && hasValidDestination(datastream) && !isDeletingOrExpired(datastream))
*/
private void handleLeaderDoAssignment(boolean isNewlyElectedLeader) {
boolean succeeded = true;
_log.info("START: Coordinator::handleLeaderDoAssignment.");
List<String> liveInstances = Collections.emptyList();
Map<String, Set<DatastreamTask>> previousAssignmentByInstance = Collections.emptyMap();
Map<String, List<DatastreamTask>> newAssignmentsByInstance = Collections.emptyMap();
Expand Down Expand Up @@ -1503,6 +1508,7 @@ private void handleLeaderDoAssignment(boolean isNewlyElectedLeader) {
if (!succeeded && !_leaderDoAssignmentScheduled.get()) {
scheduleLeaderDoAssignmentRetry(isNewlyElectedLeader);
}
_log.info("END: Coordinator::handleLeaderDoAssignment.");
}

private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) {
Expand Down Expand Up @@ -1636,6 +1642,7 @@ private void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups)
*/
private void performPartitionAssignment(String datastreamGroupName) {
boolean succeeded;
_log.info("START: Coordinator::performPartitionAssignment.");
Map<String, Set<DatastreamTask>> previousAssignmentByInstance = new HashMap<>();
Map<String, List<DatastreamTask>> newAssignmentsByInstance = new HashMap<>();

Expand Down Expand Up @@ -1698,6 +1705,7 @@ private void performPartitionAssignment(String datastreamGroupName) {
_eventQueue.put(CoordinatorEvent.createLeaderPartitionAssignmentEvent(datastreamGroupName));
}, _config.getRetryIntervalMs(), TimeUnit.MILLISECONDS);
}
_log.info("END: Coordinator::performPartitionAssignment.");
}

private void updateCounterForMaxPartitionInTask(Map<String, List<DatastreamTask>> assignments) {
Expand Down Expand Up @@ -1731,6 +1739,7 @@ private void onDatastreamChange(List<DatastreamGroup> datastreamGroups) {
* @param notifyTimestamp the timestamp when partition movement is triggered
*/
private void performPartitionMovement(Long notifyTimestamp) {
_log.info("START: Coordinator::performPartitionMovement.");
boolean shouldRetry = true;
Map<String, Set<DatastreamTask>> previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks();
Map<String, List<DatastreamTask>> newAssignmentsByInstance = new HashMap<>();
Expand Down Expand Up @@ -1816,6 +1825,7 @@ private void performPartitionMovement(Long notifyTimestamp) {
_eventQueue.put(CoordinatorEvent.createPartitionMovementEvent(notifyTimestamp)), _config.getRetryIntervalMs(),
TimeUnit.MILLISECONDS);
}
_log.info("END: Coordinator::performPartitionMovement.");
}

@VisibleForTesting
Expand Down

0 comments on commit c449757

Please sign in to comment.