Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/action manager priority queue #1526

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)

## [Unreleased]

### Added
- #1526 - Added a priority to the Action Manager and associated classes so that Actions can executed in order of priority.

### Changed
- #1523 - Added check to EnsureACEs to avoid duplicate path processing.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* #%L
* ACS AEM Commons Bundle
* %%
* Copyright (C) 2018 Adobe
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

package com.adobe.acs.commons.fam;

public class ActionManagerConstants {

private ActionManagerConstants() {
//no instances of constants class
}

public static final int DEFAULT_ACTION_PRIORITY = 0;

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/
@ProviderType
public interface ActionManagerFactory extends ActionManagerMBean {

/**
* Creates an ActionManager instead with the provided name and JCR context provided bu the resourceResolver.
* @param name the name of the ActionManager. This method guarantee uniqueness of the action manager name.
Expand All @@ -39,6 +40,17 @@ public interface ActionManagerFactory extends ActionManagerMBean {
*/
public ActionManager createTaskManager(String name, ResourceResolver resourceResolver, int saveInterval) throws LoginException;

/**
* Creates an ActionManager instead with the provided name and JCR context provided bu the resourceResolver.
* @param name the name of the ActionManager. This method guarantee uniqueness of the action manager name.
* @param resourceResolver the resourceResolver used to perform
* @param saveInterval the number of changed that must incur on the resourceResolver before commit() is called (in support of batch saves)
* @param priority the priority of execution for the tasks in this action manager
* @return the created ActionManager
* @throws LoginException
*/
public ActionManager createTaskManager(String name, ResourceResolver resourceResolver, int saveInterval, int priority) throws LoginException;

/**
* Gets the named ActionManager from the ActionManagerFactory.
* The name corresponds to the name provided in ActionManagerFactory.createTaskManager(..)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,34 @@ public interface ThrottledTaskRunner extends ThrottledTaskRunnerMBean {
/**
* Schedule some kind of work to run in the future using the internal thread pool.
* The work will be throttled according to the CPU/Memory settings
* @param work
* @param work
*/
void scheduleWork(Runnable work);

/**
* Schedule some kind of work to run in the future using the internal thread pool.
* The work will be throttled according to the CPU/Memory settings. This action can be canceled at any time.
* @param work
* @param work
* @param cancelHandler
*/
void scheduleWork(Runnable work, CancelHandler cancelHandler);

/**
* Schedule some kind of work to run in the future using the internal thread pool.
* The work will be throttled according to the CPU/Memory settings
* @param work
* @param priority the priority of the task
*/
void scheduleWork(Runnable work, int priority);

/**
* Schedule some kind of work to run in the future using the internal thread pool.
* The work will be throttled according to the CPU/Memory settings. This action can be canceled at any time.
* @param work
* @param cancelHandler
* @param priority the priority of the task
*/
void scheduleWork(Runnable work, CancelHandler cancelHandler, int priority);

/**
* Record statistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.adobe.acs.commons.fam.impl;

import com.adobe.acs.commons.fam.ActionManager;
import com.adobe.acs.commons.fam.ActionManagerConstants;
import com.adobe.acs.commons.fam.ActionManagerFactory;
import com.adobe.acs.commons.fam.ThrottledTaskRunner;
import com.adobe.acs.commons.fam.mbean.ActionManagerMBean;
Expand All @@ -45,6 +46,7 @@
@Property(name = "jmx.objectname", value = "com.adobe.acs.commons:type=Action Manager")
public class ActionManagerFactoryImpl extends AnnotatedStandardMBean implements ActionManagerFactory {


@Reference
ThrottledTaskRunner taskRunner;

Expand All @@ -54,12 +56,17 @@ public ActionManagerFactoryImpl() throws NotCompliantMBeanException {
super(ActionManagerMBean.class);
tasks = Collections.synchronizedMap(new LinkedHashMap<>());
}

@Override
public ActionManager createTaskManager(String name, ResourceResolver resourceResolver, int saveInterval) throws LoginException {
return this.createTaskManager(name, resourceResolver, saveInterval, ActionManagerConstants.DEFAULT_ACTION_PRIORITY);
}

@Override
public ActionManager createTaskManager(String name, ResourceResolver resourceResolver, int saveInterval, int priroty) throws LoginException {
String fullName = String.format("%s (%s)", name, UUID.randomUUID().toString());

ActionManagerImpl manager = new ActionManagerImpl(fullName, taskRunner, resourceResolver, saveInterval);
ActionManagerImpl manager = new ActionManagerImpl(fullName, taskRunner, resourceResolver, saveInterval, priroty);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misspelled priority

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly the sort of high quality code review I expect.

Snark aside, will fix this, thanks for pointing it out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, it's open-source. You're getting so much more than you paid for already! ;P But seriously, thanks for the awesome PR!

tasks.put(fullName, manager);
return manager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,14 @@
*/
package com.adobe.acs.commons.fam.impl;

import com.adobe.acs.commons.fam.ActionManager;
import com.adobe.acs.commons.fam.CancelHandler;
import com.adobe.acs.commons.fam.Failure;
import com.adobe.acs.commons.fam.ThrottledTaskRunner;
import com.adobe.acs.commons.fam.actions.Actions;
import com.adobe.acs.commons.functions.BiConsumer;
import com.adobe.acs.commons.functions.BiFunction;
import com.adobe.acs.commons.functions.CheckedBiConsumer;
import com.adobe.acs.commons.functions.CheckedBiFunction;
import com.adobe.acs.commons.functions.CheckedConsumer;
import com.adobe.acs.commons.functions.Consumer;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
Expand All @@ -49,14 +41,25 @@
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.adobe.acs.commons.fam.ActionManagerConstants;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.adobe.acs.commons.fam.ActionManager;
import com.adobe.acs.commons.fam.CancelHandler;
import com.adobe.acs.commons.fam.Failure;
import com.adobe.acs.commons.fam.ThrottledTaskRunner;
import com.adobe.acs.commons.fam.actions.Actions;
import com.adobe.acs.commons.functions.BiConsumer;
import com.adobe.acs.commons.functions.BiFunction;
import com.adobe.acs.commons.functions.CheckedBiConsumer;
import com.adobe.acs.commons.functions.CheckedBiFunction;
import com.adobe.acs.commons.functions.CheckedConsumer;
import com.adobe.acs.commons.functions.Consumer;

/**
* Manages a pool of reusable resource resolvers and injects them into tasks
Expand All @@ -81,6 +84,7 @@ class ActionManagerImpl extends CancelHandler implements ActionManager, Serializ
private final AtomicLong started = new AtomicLong(0);
private long finished;
private int saveInterval;
private int priority;

private final transient ResourceResolver baseResolver;
private final transient List<ReusableResolver> resolvers = Collections.synchronizedList(new ArrayList<>());
Expand All @@ -94,12 +98,17 @@ class ActionManagerImpl extends CancelHandler implements ActionManager, Serializ
private final transient List<Runnable> finishHandlers = Collections.synchronizedList(new ArrayList<>());

ActionManagerImpl(String name, ThrottledTaskRunner taskRunner, ResourceResolver resolver, int saveInterval) throws LoginException {
this(name, taskRunner, resolver, saveInterval, ActionManagerConstants.DEFAULT_ACTION_PRIORITY);
}

ActionManagerImpl(String name, ThrottledTaskRunner taskRunner, ResourceResolver resolver, int saveInterval, int priority) throws LoginException {
this.name = name;
this.taskRunner = taskRunner;
this.saveInterval = saveInterval;
baseResolver = resolver.clone(null);
currentPath = new ThreadLocal<>();
failures = new ArrayList<>();
this.priority = priority;
}

@Override
Expand Down Expand Up @@ -156,7 +165,7 @@ private void deferredWithResolver(
}
taskRunner.scheduleWork(() -> {
runActionAndLogErrors(action, closesResolver);
}, this);
}, this, priority);
}

@SuppressWarnings("squid:S1181")
Expand Down Expand Up @@ -341,7 +350,7 @@ private void performAutomaticCleanup() {
}
runCompletionTasks();
closeAllResolvers();
});
}, priority);
}
}

Expand Down Expand Up @@ -436,6 +445,7 @@ public CompositeData getStatistics() throws OpenDataException {
return new CompositeDataSupport(statsCompositeType, statsItemNames,
new Object[]{
name,
priority,
tasksAdded.get(),
tasksCompleted.get(),
tasksFilteredOut.get(),
Expand Down Expand Up @@ -487,13 +497,17 @@ public List<CompositeData> getFailures() throws OpenDataException {

static {
try {
statsItemNames = new String[]{"_taskName", "started", "completed", "filtered", "successful", "errors", "runtime"};
statsItemNames =
new String[] { "_taskName", "priority", "started", "completed", "filtered", "successful",
"errors", "runtime" };
statsCompositeType = new CompositeType(
"Statics Row",
"Single row of statistics",
statsItemNames,
new String[]{"Name", "Started", "Completed", "Filtered", "Successful", "Errors", "Runtime"},
new OpenType[]{SimpleType.STRING, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.LONG});
new String[] { "Name", "Priority", "Started", "Completed", "Filtered", "Successful",
"Errors", "Runtime" }, new OpenType[] { SimpleType.STRING, SimpleType.INTEGER,
SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.INTEGER,
SimpleType.INTEGER, SimpleType.LONG });
statsTabularType = new TabularType("Statistics", "Collected statistics", statsCompositeType, new String[]{"_taskName"});

failureItemNames = new String[]{"_taskName", "_count", "item", "error"};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* #%L
* ACS AEM Commons Bundle
* %%
* Copyright (C) 2018 Adobe
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

package com.adobe.acs.commons.fam.impl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* A custom extension of the thread pool executor which uses our {@link TimedRunnableFuture} implementation.
* This ensures that deferred tasks still respect the priority and don't cause errors when those are cast to {@link Comparable}.
*/
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

/**
* {@inheritDoc}
*/
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

/**
* {@inheritDoc}
*/
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

/**
* {@inheritDoc}
*/
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

/**
* {@inheritDoc}
*/
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new TimedRunnableFuture(runnable, value);
}
}
Loading