HDDS-1620. Implement Volume Write Requests to use Cache and DoubleBuffer. #884

Merged: 15 commits, Jun 13, 2019

@@ -20,6 +20,8 @@
package org.apache.hadoop.utils.db;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.InterfaceStability;
@@ -131,6 +133,14 @@ default void cleanupCache(long epoch) {
throw new NotImplementedException("cleanupCache is not implemented");
}

/**
* Return cache iterator maintained for this table.
*/
default Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>>
cacheIterator() {
throw new NotImplementedException("cacheIterator is not implemented");
}

/**
* Class used to represent the key and value pair of a db entry.
*/
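As a usage note for the new Table#cacheIterator(), here is a minimal sketch of how a caller might scan the cache; the Table<String, OmBucketInfo> instance, the key prefix, and the helper name are illustrative, and only cacheIterator(), getCacheKey() and getCacheValue() come from this change.

// Sketch: does the table cache hold any live (non-deleted) entry under a prefix?
static boolean cacheHasLiveEntryWithPrefix(Table<String, OmBucketInfo> table,
    String prefix) {
  Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator =
      table.cacheIterator();
  while (iterator.hasNext()) {
    Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>> entry = iterator.next();
    // A null cache value marks a pending delete, so it does not count as live.
    if (entry.getKey().getCacheKey().startsWith(prefix)
        && entry.getValue().getCacheValue() != null) {
      return true;
    }
  }
  return false;
}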
@@ -19,6 +19,8 @@
package org.apache.hadoop.utils.db;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.utils.db.cache.CacheKey;
@@ -82,7 +84,7 @@ public boolean isEmpty() throws IOException {
@Override
public boolean isExist(KEY key) throws IOException {
CacheValue<VALUE> cacheValue= cache.get(new CacheKey<>(key));
-return (cacheValue != null && cacheValue.getValue() != null) ||
+return (cacheValue != null && cacheValue.getCacheValue() != null) ||
rawTable.isExist(codecRegistry.asRawData(key));
}

@@ -109,7 +111,7 @@ public VALUE get(KEY key) throws IOException {
return getFromTable(key);
} else {
// We have a value in cache, return the value.
-return cacheValue.getValue();
+return cacheValue.getCacheValue();
}
}

@@ -156,6 +158,9 @@ public void addCacheEntry(CacheKey<KEY> cacheKey,
cache.put(cacheKey, cacheValue);
}

public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> cacheIterator() {
return cache.iterator();
}

@Override
public void cleanupCache(long epoch) {
@@ -33,7 +33,7 @@ public CacheKey(KEY key) {
this.key = key;
}

-public KEY getKey() {
+public KEY getCacheKey() {
return key;
}

@@ -36,7 +36,7 @@ public CacheValue(Optional<VALUE> value, long epoch) {
this.epoch = epoch;
}

-public VALUE getValue() {
+public VALUE getCacheValue() {
return value.orNull();
}
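The accessor renames above (getKey() to getCacheKey(), getValue() to getCacheValue()) are easiest to read next to a sketch of how cache entries are built; the OmVolumeArgs value is illustrative, the transaction index stands in for a Ratis log index, and the Optional is the Guava one already used by CacheValue.

long transactionIndex = 1L;

// A pending create carries the new value; getCacheValue() returns it.
CacheValue<OmVolumeArgs> created =
    new CacheValue<>(Optional.of(volumeArgs), transactionIndex);

// A pending delete uses an absent value, so getCacheValue() returns null.
CacheValue<OmVolumeArgs> deleted =
    new CacheValue<>(Optional.<OmVolumeArgs>absent(), transactionIndex);

// The key wrapper exposes the raw key through the renamed accessor.
CacheKey<String> cacheKey = new CacheKey<>("/volumeName");
String rawKey = cacheKey.getCacheKey();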

@@ -20,6 +20,7 @@
package org.apache.hadoop.utils.db.cache;

import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -77,6 +78,11 @@ public int size() {
return cache.size();
}

@Override
public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
return cache.entrySet().iterator();
}

private void evictCache(long epoch) {
EpochEntry<CACHEKEY> currentEntry = null;
for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
@@ -22,6 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;

import java.util.Iterator;
import java.util.Map;

/**
* Cache used for RocksDB tables.
* @param <CACHEKEY>
@@ -60,4 +63,10 @@ public interface TableCache<CACHEKEY extends CacheKey,
* @return size
*/
int size();

/**
* Return an iterator for the cache.
* @return iterator of the underlying cache for the table.
*/
Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator();
}
@@ -51,15 +51,15 @@ public void testPartialTableCache() {

for (int i=0; i < 10; i++) {
Assert.assertEquals(Integer.toString(i),
-tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}

// On a full table cache if some one calls cleanup it is a no-op.
tableCache.cleanup(4);

for (int i=5; i < 10; i++) {
Assert.assertEquals(Integer.toString(i),
-tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}
}

@@ -95,7 +95,7 @@ public void testPartialTableCacheParallel() throws Exception {
// Check we have first 10 entries in cache.
for (int i=1; i <= 10; i++) {
Assert.assertEquals(Integer.toString(i),
-tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}

int deleted = 5;
@@ -115,7 +115,7 @@ public void testPartialTableCacheParallel() throws Exception {
// Check if we have remaining entries.
for (int i=6; i <= totalCount; i++) {
Assert.assertEquals(Integer.toString(i),
-tableCache.get(new CacheKey<>(Integer.toString(i))).getValue());
+tableCache.get(new CacheKey<>(Integer.toString(i))).getCacheValue());
}

tableCache.cleanup(10);
@@ -22,6 +22,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -154,6 +155,28 @@ public Map<String, String> toAuditMap() {
return auditMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
OmVolumeArgs that = (OmVolumeArgs) o;
return creationTime == that.creationTime &&
quotaInBytes == that.quotaInBytes &&
Objects.equals(adminName, that.adminName) &&
Objects.equals(ownerName, that.ownerName) &&
Objects.equals(volume, that.volume);
}

@Override
public int hashCode() {
return Objects.hash(adminName, ownerName, volume, creationTime,
quotaInBytes);
}

/**
* Builder for OmVolumeArgs.
*/
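With value-based equals() and hashCode(), tests can compare the OmVolumeArgs that went into the table cache with the one read back out. A hedged sketch using JUnit's Assert, assuming the existing Builder setters (their names are not part of this diff):

long creationTime = System.currentTimeMillis();
OmVolumeArgs expected = OmVolumeArgs.newBuilder()
    .setVolume("vol1").setAdminName("admin").setOwnerName("owner")
    .setQuotaInBytes(1024L).setCreationTime(creationTime).build();
OmVolumeArgs actual = OmVolumeArgs.newBuilder()
    .setVolume("vol1").setAdminName("admin").setOwnerName("owner")
    .setQuotaInBytes(1024L).setCreationTime(creationTime).build();
// Both assertions rely on the equals()/hashCode() added above.
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.hashCode(), actual.hashCode());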
@@ -286,7 +286,7 @@ message VolumeInfo {
optional uint64 quotaInBytes = 4;
repeated hadoop.hdds.KeyValue metadata = 5;
repeated OzoneAclInfo volumeAcls = 6;
-required uint64 creationTime = 7;
+optional uint64 creationTime = 7;
}

/**
hadoop-ozone/ozone-manager/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -53,7 +53,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
-<version>2.2.0</version>
+<version>2.28.2</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.client.BlockID;
@@ -62,6 +64,9 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;

import org.apache.hadoop.utils.db.TypedTable;
import org.apache.hadoop.utils.db.cache.CacheKey;
import org.apache.hadoop.utils.db.cache.CacheValue;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -451,10 +456,27 @@ private boolean startsWith(byte[] firstArray, byte[] secondArray) {
public boolean isVolumeEmpty(String volume) throws IOException {
String volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);

// First check in bucket table cache.
Iterator<Map.Entry<CacheKey<String>, CacheValue<OmBucketInfo>>> iterator =
((TypedTable< String, OmBucketInfo>) bucketTable).cacheIterator();
while (iterator.hasNext()) {
Map.Entry< CacheKey< String >, CacheValue< OmBucketInfo > > entry =
iterator.next();
String key = entry.getKey().getCacheKey();
OmBucketInfo omBucketInfo = entry.getValue().getCacheValue();
// Making sure that entry is not for delete bucket request.
if (key.startsWith(volumePrefix) && omBucketInfo != null) {
return false;
}
}

try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
bucketIter = bucketTable.iterator()) {
KeyValue<String, OmBucketInfo> kv = bucketIter.seek(volumePrefix);
-if (kv != null && kv.getKey().startsWith(volumePrefix)) {
+// During iteration from DB, check in mean time if this bucket is not
+// marked for delete.
+if (kv != null && kv.getKey().startsWith(volumePrefix) &&
+bucketTable.get(kv.getKey()) != null) {
return false; // we found at least one bucket with this volume prefix.
}
}
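For orientation, this is roughly the call site the cache-first check protects: volume deletion must keep failing while a bucket exists only in the table cache, that is, before the double buffer has flushed it to RocksDB. A hedged sketch, not part of this diff; metadataManager, volumeName and the OMException result code are assumptions.

// Sketch of a delete-volume validation that relies on isVolumeEmpty().
if (!metadataManager.isVolumeEmpty(volumeName)) {
  throw new OMException("Volume is not empty: " + volumeName,
      OMException.ResultCodes.VOLUME_NOT_EMPTY);
}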
@@ -473,6 +495,8 @@ public boolean isVolumeEmpty(String volume) throws IOException {
public boolean isBucketEmpty(String volume, String bucket)
throws IOException {
String keyPrefix = getBucketKey(volume, bucket);
//TODO: When Key ops are converted in to HA model, use cache also to
// determine bucket is empty or not.
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
keyTable.iterator()) {
KeyValue<String, OmKeyInfo> kv = keyIter.seek(keyPrefix);
@@ -198,6 +198,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
@@ -275,12 +277,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private static String keyProviderUriKeyName =
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;

// Adding parameters needed for VolumeRequests here, so that during request
// execution, we can get from ozoneManager.
private long maxUserVolumeCount;


private OzoneManager(OzoneConfiguration conf) throws IOException,
AuthenticationException {
super(OzoneVersionInfo.OZONE_VERSION_INFO);
Preconditions.checkNotNull(conf);
configuration = conf;
this.maxUserVolumeCount = conf.getInt(OZONE_OM_USER_MAX_VOLUME,
OZONE_OM_USER_MAX_VOLUME_DEFAULT);
Preconditions.checkArgument(this.maxUserVolumeCount > 0,
OZONE_OM_USER_MAX_VOLUME + " value should be greater than zero");
omStorage = new OMStorage(conf);
omId = omStorage.getOmId();
if (omStorage.getState() != StorageState.INITIALIZED) {
@@ -3189,7 +3199,11 @@ public OMFailoverProxyProvider getOMFailoverProxyProvider() {
return null;
}

-public OMMetrics getOmMetrics() {
-return metrics;
+/**
+* Return maximum volumes count per user.
+* @return maxUserVolumeCount
+*/
+public long getMaxUserVolumeCount() {
+return maxUserVolumeCount;
}
}
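The new getter exists so that volume write requests, which now execute against the table cache before the double buffer flushes, can read OM-level settings from the OzoneManager instance. A hedged sketch of such a check; volumesOwnedBy() is a made-up stand-in for the real user-table lookup, and the result code is an assumption.

long maxVolumes = ozoneManager.getMaxUserVolumeCount();
long ownedVolumes = volumesOwnedBy(ownerName); // hypothetical helper
if (ownedVolumes >= maxVolumes) {
  throw new OMException("Cannot create volume for " + ownerName
      + ": volume limit of " + maxVolumes + " reached",
      OMException.ResultCodes.USER_TOO_MANY_VOLUMES);
}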
@@ -142,11 +142,13 @@ private void flushTransactions() {
}

private void cleanupCache(long lastRatisTransactionIndex) {
-// As now only bucket transactions are handled only called cleanupCache
-// on bucketTable.
+// As now only volume and bucket transactions are handled only called
+// cleanupCache on bucketTable.
// TODO: After supporting all write operations we need to call
// cleanupCache on the tables only when buffer has entries for that table.
omMetadataManager.getBucketTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getVolumeTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);
}
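Putting the pieces together, a hedged sketch of the volume write path this PR enables: the request handler publishes its result into the volume table cache keyed by the Ratis log index, and once the double buffer has flushed that index to RocksDB, cleanupCache() (as above) evicts the entry. Names such as volumeName, omVolumeArgs and transactionLogIndex are illustrative; the Table, CacheKey and CacheValue calls come from this change.

// 1. While applying the transaction, publish the new volume through the cache
//    so that subsequent reads and validations observe it immediately.
omMetadataManager.getVolumeTable().addCacheEntry(
    new CacheKey<>(omMetadataManager.getVolumeKey(volumeName)),
    new CacheValue<>(Optional.of(omVolumeArgs), transactionLogIndex));

// 2. After the double buffer flushes this index to RocksDB, the cache entry is
//    no longer needed and is evicted.
omMetadataManager.getVolumeTable().cleanupCache(transactionLogIndex);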
