OzoneConsts.java
@@ -226,6 +226,7 @@ private OzoneConsts() {
public static final String CHUNKS_PATH = "chunksPath";
public static final String CONTAINER_DB_TYPE = "containerDBType";
public static final String CHECKSUM = "checksum";
public static final String DATA_SCAN_TIMESTAMP = "dataScanTimestamp";
public static final String ORIGIN_PIPELINE_ID = "originPipelineId";
public static final String ORIGIN_NODE_ID = "originNodeId";

ContainerData.java
@@ -21,6 +21,8 @@
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
@@ -32,13 +34,17 @@

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.yaml.snakeyaml.Yaml;

import javax.annotation.Nullable;

import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
import static org.apache.hadoop.ozone.OzoneConsts.DATA_SCAN_TIMESTAMP;
import static org.apache.hadoop.ozone.OzoneConsts.LAYOUTVERSION;
import static org.apache.hadoop.ozone.OzoneConsts.MAX_SIZE;
import static org.apache.hadoop.ozone.OzoneConsts.METADATA;
@@ -89,7 +95,14 @@ public abstract class ContainerData {
private HddsVolume volume;

private String checksum;
public static final Charset CHARSET_ENCODING = Charset.forName("UTF-8");

/** Timestamp of last data scan (milliseconds since Unix Epoch).
* {@code null} if not yet scanned (or timestamp not recorded,
* e.g. in prior versions). */
private Long dataScanTimestamp; // for serialization
private transient Optional<Instant> lastDataScanTime = Optional.empty();

public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8;
private static final String DUMMY_CHECKSUM = new String(new byte[64],
CHARSET_ENCODING);

@@ -103,6 +116,7 @@ public abstract class ContainerData {
METADATA,
MAX_SIZE,
CHECKSUM,
DATA_SCAN_TIMESTAMP,
ORIGIN_PIPELINE_ID,
ORIGIN_NODE_ID));

@@ -506,6 +520,30 @@ public String getChecksum() {
return this.checksum;
}

/**
* @return {@code Optional} with the timestamp of last data scan.
* {@code absent} if not yet scanned or timestamp was not recorded.
*/
public Optional<Instant> lastDataScanTime() {
return lastDataScanTime;
}

public void updateDataScanTime(@Nullable Instant time) {
lastDataScanTime = Optional.ofNullable(time);
dataScanTimestamp = time != null ? time.toEpochMilli() : null;
}

// for deserialization
public void setDataScanTimestamp(Long timestamp) {
dataScanTimestamp = timestamp;
lastDataScanTime = timestamp != null
? Optional.of(Instant.ofEpochMilli(timestamp))
: Optional.empty();
}

public Long getDataScanTimestamp() {
return dataScanTimestamp;
}

/**
* Returns the origin pipeline Id of this container.
@@ -557,4 +595,5 @@ public void computeAndSetChecksum(Yaml yaml) throws IOException {
* Returns the blockCommitSequenceId.
*/
public abstract long getBlockCommitSequenceId();

}
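
A note on the ContainerData change above: the scan time is kept in two synchronized forms, a nullable Long that SnakeYAML persists and a transient Optional<Instant> for in-memory callers. A minimal standalone sketch of the round-trip behavior (illustrative class, not Ozone code):

import java.time.Instant;
import java.util.Optional;

class ScanTimeRoundTrip {
  private Long dataScanTimestamp;                         // serialized form
  private transient Optional<Instant> lastDataScanTime = Optional.empty();

  void updateDataScanTime(Instant time) {                 // runtime path
    lastDataScanTime = Optional.ofNullable(time);
    dataScanTimestamp = time != null ? time.toEpochMilli() : null;
  }

  void setDataScanTimestamp(Long timestamp) {             // deserialization path
    dataScanTimestamp = timestamp;
    lastDataScanTime = timestamp != null
        ? Optional.of(Instant.ofEpochMilli(timestamp))
        : Optional.empty();
  }

  public static void main(String[] args) {
    ScanTimeRoundTrip original = new ScanTimeRoundTrip();
    System.out.println(original.lastDataScanTime);        // Optional.empty
    original.updateDataScanTime(Instant.now());
    Long persisted = original.dataScanTimestamp;          // what YAML would store
    ScanTimeRoundTrip restored = new ScanTimeRoundTrip();
    restored.setDataScanTimestamp(persisted);
    // The same instant comes back, truncated to millisecond precision.
    System.out.println(restored.lastDataScanTime);
  }
}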
ContainerDataScanOrder.java (new file)
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.impl;

import org.apache.hadoop.ozone.container.common.interfaces.Container;

import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;

/**
* Orders containers:
* 1. containers not yet scanned first,
* 2. then least recently scanned first,
* 3. ties are broken by containerID.
*/
public class ContainerDataScanOrder implements Comparator<Container<?>> {

public static final Comparator<Container<?>> INSTANCE =
new ContainerDataScanOrder();

@Override
public int compare(Container<?> o1, Container<?> o2) {
ContainerData d1 = o1.getContainerData();
ContainerData d2 = o2.getContainerData();

Optional<Instant> scan1 = d1.lastDataScanTime();
boolean scanned1 = scan1.isPresent();
Optional<Instant> scan2 = d2.lastDataScanTime();
boolean scanned2 = scan2.isPresent();

int result = Boolean.compare(scanned1, scanned2);
if (0 == result && scanned1 && scanned2) {
result = scan1.get().compareTo(scan2.get());
}
if (0 == result) {
result = Long.compare(d1.getContainerID(), d2.getContainerID());
}

return result;
}
}
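
A self-contained sketch of the resulting order (Item is an illustrative stand-in for Container, not Ozone code); the sorted per-volume iterator in ContainerSet below yields containers in exactly this order:

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class ScanOrderDemo {
  static final class Item {
    final long id;
    final Optional<Instant> lastScan;
    Item(long id, Instant lastScan) {
      this.id = id;
      this.lastScan = Optional.ofNullable(lastScan);
    }
    @Override
    public String toString() {
      return "#" + id + lastScan.map(t -> " @" + t).orElse(" (never)");
    }
  }

  // Same three-level rule as ContainerDataScanOrder:
  // unscanned first, then oldest scan first, ties broken by id.
  static final Comparator<Item> SCAN_ORDER = (a, b) -> {
    int result = Boolean.compare(a.lastScan.isPresent(), b.lastScan.isPresent());
    if (result == 0 && a.lastScan.isPresent()) {
      result = a.lastScan.get().compareTo(b.lastScan.get());
    }
    if (result == 0) {
      result = Long.compare(a.id, b.id);
    }
    return result;
  };

  public static void main(String[] args) {
    Instant now = Instant.now();
    List<Item> items = new ArrayList<>(Arrays.asList(
        new Item(3, now),
        new Item(1, now.minusSeconds(3600)),
        new Item(2, null),
        new Item(4, now)));
    items.sort(SCAN_ORDER);
    System.out.println(items);  // [#2 (never), #1 @<1h ago>, #3 @<now>, #4 @<now>]
  }
}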
ContainerDataYaml.java
@@ -53,6 +53,7 @@
import org.yaml.snakeyaml.introspector.PropertyUtils;
import org.yaml.snakeyaml.nodes.MappingNode;
import org.yaml.snakeyaml.nodes.Node;
import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.ScalarNode;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;
@@ -92,7 +93,6 @@ public static void createContainerFile(ContainerType containerType,
containerFile);
writer = new OutputStreamWriter(out, "UTF-8");
yaml.dump(containerData, writer);

} finally {
try {
if (writer != null) {
@@ -217,6 +217,17 @@ protected Set<Property> getProperties(Class<? extends Object> type)
}
return filtered;
}

/**
* Omit properties with null value.
*/
@Override
protected NodeTuple representJavaBeanProperty(
Object bean, Property property, Object value, Tag tag) {
return value == null
? null
: super.representJavaBeanProperty(bean, property, value, tag);
}
}

/**
@@ -260,6 +271,8 @@ public Object construct(Node node) {
Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
kvData.setMetadata(meta);
kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
Long timestamp = (Long) nodes.get(OzoneConsts.DATA_SCAN_TIMESTAMP);
kvData.setDataScanTimestamp(timestamp);
String state = (String) nodes.get(OzoneConsts.STATE);
kvData
.setState(ContainerProtos.ContainerDataProto.State.valueOf(state));
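
The representJavaBeanProperty override above is what keeps dataScanTimestamp out of .container files for never-scanned containers, so files written by older versions and new files without a scan record look the same. A minimal demonstration of this SnakeYAML hook, assuming SnakeYAML 1.x (where Representer has a no-arg constructor):

import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.introspector.Property;
import org.yaml.snakeyaml.nodes.NodeTuple;
import org.yaml.snakeyaml.nodes.Tag;
import org.yaml.snakeyaml.representer.Representer;

public class NullSkipDemo {
  public static class Bean {
    private String checksum = "abc123";
    private Long dataScanTimestamp;        // null: never scanned
    public String getChecksum() { return checksum; }
    public void setChecksum(String v) { checksum = v; }
    public Long getDataScanTimestamp() { return dataScanTimestamp; }
    public void setDataScanTimestamp(Long v) { dataScanTimestamp = v; }
  }

  public static void main(String[] args) {
    Representer representer = new Representer() {
      @Override
      protected NodeTuple representJavaBeanProperty(
          Object bean, Property property, Object value, Tag tag) {
        // Returning null omits the property from the dumped document.
        return value == null
            ? null
            : super.representJavaBeanProperty(bean, property, value, tag);
      }
    };
    Yaml yaml = new Yaml(representer);
    // Prints a mapping with checksum only; no dataScanTimestamp key appears.
    System.out.println(yaml.dump(new Bean()));
  }
}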
ContainerSet.java
@@ -132,6 +132,7 @@ public Iterator<Container<?>> getContainerIterator() {

/**
* Return an iterator of containers associated with the specified volume.
* The iterator is sorted by last data scan timestamp in increasing
* order; containers not yet scanned come first.
*
* @param volume the HDDS volume which should be used to filter containers
* @return {@literal Iterator<Container<?>>}
@@ -143,6 +144,7 @@ public Iterator<Container<?>> getContainerIterator(HddsVolume volume) {
return containerMap.values().stream()
.filter(x -> volumeUuid.equals(x.getContainerData().getVolume()
.getStorageID()))
.sorted(ContainerDataScanOrder.INSTANCE)
.iterator();
}

Container.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Map;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -66,6 +67,9 @@ void create(VolumeSet volumeSet, VolumeChoosingPolicy volumeChoosingPolicy,
void update(Map<String, String> metaData, boolean forceUpdate)
throws StorageContainerException;

void updateDataScanTimestamp(Instant timestamp)
throws StorageContainerException;

/**
* Get metadata about the container.
*
KeyValueContainer.java
@@ -24,6 +24,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -337,6 +338,17 @@ public void close() throws StorageContainerException {
containerData.getBlockCommitSequenceId());
}

@Override
public void updateDataScanTimestamp(Instant time)
throws StorageContainerException {
writeLock();
try {
updateContainerData(() -> containerData.updateDataScanTime(time));
} finally {
writeUnlock();
}
}

/**
*
* Must be invoked with the writeLock held.
ContainerController.java
@@ -32,6 +32,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;

@@ -174,4 +175,10 @@ public Iterator<Container<?>> getContainers(HddsVolume volume) {
return containerSet.getContainerIterator(volume);
}

void updateDataScanTimestamp(long containerId, Instant timestamp)
throws IOException {
Container container = containerSet.getContainer(containerId);
container.updateDataScanTimestamp(timestamp);
}

}
ContainerDataScanner.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.ozone.container.ozoneimpl;

import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.slf4j.Logger;
@@ -95,14 +98,19 @@ public void runIteration() {
while (!stopping && itr.hasNext()) {
Container c = itr.next();
if (c.shouldScanData()) {
ContainerData containerData = c.getContainerData();
long containerId = containerData.getContainerID();
try {
logScanStart(containerData);
if (!c.scanData(throttler, canceler)) {
metrics.incNumUnHealthyContainers();
controller.markContainerUnhealthy(
c.getContainerData().getContainerID());
controller.markContainerUnhealthy(containerId);
} else {
Instant now = Instant.now();
logScanCompleted(containerData, now);
controller.updateDataScanTimestamp(containerId, now);
}
} catch (IOException ex) {
long containerId = c.getContainerData().getContainerID();
LOG.warn("Unexpected exception while scanning container "
+ containerId, ex);
} finally {
@@ -135,6 +143,23 @@ public void runIteration() {
}
}

private static void logScanStart(ContainerData containerData) {
if (LOG.isDebugEnabled()) {
Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
Object lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
LOG.debug("Scanning container {}, last scanned {}",
containerData.getContainerID(), lastScanTime);
}
}

private static void logScanCompleted(
ContainerData containerData, Instant timestamp) {
if (LOG.isDebugEnabled()) {
LOG.debug("Completed scan of container {} at {}",
containerData.getContainerID(), timestamp);
}
}

public synchronized void shutdown() {
this.stopping = true;
this.canceler.cancel("ContainerDataScanner("+volume+") is shutting down");
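
For reference, the map/orElse idiom in logScanStart above resolves to these phrases (standalone sketch, hypothetical class name):

import java.time.Instant;
import java.util.Optional;

public class ScanLogPhrase {
  // "at <timestamp>" when a scan time is present, "never" otherwise.
  static String lastScanned(Optional<Instant> scanTimestamp) {
    return scanTimestamp.map(ts -> "at " + ts).orElse("never");
  }

  public static void main(String[] args) {
    System.out.println("last scanned " + lastScanned(Optional.empty()));
    // last scanned never
    System.out.println("last scanned "
        + lastScanned(Optional.of(Instant.parse("2019-03-01T12:00:00Z"))));
    // last scanned at 2019-03-01T12:00:00Z
  }
}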
TestContainerDataYaml.java
@@ -29,6 +29,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
@@ -45,6 +46,7 @@ public class TestContainerDataYaml {
private static String testRoot = new FileSystemTestHelper().getTestRootDir();

private static final long MAXSIZE = (long) StorageUnit.GB.toBytes(5);
private static final Instant SCAN_TIME = Instant.now();

/**
* Creates a .container file. cleanup() should be called at the end of the
@@ -61,6 +63,7 @@ private File createContainerFile(long containerID) throws IOException {
keyValueContainerData.setContainerDBType("RocksDB");
keyValueContainerData.setMetadataPath(testRoot);
keyValueContainerData.setChunksPath(testRoot);
keyValueContainerData.updateDataScanTime(SCAN_TIME);

File containerFile = new File(testRoot, containerPath);

@@ -98,6 +101,11 @@ public void testCreateContainerFile() throws IOException {
assertEquals(1, kvData.getLayOutVersion());
assertEquals(0, kvData.getMetadata().size());
assertEquals(MAXSIZE, kvData.getMaxSize());
assertTrue(kvData.lastDataScanTime().isPresent());
assertEquals(SCAN_TIME, kvData.lastDataScanTime().get());
assertEquals(SCAN_TIME.toEpochMilli(),
kvData.getDataScanTimestamp().longValue());

// Update ContainerData.
kvData.addMetadata("VOLUME", "hdfs");
@@ -126,6 +134,10 @@ public void testCreateContainerFile() throws IOException {
assertEquals("hdfs", kvData.getMetadata().get("VOLUME"));
assertEquals("ozone", kvData.getMetadata().get("OWNER"));
assertEquals(MAXSIZE, kvData.getMaxSize());
assertTrue(kvData.lastDataScanTime().isPresent());
assertEquals(SCAN_TIME, kvData.lastDataScanTime().get());
assertEquals(SCAN_TIME.toEpochMilli(),
kvData.getDataScanTimestamp().longValue());
}

@Test
Expand Down