Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
Expand Down Expand Up @@ -75,16 +72,10 @@ public static DimensionTableDataManager getInstanceByTableName(String tableNameW
return INSTANCES.get(tableNameWithType);
}

/**
* Instance properties/methods
*/
private final ReadWriteLock _rwl = new ReentrantReadWriteLock();
private final Lock _lookupTableReadLock = _rwl.readLock();
private final Lock _lookupTableWriteLock = _rwl.writeLock();

// _lookupTable is a HashMap used for storing/serving records for a table keyed by table PK
@GuardedBy("_rwl")
private final Map<PrimaryKey, GenericRow> _lookupTable = new HashMap<>();
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, Map> UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, Map.class, "_lookupTable");
private volatile Map<PrimaryKey, GenericRow> _lookupTable = new HashMap<>();
private Schema _tableSchema;
private List<String> _primaryKeyColumns;

Expand Down Expand Up @@ -127,42 +118,38 @@ public void removeSegment(String segmentName) {
*/
private void loadLookupTable()
throws Exception {
_lookupTableWriteLock.lock();
try {
_lookupTable.clear();
List<SegmentDataManager> segmentManagers = acquireAllSegments();
if (segmentManagers.isEmpty()) {
return;
}
Map<PrimaryKey, GenericRow> snapshot;
Map<PrimaryKey, GenericRow> replacement;
do {
snapshot = _lookupTable;
replacement = new HashMap<>(snapshot.size());
populate(replacement);
} while (!UPDATER.compareAndSet(this, snapshot, replacement));
}

try {
for (SegmentDataManager segmentManager : segmentManagers) {
IndexSegment indexSegment = segmentManager.getSegment();
try (PinotSegmentRecordReader reader = new PinotSegmentRecordReader(
indexSegment.getSegmentMetadata().getIndexDir())) {
while (reader.hasNext()) {
GenericRow row = reader.next();
_lookupTable.put(row.getPrimaryKey(_primaryKeyColumns), row);
}
private void populate(Map<PrimaryKey, GenericRow> map)
throws Exception {
List<SegmentDataManager> segmentManagers = acquireAllSegments();
try {
for (SegmentDataManager segmentManager : segmentManagers) {
IndexSegment indexSegment = segmentManager.getSegment();
try (PinotSegmentRecordReader reader = new PinotSegmentRecordReader(
indexSegment.getSegmentMetadata().getIndexDir())) {
while (reader.hasNext()) {
GenericRow row = reader.next();
map.put(row.getPrimaryKey(_primaryKeyColumns), row);
}
}
} finally {
for (SegmentDataManager segmentManager : segmentManagers) {
releaseSegment(segmentManager);
}
}
} finally {
_lookupTableWriteLock.unlock();
for (SegmentDataManager segmentManager : segmentManagers) {
releaseSegment(segmentManager);
}
}
}

public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
_lookupTableReadLock.lock();
try {
return _lookupTable.get(pk);
} finally {
_lookupTableReadLock.unlock();
}
return _lookupTable.get(pk);
}

public FieldSpec getColumnFieldSpec(String columnName) {
Expand Down