Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-1810. SCM command to Activate and Deactivate pipelines. #1224

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ public List<Pipeline> listPipelines() throws IOException {
return storageContainerLocationClient.listPipelines();
}

@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.activatePipeline(pipelineID);
}

@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.deactivatePipeline(pipelineID);
}

@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,22 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
*/
List<Pipeline> listPipelines() throws IOException;

/**
* Activates the pipeline given a pipeline ID.
*
* @param pipelineID PipelineID to activate.
* @throws IOException In case of exception while activating the pipeline
*/
void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;

/**
* Deactivates the pipeline given a pipeline ID.
*
* @param pipelineID PipelineID to deactivate.
* @throws IOException In case of exception while deactivating the pipeline
*/
void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;

/**
* Closes the pipeline given a pipeline ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,15 @@ public Pipeline build() {
* Possible Pipeline states in SCM.
*/
public enum PipelineState {
ALLOCATED, OPEN, CLOSED;
ALLOCATED, OPEN, DORMANT, CLOSED;

public static PipelineState fromProtobuf(HddsProtos.PipelineState state)
throws UnknownPipelineStateException {
Preconditions.checkNotNull(state, "Pipeline state is null");
switch (state) {
case PIPELINE_ALLOCATED: return ALLOCATED;
case PIPELINE_OPEN: return OPEN;
case PIPELINE_DORMANT: return DORMANT;
case PIPELINE_CLOSED: return CLOSED;
default:
throw new UnknownPipelineStateException(
Expand All @@ -375,6 +376,7 @@ public static HddsProtos.PipelineState getProtobuf(PipelineState state)
switch (state) {
case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED;
case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN;
case DORMANT: return HddsProtos.PipelineState.PIPELINE_DORMANT;
case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED;
default:
throw new UnknownPipelineStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
*/
List<Pipeline> listPipelines() throws IOException;

/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;

/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;

/**
* Closes a pipeline given the pipelineID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
Expand Down Expand Up @@ -339,6 +341,36 @@ public List<Pipeline> listPipelines() throws IOException {
}
}

@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
ActivatePipelineRequestProto request =
ActivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
DeactivatePipelineRequestProto request =
DeactivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}

@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public enum SCMAction implements AuditAction {
LIST_CONTAINER,
LIST_PIPELINE,
CLOSE_PIPELINE,
ACTIVATE_PIPELINE,
DEACTIVATE_PIPELINE,
DELETE_CONTAINER,
IN_SAFE_MODE,
FORCE_EXIT_SAFE_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
Expand Down Expand Up @@ -257,6 +265,32 @@ public ListPipelineResponseProto listPipelines(
}
}

@Override
public ActivatePipelineResponseProto activatePipeline(
RpcController controller, ActivatePipelineRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil
.importAndCreateScope("activatePipeline", request.getTraceID())) {
impl.activatePipeline(request.getPipelineID());
return ActivatePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}

@Override
public DeactivatePipelineResponseProto deactivatePipeline(
RpcController controller, DeactivatePipelineRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil
.importAndCreateScope("activatePipeline", request.getTraceID())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.importAndCreateScope("activatePipeline", request.getTraceID())) {
.importAndCreateScope("deactivatePipeline", request.getTraceID())) {

impl.deactivatePipeline(request.getPipelineID());
return DeactivatePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}

@Override
public ClosePipelineResponseProto closePipeline(
RpcController controller, ClosePipelineRequestProto request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ message ListPipelineResponseProto {
repeated Pipeline pipelines = 1;
}

message ActivatePipelineRequestProto {
required PipelineID pipelineID = 1;
optional string traceID = 2;
}

message ActivatePipelineResponseProto {
}

message DeactivatePipelineRequestProto {
required PipelineID pipelineID = 1;
optional string traceID = 2;
}

message DeactivatePipelineResponseProto {
}

message ClosePipelineRequestProto {
required PipelineID pipelineID = 1;
optional string traceID = 2;
Expand Down Expand Up @@ -274,6 +290,12 @@ service StorageContainerLocationProtocolService {
rpc listPipelines(ListPipelineRequestProto)
returns (ListPipelineResponseProto);

rpc activatePipeline(ActivatePipelineRequestProto)
returns (ActivatePipelineResponseProto);

rpc deactivatePipeline(DeactivatePipelineRequestProto)
returns (DeactivatePipelineResponseProto);

/**
* Closes a pipeline.
*/
Expand Down
3 changes: 2 additions & 1 deletion hadoop-hdds/common/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ message PipelineID {
enum PipelineState {
PIPELINE_ALLOCATED = 1;
PIPELINE_OPEN = 2;
PIPELINE_CLOSED = 3;
PIPELINE_DORMANT = 3;
PIPELINE_CLOSED = 4;
}

message Pipeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,21 @@ void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
void triggerPipelineCreation();

void incNumBlocksAllocatedMetric(PipelineID id);

/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
void activatePipeline(PipelineID pipelineID) throws IOException;

/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
void deactivatePipeline(PipelineID pipelineID) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,28 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException {
}
return pipeline;
}

/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
public void activatePipeline(PipelineID pipelineID)
throws IOException {
pipelineStateMap
.updatePipelineState(pipelineID, PipelineState.OPEN);
}

/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
pipelineStateMap
.updatePipelineState(pipelineID, PipelineState.DORMANT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
// for transition to OPEN state add pipeline to query2OpenPipelines
query2OpenPipelines.get(query).add(updatedPipeline);
} else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) {
} else {
// for transition from OPEN to CLOSED state remove pipeline from
// query2OpenPipelines
query2OpenPipelines.get(query).remove(pipeline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
Set<DatanodeDetails> dnsUsed = new HashSet<>();
stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
p -> p.getPipelineState().equals(PipelineState.OPEN) ||
p.getPipelineState().equals(PipelineState.DORMANT) ||
p.getPipelineState().equals(PipelineState.ALLOCATED))
.forEach(p -> dnsUsed.addAll(p.getNodes()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -116,6 +117,7 @@ public PipelineStateManager getStateManager() {
return stateManager;
}

@VisibleForTesting
public void setPipelineProvider(ReplicationType replicationType,
PipelineProvider provider) {
pipelineFactory.setProvider(replicationType, provider);
Expand Down Expand Up @@ -349,6 +351,30 @@ public void triggerPipelineCreation() {
backgroundPipelineCreator.triggerPipelineCreation();
}

/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
stateManager.activatePipeline(pipelineID);
}

/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
stateManager.deactivatePipeline(pipelineID);
}

/**
* Moves the pipeline to CLOSED state and sends close container command for
* all the containers in the pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,24 @@ public List<Pipeline> listPipelines() {
return scm.getPipelineManager().getPipelines();
}

@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.ACTIVATE_PIPELINE, null));
scm.getPipelineManager().activatePipeline(
PipelineID.getFromProtobuf(pipelineID));
}

@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.DEACTIVATE_PIPELINE, null));
scm.getPipelineManager().deactivatePipeline(
PipelineID.getFromProtobuf(pipelineID));
}

@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ActivatePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.DeactivatePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
Expand Down Expand Up @@ -84,6 +86,8 @@
CreateSubcommand.class,
CloseSubcommand.class,
ListPipelinesSubcommand.class,
ActivatePipelineSubcommand.class,
DeactivatePipelineSubcommand.class,
ClosePipelineSubcommand.class,
TopologySubcommand.class,
ReplicationManagerCommands.class
Expand Down
Loading