50
50
import java .util .List ;
51
51
import java .util .Map ;
52
52
import java .util .Set ;
53
+ import java .util .SortedSet ;
54
+ import java .util .Timer ;
55
+ import java .util .TimerTask ;
53
56
import java .util .TreeMap ;
54
57
import java .util .TreeSet ;
58
+ import java .util .concurrent .ConcurrentHashMap ;
59
+ import java .util .concurrent .ConcurrentMap ;
60
+ import java .util .concurrent .ConcurrentSkipListSet ;
55
61
import java .util .concurrent .ExecutorService ;
56
62
import java .util .concurrent .Executors ;
57
63
import java .util .concurrent .ThreadFactory ;
58
64
import java .util .concurrent .TimeUnit ;
59
65
import java .util .function .Predicate ;
60
66
import java .util .logging .Level ;
61
67
import java .util .logging .Logger ;
68
+ import java .util .stream .Collectors ;
69
+ import org .apache .lucene .index .IndexReader ;
70
+ import org .apache .lucene .index .MultiReader ;
71
+ import org .apache .lucene .search .IndexSearcher ;
72
+ import org .apache .lucene .search .SearcherManager ;
73
+ import org .apache .lucene .store .AlreadyClosedException ;
74
+ import org .apache .lucene .store .Directory ;
75
+ import org .apache .lucene .store .FSDirectory ;
62
76
import org .opensolaris .opengrok .authorization .AuthorizationFramework ;
77
+ import org .opensolaris .opengrok .configuration .messages .Message ;
63
78
import org .opensolaris .opengrok .history .HistoryGuru ;
64
79
import org .opensolaris .opengrok .history .RepositoryInfo ;
65
80
import org .opensolaris .opengrok .index .Filter ;
66
81
import org .opensolaris .opengrok .index .IgnoredNames ;
82
+ import org .opensolaris .opengrok .index .IndexDatabase ;
67
83
import org .opensolaris .opengrok .logger .LoggerFactory ;
68
84
import org .opensolaris .opengrok .util .Executor ;
69
85
import org .opensolaris .opengrok .util .IOUtils ;
70
- import org .opensolaris .opengrok .configuration .ThreadpoolSearcherFactory ;
71
86
72
87
import static java .nio .file .FileVisitResult .CONTINUE ;
73
88
import static java .nio .file .StandardWatchEventKinds .ENTRY_CREATE ;
74
89
import static java .nio .file .StandardWatchEventKinds .ENTRY_DELETE ;
75
90
import static java .nio .file .StandardWatchEventKinds .ENTRY_MODIFY ;
76
- import java .util .Collections ;
77
- import java .util .Iterator ;
78
- import java .util .SortedSet ;
79
- import java .util .concurrent .ConcurrentHashMap ;
80
- import java .util .stream .Collectors ;
81
- import org .apache .lucene .index .DirectoryReader ;
82
- import org .apache .lucene .index .IndexReader ;
83
- import org .apache .lucene .index .MultiReader ;
84
- import org .apache .lucene .search .IndexSearcher ;
85
- import org .apache .lucene .search .SearcherFactory ;
86
- import org .apache .lucene .search .SearcherManager ;
87
- import org .apache .lucene .store .AlreadyClosedException ;
88
- import org .apache .lucene .store .Directory ;
89
- import org .apache .lucene .store .FSDirectory ;
90
- import org .opensolaris .opengrok .index .IndexDatabase ;
91
-
92
91
93
92
/**
94
93
* The RuntimeEnvironment class is used as a placeholder for the current
@@ -108,6 +107,17 @@ public final class RuntimeEnvironment {
108
107
private final Map <Project , List <RepositoryInfo >> repository_map = new TreeMap <>();
109
108
private final Map <Project , Set <Group >> project_group_map = new TreeMap <>();
110
109
private final Map <String , SearcherManager > searcherManagerMap = new ConcurrentHashMap <>();
110
+
111
+ private static final String MESSAGES_MAIN_PAGE_TAG = "main" ;
112
+ /*
113
+ initial capacity - default 16
114
+ initial load factor - default 0.75f
115
+ initial concurrency level - number of concurrently updating threads (default 16)
116
+ - just two (the timer, configuration listener) so set it to small value
117
+ */
118
+ private final ConcurrentMap <String , SortedSet <Message >> tagMessages = new ConcurrentHashMap <>(16 , 0.75f , 5 );
119
+ private static final int MESSAGE_LIMIT = 500 ;
120
+ private int messagesInTheSystem = 0 ;
111
121
112
122
/* Get thread pool used for top-level repository history generation. */
113
123
public static synchronized ExecutorService getHistoryExecutor () {
@@ -1210,6 +1220,167 @@ public void setConfiguration(Configuration configuration, List<String> subFileLi
1210
1220
public Configuration getConfiguration () {
1211
1221
return this .threadConfig .get ();
1212
1222
}
1223
+
1224
+ private Timer expirationTimer ;
1225
+
1226
+ private static SortedSet <Message > emptyMessageSet (SortedSet <Message > toRet ) {
1227
+ return toRet == null ? new TreeSet <>() : toRet ;
1228
+ }
1229
+
1230
+ /**
1231
+ * Get the default set of messages for the main tag.
1232
+ *
1233
+ * @return set of messages
1234
+ */
1235
+ public SortedSet <Message > getMessages () {
1236
+ if (expirationTimer == null ) {
1237
+ expireMessages ();
1238
+ }
1239
+ return emptyMessageSet (tagMessages .get (MESSAGES_MAIN_PAGE_TAG ));
1240
+ }
1241
+
1242
+ /**
1243
+ * Get the set of messages for the arbitrary tag
1244
+ *
1245
+ * @param tag the message tag
1246
+ * @return set of messages
1247
+ */
1248
+ public SortedSet <Message > getMessages (String tag ) {
1249
+ if (expirationTimer == null ) {
1250
+ expireMessages ();
1251
+ }
1252
+ return emptyMessageSet (tagMessages .get (tag ));
1253
+ }
1254
+
1255
+ /**
1256
+ * Add a message to the application Also schedules a expirationTimer to
1257
+ * remove this message after its expiration.
1258
+ *
1259
+ * @param m the message
1260
+ */
1261
+ public void addMessage (Message m ) {
1262
+ if (!canAcceptMessage (m )) {
1263
+ return ;
1264
+ }
1265
+
1266
+ if (expirationTimer == null ) {
1267
+ expireMessages ();
1268
+ }
1269
+
1270
+ boolean added = false ;
1271
+ for (String tag : m .getTags ()) {
1272
+ if (!tagMessages .containsKey (tag )) {
1273
+ tagMessages .put (tag , new ConcurrentSkipListSet <>());
1274
+ }
1275
+ if (tagMessages .get (tag ).add (m )) {
1276
+ messagesInTheSystem ++;
1277
+ added = true ;
1278
+ }
1279
+ }
1280
+
1281
+ if (added ) {
1282
+ if (expirationTimer != null ) {
1283
+ expirationTimer .schedule (new TimerTask () {
1284
+ @ Override
1285
+ public void run () {
1286
+ expireMessages ();
1287
+ }
1288
+ }, new Date (m .getExpiration ().getTime () + 10 ));
1289
+ }
1290
+ }
1291
+ }
1292
+
1293
+ /**
1294
+ * Immediately remove all messages in the application.
1295
+ */
1296
+ public void removeAllMessages () {
1297
+ tagMessages .clear ();
1298
+ messagesInTheSystem = 0 ;
1299
+ }
1300
+
1301
+ /**
1302
+ * Remove all messages containing at least on of the tags.
1303
+ *
1304
+ * @param tags set of tags
1305
+ */
1306
+ public void removeAnyMessage (Set <String > tags ) {
1307
+ removeAnyMessage (new Predicate <Message >() {
1308
+ @ Override
1309
+ public boolean test (Message t ) {
1310
+ return t .hasAny (tags );
1311
+ }
1312
+ });
1313
+ }
1314
+
1315
+ /**
1316
+ * Remove messages which have expired.
1317
+ */
1318
+ private void expireMessages () {
1319
+ removeAnyMessage (new Predicate <Message >() {
1320
+ @ Override
1321
+ public boolean test (Message t ) {
1322
+ return t .isExpired ();
1323
+ }
1324
+ });
1325
+ }
1326
+
1327
+ /**
1328
+ * Generic function to remove any message according to the result of the
1329
+ * predicate.
1330
+ *
1331
+ * @param predicate the testing predicate
1332
+ */
1333
+ private void removeAnyMessage (Predicate <Message > predicate ) {
1334
+ int size ;
1335
+ for (Map .Entry <String , SortedSet <Message >> set : tagMessages .entrySet ()) {
1336
+ size = set .getValue ().size ();
1337
+ set .getValue ().removeIf (predicate );
1338
+ messagesInTheSystem -= size - set .getValue ().size ();
1339
+ }
1340
+
1341
+ tagMessages .entrySet ().removeIf (new Predicate <Map .Entry <String , SortedSet <Message >>>() {
1342
+ @ Override
1343
+ public boolean test (Map .Entry <String , SortedSet <Message >> t ) {
1344
+ return t .getValue ().isEmpty ();
1345
+ }
1346
+ });
1347
+ }
1348
+
1349
+ /**
1350
+ * Test if the application can receive this messages.
1351
+ *
1352
+ * @param m the message
1353
+ * @return true if it can
1354
+ */
1355
+ public boolean canAcceptMessage (Message m ) {
1356
+ return messagesInTheSystem < getMessageLimit () && !m .isExpired ();
1357
+ }
1358
+
1359
+ /**
1360
+ * Get the maximum number of messages in the application
1361
+ *
1362
+ * @return the number
1363
+ */
1364
+ public int getMessageLimit () {
1365
+ return MESSAGE_LIMIT ;
1366
+ }
1367
+
1368
+ /**
1369
+ * Return number of messages present in the hash map.
1370
+ *
1371
+ * DISCLAIMER: This is not the real number of messages in the application
1372
+ * because the same message is stored for all of the tags in the map. Also
1373
+ * one can bypass the counter by not calling {@link #addMessage(Message)}
1374
+ *
1375
+ * @return number of messages
1376
+ */
1377
+ public int getMessagesInTheSystem () {
1378
+ if (expirationTimer == null ) {
1379
+ expireMessages ();
1380
+ }
1381
+ return messagesInTheSystem ;
1382
+ }
1383
+
1213
1384
private ServerSocket configServerSocket ;
1214
1385
1215
1386
/**
@@ -1239,7 +1410,7 @@ public boolean startConfigurationListenerThread(SocketAddress endpoint) {
1239
1410
configurationListenerThread = new Thread (new Runnable () {
1240
1411
@ Override
1241
1412
public void run () {
1242
- ByteArrayOutputStream bos = new ByteArrayOutputStream (1 << 13 );
1413
+ ByteArrayOutputStream bos = new ByteArrayOutputStream (1 << 15 );
1243
1414
while (!sock .isClosed ()) {
1244
1415
try (Socket s = sock .accept ();
1245
1416
BufferedInputStream in = new BufferedInputStream (s .getInputStream ())) {
@@ -1265,7 +1436,7 @@ public void run() {
1265
1436
((Configuration ) obj ).refreshDateForLastIndexRun ();
1266
1437
setConfiguration ((Configuration ) obj );
1267
1438
LOGGER .log (Level .INFO , "Configuration updated: {0}" ,
1268
- configuration .getSourceRoot ());
1439
+ configuration .getSourceRoot ());
1269
1440
1270
1441
// We are assuming that each update of configuration
1271
1442
// means reindex. If dedicated thread is introduced
@@ -1274,6 +1445,18 @@ public void run() {
1274
1445
// be moved there.
1275
1446
refreshSearcherManagerMap ();
1276
1447
maybeRefreshIndexSearchers ();
1448
+ } else if (obj instanceof Message ) {
1449
+ Message m = ((Message ) obj );
1450
+ if (canAcceptMessage (m )) {
1451
+ m .apply (RuntimeEnvironment .getInstance ());
1452
+ LOGGER .log (Level .FINER , "Message received: {0}" ,
1453
+ m .getTags ());
1454
+ LOGGER .log (Level .FINER , "Messages in the system: {0}" ,
1455
+ getMessagesInTheSystem ());
1456
+ } else {
1457
+ LOGGER .log (Level .WARNING , "Message dropped: {0} - too many messages in the system" ,
1458
+ m .getTags ());
1459
+ }
1277
1460
}
1278
1461
} catch (IOException e ) {
1279
1462
LOGGER .log (Level .SEVERE , "Error reading config file: " , e );
@@ -1389,6 +1572,24 @@ public void stopWatchDogService() {
1389
1572
}
1390
1573
}
1391
1574
1575
+ public void startExpirationTimer () {
1576
+ if (expirationTimer != null ) {
1577
+ stopExpirationTimer ();
1578
+ }
1579
+ expirationTimer = new Timer ("expirationThread" );
1580
+ expireMessages ();
1581
+ }
1582
+
1583
+ /**
1584
+ * Stops the watch dog service.
1585
+ */
1586
+ public void stopExpirationTimer () {
1587
+ if (expirationTimer != null ) {
1588
+ expirationTimer .cancel ();
1589
+ expirationTimer = null ;
1590
+ }
1591
+ }
1592
+
1392
1593
private Thread indexReopenThread ;
1393
1594
1394
1595
public void maybeRefreshIndexSearchers () {
0 commit comments