Skip to content

messaging addition #1195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,41 @@ Each of these directories was created with 'cvs checkout' command (with
appropriate arguments to get given branch) and will be treated by OpenGrok
as a project.

3.2 Messages
------------

Deployed OpenGrok can receive couple of messages through the active socket which
usually listens for the main configuration file. These are used in the web
application and displayed to the users. One can easily notify users about some
important events, for example that the reindex is being in progress and that
the searched information can be inconsistent.

The OpenGrok comes with a tool which allows you to send these messages without
any problem. It is called Messages and it is located under the tools directory.
See the file for usage and more information.

3.2.1 Tags
----------

Any message can use tags which makes it more specific for the application.
Messages which tag match some OpenGrok project are considered project specific
and the information contained in them are displayed only for the specific projects.

There is a key tag "main" which is exclusive for displaying
messages on the OpenGrok landing page - like a common information.

3.2.2 Types
-----------

Currently supported message types:
1) NormalMessage (normal)
This message is designed to display some information in the web application.
Use tags to target a specific project.
2) AbortMessage (abort)
This message can delete some already published information in
the web application.
Use tags to restrict the deletion only to specific projects.


4. OpenGrok install
-----------------
Expand Down
3 changes: 2 additions & 1 deletion platform/solaris/ips/create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ PKG pkgsend add file platform/solaris/smf/svc-opengrok mode=0555 owner=root grou
PKG pkgsend add file platform/solaris/smf/ogindexd mode=0555 owner=root group=bin path=/usr/opengrok/lib/ogindexd

PKG pkgsend add file OpenGrok mode=0555 owner=root group=bin path=/usr/opengrok/bin/OpenGrok
PKG pkgsend add file tools/Groups mode=0555 owner=root group=sys path=/usr/opengrok/bin/Groups
PKG pkgsend add file tools/Groups mode=0555 owner=root group=bin path=/usr/opengrok/bin/Groups
PKG pkgsend add file tools/Messages mode=0555 owner=root group=bin path=/usr/opengrok/bin/Messages

PKG pkgsend add file dist/opengrok.jar mode=0444 owner=root group=bin path=/usr/opengrok/lib/opengrok.jar

Expand Down
1 change: 1 addition & 0 deletions platform/solaris/pkgdef/prototype
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ f manifest var/svc/manifest/application/opengrok.xml=platform/solaris/smf/opengr
f none lib/svc/method/svc-opengrok=platform/solaris/smf/svc-opengrok 555 root bin
f none usr/opengrok/bin/OpenGrok=OpenGrok 0555 bin bin
f none usr/opengrok/bin/Groups=tools/Groups 0555 bin bin
f none usr/opengrok/bin/Groups=tools/Messages 0555 bin bin
f none usr/opengrok/doc/logging.properties=logging.properties 0444 root sys
f none usr/opengrok/doc/README.txt=README.txt 0444 root sys
f none usr/opengrok/doc/CHANGES.txt=CHANGES.txt 0444 root sys
Expand Down
239 changes: 220 additions & 19 deletions src/org/opensolaris/opengrok/configuration/RuntimeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,45 +50,44 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensolaris.opengrok.authorization.AuthorizationFramework;
import org.opensolaris.opengrok.configuration.messages.Message;
import org.opensolaris.opengrok.history.HistoryGuru;
import org.opensolaris.opengrok.history.RepositoryInfo;
import org.opensolaris.opengrok.index.Filter;
import org.opensolaris.opengrok.index.IgnoredNames;
import org.opensolaris.opengrok.index.IndexDatabase;
import org.opensolaris.opengrok.logger.LoggerFactory;
import org.opensolaris.opengrok.util.Executor;
import org.opensolaris.opengrok.util.IOUtils;
import org.opensolaris.opengrok.configuration.ThreadpoolSearcherFactory;

import static java.nio.file.FileVisitResult.CONTINUE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.util.Collections;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.opensolaris.opengrok.index.IndexDatabase;


/**
* The RuntimeEnvironment class is used as a placeholder for the current
Expand All @@ -108,6 +107,17 @@ public final class RuntimeEnvironment {
private final Map<Project, List<RepositoryInfo>> repository_map = new TreeMap<>();
private final Map<Project, Set<Group>> project_group_map = new TreeMap<>();
private final Map<String, SearcherManager> searcherManagerMap = new ConcurrentHashMap<>();

private static final String MESSAGES_MAIN_PAGE_TAG = "main";
/*
initial capacity - default 16
initial load factor - default 0.75f
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if they are public constants, then take them from there (just in case JDK will change defaults ... )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impossible, they're private

initial concurrency level - number of concurrently updating threads (default 16)
- just two (the timer, configuration listener) so set it to small value
*/
private final ConcurrentMap<String, SortedSet<Message>> tagMessages = new ConcurrentHashMap<>(16, 0.75f, 5);
private static final int MESSAGE_LIMIT = 500;
private int messagesInTheSystem = 0;

/* Get thread pool used for top-level repository history generation. */
public static synchronized ExecutorService getHistoryExecutor() {
Expand Down Expand Up @@ -1210,6 +1220,167 @@ public void setConfiguration(Configuration configuration, List<String> subFileLi
public Configuration getConfiguration() {
return this.threadConfig.get();
}

private Timer expirationTimer;

private static SortedSet<Message> emptyMessageSet(SortedSet<Message> toRet) {
return toRet == null ? new TreeSet<>() : toRet;
}

/**
* Get the default set of messages for the main tag.
*
* @return set of messages
*/
public SortedSet<Message> getMessages() {
if (expirationTimer == null) {
expireMessages();
}
return emptyMessageSet(tagMessages.get(MESSAGES_MAIN_PAGE_TAG));
}

/**
* Get the set of messages for the arbitrary tag
*
* @param tag the message tag
* @return set of messages
*/
public SortedSet<Message> getMessages(String tag) {
if (expirationTimer == null) {
expireMessages();
}
return emptyMessageSet(tagMessages.get(tag));
}

/**
* Add a message to the application Also schedules a expirationTimer to
* remove this message after its expiration.
*
* @param m the message
*/
public void addMessage(Message m) {
if (!canAcceptMessage(m)) {
return;
}

if (expirationTimer == null) {
expireMessages();
}

boolean added = false;
for (String tag : m.getTags()) {
if (!tagMessages.containsKey(tag)) {
tagMessages.put(tag, new ConcurrentSkipListSet<>());
}
if (tagMessages.get(tag).add(m)) {
messagesInTheSystem++;
added = true;
}
}

if (added) {
if (expirationTimer != null) {
expirationTimer.schedule(new TimerTask() {
@Override
public void run() {
expireMessages();
}
}, new Date(m.getExpiration().getTime() + 10));
}
}
}

/**
* Immediately remove all messages in the application.
*/
public void removeAllMessages() {
tagMessages.clear();
messagesInTheSystem = 0;
}

/**
* Remove all messages containing at least on of the tags.
*
* @param tags set of tags
*/
public void removeAnyMessage(Set<String> tags) {
removeAnyMessage(new Predicate<Message>() {
@Override
public boolean test(Message t) {
return t.hasAny(tags);
}
});
}

/**
* Remove messages which have expired.
*/
private void expireMessages() {
removeAnyMessage(new Predicate<Message>() {
@Override
public boolean test(Message t) {
return t.isExpired();
}
});
}

/**
* Generic function to remove any message according to the result of the
* predicate.
*
* @param predicate the testing predicate
*/
private void removeAnyMessage(Predicate<Message> predicate) {
int size;
for (Map.Entry<String, SortedSet<Message>> set : tagMessages.entrySet()) {
size = set.getValue().size();
set.getValue().removeIf(predicate);
messagesInTheSystem -= size - set.getValue().size();
}

tagMessages.entrySet().removeIf(new Predicate<Map.Entry<String, SortedSet<Message>>>() {
@Override
public boolean test(Map.Entry<String, SortedSet<Message>> t) {
return t.getValue().isEmpty();
}
});
}

/**
* Test if the application can receive this messages.
*
* @param m the message
* @return true if it can
*/
public boolean canAcceptMessage(Message m) {
return messagesInTheSystem < getMessageLimit() && !m.isExpired();
}

/**
* Get the maximum number of messages in the application
*
* @return the number
*/
public int getMessageLimit() {
return MESSAGE_LIMIT;
}

/**
* Return number of messages present in the hash map.
*
* DISCLAIMER: This is not the real number of messages in the application
* because the same message is stored for all of the tags in the map. Also
* one can bypass the counter by not calling {@link #addMessage(Message)}
*
* @return number of messages
*/
public int getMessagesInTheSystem() {
if (expirationTimer == null) {
expireMessages();
}
return messagesInTheSystem;
}

private ServerSocket configServerSocket;

/**
Expand Down Expand Up @@ -1239,7 +1410,7 @@ public boolean startConfigurationListenerThread(SocketAddress endpoint) {
configurationListenerThread = new Thread(new Runnable() {
@Override
public void run() {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1 << 13);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1 << 15);
while (!sock.isClosed()) {
try (Socket s = sock.accept();
BufferedInputStream in = new BufferedInputStream(s.getInputStream())) {
Expand All @@ -1265,7 +1436,7 @@ public void run() {
((Configuration) obj).refreshDateForLastIndexRun();
setConfiguration((Configuration) obj);
LOGGER.log(Level.INFO, "Configuration updated: {0}",
configuration.getSourceRoot());
configuration.getSourceRoot());

// We are assuming that each update of configuration
// means reindex. If dedicated thread is introduced
Expand All @@ -1274,6 +1445,18 @@ public void run() {
// be moved there.
refreshSearcherManagerMap();
maybeRefreshIndexSearchers();
} else if (obj instanceof Message) {
Message m = ((Message) obj);
if (canAcceptMessage(m)) {
m.apply(RuntimeEnvironment.getInstance());
LOGGER.log(Level.FINER, "Message received: {0}",
m.getTags());
LOGGER.log(Level.FINER, "Messages in the system: {0}",
getMessagesInTheSystem());
} else {
LOGGER.log(Level.WARNING, "Message dropped: {0} - too many messages in the system",
m.getTags());
}
}
} catch (IOException e) {
LOGGER.log(Level.SEVERE, "Error reading config file: ", e);
Expand Down Expand Up @@ -1389,6 +1572,24 @@ public void stopWatchDogService() {
}
}

public void startExpirationTimer() {
if (expirationTimer != null) {
stopExpirationTimer();
}
expirationTimer = new Timer("expirationThread");
expireMessages();
}

/**
* Stops the watch dog service.
*/
public void stopExpirationTimer() {
if (expirationTimer != null) {
expirationTimer.cancel();
expirationTimer = null;
}
}

private Thread indexReopenThread;

public void maybeRefreshIndexSearchers() {
Expand Down
Loading