Skip to content

Commit

Permalink
[MONDRIAN-1998] Followup to the previous commits of this case. It was…
Browse files Browse the repository at this point in the history
… later found that the problem isn't about whether we delete the index from the registry or not at the end of the life of a schema, but rather a cache miss. It is perfectly fine to base the key of the registry off the schema's checksum, as it is also preferable. When we bring a schema offline, we already clean all the finished segments and also use the deletion flag to do a graceful recovery for the segments that were still loading at the time. Deleting the index prevents this graceful shutdown and causes the user thread to hang, as we sometimes delete the index from the registry before all of the queries are terminated. By the time they finish, the star was re-created and doesn't have the same object ident anymore, so we can't notify the user threads that the query has finished.
  • Loading branch information
lucboudreau committed May 27, 2014
1 parent 365debd commit 378a968
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 42 deletions.
7 changes: 0 additions & 7 deletions src/main/mondrian/rolap/RolapSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,6 @@ protected void finalCleanUp() {
// Cleanup the segment data.
flushSegments();

// De-register our indexes of segments
for (Cube cube : getCubes()) {
MondrianServer.forConnection(internalConnection)
.getAggregationManager().cacheMgr.getIndexRegistry()
.clearIndex(((RolapCube)cube).getStar());
}

// Cleanup the agg JDBC cache
flushJdbcSchema();
}
Expand Down
53 changes: 27 additions & 26 deletions src/main/mondrian/rolap/agg/SegmentCacheManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.io.PrintWriter;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;

/**
Expand Down Expand Up @@ -1515,26 +1514,39 @@ public PeekResponse(
/**
* Registry of all the indexes that were created for this
* cache manager, per {@link RolapStar}.
*
* The index is based off the checksum of the schema.
*/
public class SegmentCacheIndexRegistry {
private final Map<RolapStar, SegmentCacheIndex> indexes =
new WeakHashMap<RolapStar, SegmentCacheIndex>();
/**
* Removes a star from the registry.
*/
public void clearIndex(RolapStar star) {
indexes.remove(star);
}
private final Map<ByteString, SegmentCacheIndex> indexes =
Collections.synchronizedMap(
new HashMap<ByteString, SegmentCacheIndex>());

/**
* Returns the {@link SegmentCacheIndex} for a given
* {@link RolapStar}.
*/
public SegmentCacheIndex getIndex(RolapStar star) {
if (!indexes.containsKey(star)) {
indexes.put(star, new SegmentCacheIndexImpl(thread));
LOGGER.trace(
"SegmentCacheManager.SegmentCacheIndexRegistry.getIndex:"
+ System.identityHashCode(star));

if (!indexes.containsKey(star.getSchema().getChecksum())) {
final SegmentCacheIndexImpl index =
new SegmentCacheIndexImpl(thread);
LOGGER.trace(
"SegmentCacheManager.SegmentCacheIndexRegistry.getIndex:"
+ "Creating New Index "
+ System.identityHashCode(index));
indexes.put(star.getSchema().getChecksum(), index);
}
return indexes.get(star);
final SegmentCacheIndex index =
indexes.get(star.getSchema().getChecksum());
LOGGER.trace(
"SegmentCacheManager.SegmentCacheIndexRegistry.getIndex:"
+ "Returning Index "
+ System.identityHashCode(index));
return index;
}

/**
Expand All @@ -1546,21 +1558,10 @@ private SegmentCacheIndex getIndex(
{
// First we check the indexes that already exist.
// This is fast.
for (Entry<RolapStar, SegmentCacheIndex> entry
: indexes.entrySet())
{
final String factTableName =
entry.getKey().getFactTable().getTableName();
final ByteString schemaChecksum =
entry.getKey().getSchema().getChecksum();
if (!factTableName.equals(header.rolapStarFactTableName)) {
continue;
}
if (!schemaChecksum.equals(header.schemaChecksum)) {
continue;
}
return entry.getValue();
if (indexes.containsKey(header.schemaChecksum)) {
return indexes.get(header.schemaChecksum);
}

// The index doesn't exist. Let's create it.
final RolapStar star = getStar(header);
if (star == null) {
Expand Down
47 changes: 38 additions & 9 deletions src/main/mondrian/rolap/cache/SegmentCacheIndexImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.Statement;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -114,7 +115,9 @@ public List<SegmentHeader> locate(

if (LOGGER.isTraceEnabled()) {
LOGGER.trace(
"SegmentCacheIndexImpl.locate:"
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ")locate:"
+ "\nschemaName:" + schemaName
+ "\nschemaChecksum:" + schemaChecksum
+ "\ncubeName:" + cubeName
Expand All @@ -137,7 +140,10 @@ public List<SegmentHeader> locate(
compoundPredicates);
final List<SegmentHeader> headerList = bitkeyMap.get(starKey);
if (headerList == null) {
LOGGER.trace("SegmentCacheIndexImpl.locate:NOMATCH");
LOGGER.trace(
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ").locate:NOMATCH");
return Collections.emptyList();
}
for (SegmentHeader header : headerList) {
Expand All @@ -152,7 +158,10 @@ public List<SegmentHeader> locate(
}
if (LOGGER.isTraceEnabled()) {
final StringBuilder sb =
new StringBuilder("SegmentCacheIndexImpl.locate:MATCH");
new StringBuilder(
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ").locate:MATCH");
for (SegmentHeader header : list) {
sb.append("\n");
sb.append(header.toString());
Expand All @@ -170,7 +179,9 @@ public void add(
checkThread();

LOGGER.debug(
"SegmentCacheIndexImpl.add:\n"
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ").add:\n"
+ header.toString());

HeaderInfo headerInfo = headerMap.get(header);
Expand Down Expand Up @@ -229,6 +240,11 @@ public void update(
{
checkThread();

LOGGER.trace(
"SegmentCacheIndexImpl.update: Updating header from:\n"
+ oldHeader.toString()
+ "\n\nto\n\n"
+ newHeader.toString());
final HeaderInfo headerInfo = headerMap.get(oldHeader);
headerMap.remove(oldHeader);
headerMap.put(newHeader, headerInfo);
Expand Down Expand Up @@ -286,21 +302,34 @@ public void loadFailed(SegmentHeader header, Throwable throwable) {
public void remove(SegmentHeader header) {
checkThread();

LOGGER.debug(
"SegmentCacheIndexImpl.remove:\n"
+ header.toString());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ").remove:\n"
+ header.toString(),
new Throwable("Removal."));
} else {
LOGGER.debug(
"SegmentCacheIndexImpl.remove:\n"
+ header.toString());
}

final HeaderInfo headerInfo = headerMap.get(header);
if (headerInfo == null) {
LOGGER.debug(
"SegmentCacheIndexImpl.remove:UNKNOWN HEADER");
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ").remove:UNKNOWN HEADER");
return;
}
if (headerInfo.slot != null && !headerInfo.slot.isDone()) {
// Cannot remove while load is pending; flag for removal after load
headerInfo.removeAfterLoad = true;
LOGGER.debug(
"SegmentCacheIndexImpl.remove:DEFFERED");
"SegmentCacheIndexImpl("
+ System.identityHashCode(this)
+ ").remove:DEFFERED");
return;
}

Expand Down

0 comments on commit 378a968

Please sign in to comment.