Skip to content

HDDS-1898. GrpcReplicationService#download cannot replicate the container. #1326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.apache.hadoop.ozone.container.common.interfaces;


import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -109,14 +110,23 @@ public abstract ContainerCommandResponseProto handle(
DispatcherContext dispatcherContext);

/**
 * Imports container from a raw input stream.
 *
 * @param containerID ID of the container to reconstruct from the stream
 * @param maxSize maximum size of the container
 * @param originPipelineId ID of the pipeline the container originated from
 * @param originNodeId ID of the datanode the container originated from
 * @param rawContainerStream stream carrying the packed container data;
 *     any InputStream is accepted, not just a FileInputStream
 * @param packer packer used to unpack the container archive
 * @return the imported container
 * @throws IOException if the stream cannot be read or unpacked
 */
public abstract Container importContainer(
    long containerID,
    long maxSize,
    String originPipelineId,
    String originNodeId,
    InputStream rawContainerStream,
    TarContainerPacker packer)
    throws IOException;

/**
 * Exports container to the output stream.
 *
 * @param container container whose data is to be exported
 * @param outputStream destination stream for the packed container data
 * @param packer packer used to produce the container archive
 * @throws IOException if the container data cannot be written
 */
public abstract void exportContainer(
Container container,
OutputStream outputStream,
TarContainerPacker packer)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ public void handle(SCMCommand command, OzoneContainer container,
case KeyValueContainer:
KeyValueContainerData containerData = (KeyValueContainerData)
cont.getContainerData();
deleteKeyValueContainerBlocks(containerData, entry);
cont.writeLock();
try {
deleteKeyValueContainerBlocks(containerData, entry);
} finally {
cont.writeUnlock();
}
txResultBuilder.setContainerID(containerId)
.setSuccess(true);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ public void close() throws StorageContainerException {
} finally {
writeUnlock();
}
LOG.info("Container {} is closed with bcsId {}.",
containerData.getContainerID(),
containerData.getBlockCommitSequenceId());
}

/**
Expand Down Expand Up @@ -359,13 +362,10 @@ private void updateContainerData(Runnable update)
}
}

void compactDB() throws StorageContainerException {
private void compactDB() throws StorageContainerException {
try {
try(ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
db.getStore().compactDB();
LOG.info("Container {} is closed with bcsId {}.",
containerData.getContainerID(),
containerData.getBlockCommitSequenceId());
}
} catch (StorageContainerException ex) {
throw ex;
Expand Down Expand Up @@ -522,6 +522,7 @@ public void exportContainerData(OutputStream destination,
"Only closed containers could be exported: ContainerId="
+ getContainerData().getContainerID());
}
compactDB();
packer.pack(this, destination);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

package org.apache.hadoop.ozone.container.keyvalue;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -841,13 +842,14 @@ private void checkContainerOpen(KeyValueContainer kvContainer)
throw new StorageContainerException(msg, result);
}

public Container importContainer(long containerID, long maxSize,
String originPipelineId,
String originNodeId,
FileInputStream rawContainerStream,
TarContainerPacker packer)
@Override
public Container importContainer(final long containerID,
final long maxSize, final String originPipelineId,
final String originNodeId, final InputStream rawContainerStream,
final TarContainerPacker packer)
throws IOException {

// TODO: Add layout version!
KeyValueContainerData containerData =
new KeyValueContainerData(containerID,
maxSize, originPipelineId, originNodeId);
Expand All @@ -862,6 +864,20 @@ public Container importContainer(long containerID, long maxSize,

}

/**
 * Exports the container's data to the given output stream while holding
 * the container read lock, so the data cannot be mutated mid-export.
 *
 * @param container container to export; NOTE(review): assumed to be a
 *     KeyValueContainer — the cast below throws ClassCastException otherwise
 * @param outputStream destination stream for the packed container data
 * @param packer packer used to produce the container archive
 * @throws IOException if the container data cannot be written
 */
@Override
public void exportContainer(final Container container,
    final OutputStream outputStream,
    final TarContainerPacker packer)
    throws IOException {
  container.readLock();
  try {
    final KeyValueContainer kvc = (KeyValueContainer) container;
    kvc.exportContainerData(outputStream, packer);
  } finally {
    // Always release the read lock, even if the export fails.
    container.readUnlock();
  }
}

@Override
public void markContainerForClose(Container container)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
Expand Down Expand Up @@ -247,6 +248,9 @@ private class BlockDeletingTask
@Override
public BackgroundTaskResult call() throws Exception {
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
final Container container = ozoneContainer.getContainerSet()
.getContainer(containerData.getContainerID());
container.writeLock();
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
try (ReferenceCountedDB meta = BlockUtils.getDB(containerData, conf)) {
Expand Down Expand Up @@ -313,6 +317,8 @@ public BackgroundTaskResult call() throws Exception {
}
crr.addAll(succeedBlocks);
return crr;
} finally {
container.writeUnlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;

Expand Down Expand Up @@ -120,13 +121,20 @@ public void closeContainer(final long containerId) throws IOException {

/**
 * Imports a container from a raw input stream by delegating to the
 * handler registered for the given container type.
 *
 * @param type container type used to look up the handler
 * @param containerId ID of the container to import
 * @param maxSize maximum size of the container
 * @param originPipelineId ID of the pipeline the container originated from
 * @param originNodeId ID of the datanode the container originated from
 * @param rawContainerStream stream carrying the packed container data
 * @param packer packer used to unpack the container archive
 * @return the imported container
 * @throws IOException if the stream cannot be read or unpacked
 */
public Container importContainer(final ContainerType type,
final long containerId, final long maxSize, final String originPipelineId,
final String originNodeId, final InputStream rawContainerStream,
final TarContainerPacker packer)
throws IOException {
// NOTE(review): assumes a handler is registered for this type — an
// unknown type would cause an NPE here; confirm callers guarantee it.
return handlers.get(type).importContainer(containerId, maxSize,
originPipelineId, originNodeId, rawContainerStream, packer);
}

/**
 * Exports a container to the given output stream by delegating to the
 * handler registered for the given container type.
 *
 * @param type container type used to look up the handler
 * @param containerId ID of the container to export; resolved through the
 *     container set before being handed to the handler
 * @param outputStream destination stream for the packed container data
 * @param packer packer used to produce the container archive
 * @throws IOException if the container data cannot be written
 */
public void exportContainer(final ContainerType type,
final long containerId, final OutputStream outputStream,
final TarContainerPacker packer) throws IOException {
// NOTE(review): assumes a handler is registered for this type and the
// container exists in the set — either missing would NPE; verify callers.
handlers.get(type).exportContainer(
containerSet.getContainer(containerId), outputStream, packer);
}

/**
* Deletes a container given its Id.
* @param containerId Id of the container to be deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.OutputStream;

import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;

import com.google.common.base.Preconditions;
Expand All @@ -41,7 +40,7 @@ public class OnDemandContainerReplicationSource

private ContainerController controller;

private ContainerPacker packer = new TarContainerPacker();
private TarContainerPacker packer = new TarContainerPacker();

public OnDemandContainerReplicationSource(
ContainerController controller) {
Expand All @@ -59,18 +58,11 @@ public void copyData(long containerId, OutputStream destination)

Container container = controller.getContainer(containerId);

Preconditions
.checkNotNull(container, "Container is not found " + containerId);
Preconditions.checkNotNull(
container, "Container is not found " + containerId);

switch (container.getContainerType()) {
case KeyValueContainer:
packer.pack(container,
destination);
break;
default:
LOG.warn("Container type " + container.getContainerType()
+ " is not replicable as no compression algorithm for that.");
}
controller.exportContainer(
container.getContainerType(), containerId, destination, packer);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.test.GenericTestUtils;

Expand Down Expand Up @@ -119,6 +120,13 @@ public void testContainerReplication() throws Exception {
chooseDatanodeWithoutContainer(sourcePipelines,
cluster.getHddsDatanodes());

// Close the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(
sourceDatanodes.get(0).getUuid(),
new CloseContainerCommand(containerId,
sourcePipelines.getId(), true));

//WHEN: send the order to replicate the container
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
Expand Down