feat(backup): Add restore indices and restore backup tasks #2779

Merged

Changes from 1 commit
Fix to John's comments
Dexter Lee committed Jun 29, 2021
commit 537f07ded31c1baf2653ad20b62f7af9229e6a91
@@ -15,7 +15,7 @@ public class GMSDisableWriteModeStep implements UpgradeStep {

   @Override
   public String id() {
-    return "GMSEnableWriteModeStep";
+    return "GMSDisableWriteModeStep";
   }

   @Override
@@ -7,6 +7,7 @@
 import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep;
 import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep;
 import com.linkedin.datahub.upgrade.common.steps.GMSDisableWriteModeStep;
+import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep;
 import com.linkedin.datahub.upgrade.common.steps.GMSQualificationStep;
 import com.linkedin.entity.client.EntityClient;
 import com.linkedin.metadata.entity.EntityService;
@@ -47,6 +48,7 @@ private List<UpgradeStep> buildSteps(final EbeanServer server, final EntityServi
     steps.add(new ClearGraphServiceStep(graphClient, true));
     steps.add(new ClearAspectV2TableStep(server));
     steps.add(new RestoreStorageStep(entityService, entityRegistry));
+    steps.add(new GMSEnableWriteModeStep(entityClient));
     return steps;
   }
@@ -61,13 +61,13 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
       return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
     }

-    EbeanAspectBackupIterator iterator = _backupReaders.get(backupReaderName.get()).getBackupIterator(context);
-    if (iterator == null) {
+    Optional<EbeanAspectBackupIterator> iterator = _backupReaders.get(backupReaderName.get()).getBackupIterator(context);
+    if (!iterator.isPresent()) {
       context.report().addLine("Failed to build backup iterator");
       return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
     }
     EbeanAspectV2 aspect;
-    while ((aspect = iterator.next()) != null) {
+    while ((aspect = iterator.get().next()) != null) {
       numRows++;

       // 1. Extract an Entity type from the entity Urn
@@ -1,6 +1,7 @@
 package com.linkedin.datahub.upgrade.restorebackup.backupreader;

 import com.linkedin.datahub.upgrade.UpgradeContext;
+import java.util.Optional;


 /**
@@ -10,5 +11,5 @@
 public interface BackupReader {
   String getName();

-  EbeanAspectBackupIterator getBackupIterator(UpgradeContext context);
+  Optional<EbeanAspectBackupIterator> getBackupIterator(UpgradeContext context);
 }
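
The Optional-based contract also leaves room for readers beyond the local Parquet one. A skeletal, hypothetical second implementation is sketched below for illustration only; the class name, the "BACKUP_S3_BUCKET" argument, and the reader itself are assumptions, not part of this PR:

    // Hypothetical reader skeleton; illustrates the Optional-based contract only.
    public class S3BackupReader implements BackupReader {

      @Override
      public String getName() {
        return "S3BackupReader";
      }

      @Override
      public Optional<EbeanAspectBackupIterator> getBackupIterator(UpgradeContext context) {
        Optional<String> bucket = context.parsedArgs().get("BACKUP_S3_BUCKET");
        if (!bucket.isPresent()) {
          // Misconfiguration is reported and surfaced as Optional.empty(), never null.
          context.report().addLine("BACKUP_S3_BUCKET must be set to run RestoreBackup from S3");
          return Optional.empty();
        }
        // Constructing a real iterator (download + ParquetReader) is omitted from this sketch.
        return Optional.empty();
      }
    }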
@@ -29,19 +29,19 @@ public String getName() {
   }

   @Override
-  public EbeanAspectBackupIterator getBackupIterator(UpgradeContext context) {
+  public Optional<EbeanAspectBackupIterator> getBackupIterator(UpgradeContext context) {
     Optional<String> path = context.parsedArgs().get("BACKUP_FILE_PATH");
     if (!path.isPresent()) {
       context.report().addLine("BACKUP_FILE_PATH must be set to run RestoreBackup through local parquet file");
-      return null;
+      return Optional.empty();
     }

     try {
       ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(path.get())).build();
-      return new ParquetEbeanAspectBackupIterator(reader);
+      return Optional.of(new ParquetEbeanAspectBackupIterator(reader));
     } catch (IOException e) {
       context.report().addLine(String.format("Failed to build ParquetReader: %s", e));
-      return null;
+      return Optional.empty();
     }
   }
 }
@@ -12,7 +12,7 @@

 /**
  * Iterator to retrieve EbeanAspectV2 objects from the ParquetReader
- * Converts the avro GeneralRecord object into EbeanAspectV2
+ * Converts the avro GenericRecord object into EbeanAspectV2
  */
 @RequiredArgsConstructor
 public class ParquetEbeanAspectBackupIterator implements EbeanAspectBackupIterator {
@@ -38,9 +38,6 @@ public void close() throws IOException {
   }

   private EbeanAspectV2 convertRecord(GenericRecord record) {
-    if (record == null) {
-      return null;
-    }
     EbeanAspectV2.PrimaryKey key =
         new EbeanAspectV2.PrimaryKey(record.get("urn").toString(), record.get("aspect").toString(),
             (Long) record.get("version"));
@@ -51,15 +51,15 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {

     context.report().addLine("Sending MAE from local DB...");
     final int rowCount = _server.find(EbeanAspectV2.class).where().eq(EbeanAspectV2.VERSION_COLUMN, 0).findCount();
-    context.report().addLine(String.format("Found %s rows in legacy aspects table", rowCount));
+    context.report().addLine(String.format("Found %s latest aspects in aspects table", rowCount));

     int totalRowsMigrated = 0;
     int start = 0;
     int count = getBatchSize(context.parsedArgs());
     while (start < rowCount) {

       context.report()
-          .addLine(String.format("Reading rows %s through %s from legacy aspects table.", start, start + count));
+          .addLine(String.format("Reading rows %s through %s from the aspects table.", start, start + count));
       PagedList<EbeanAspectV2> rows = getPagedAspects(start, count);

       for (EbeanAspectV2 aspect : rows.getList()) {

Collaborator: same
Contributor Author: done
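
getPagedAspects is referenced but not shown in this hunk. Assuming the standard Ebean query API used elsewhere in this file, a paged fetch of latest aspects would look roughly like the sketch below; the method body and the ordering column are assumptions, not the PR's code:

    private PagedList<EbeanAspectV2> getPagedAspects(final int start, final int count) {
      // Page through version-0 (latest) aspects in a stable order so batches don't overlap.
      return _server.find(EbeanAspectV2.class)
          .where()
          .eq(EbeanAspectV2.VERSION_COLUMN, 0)
          .orderBy(EbeanAspectV2.URN_COLUMN)
          .setFirstRow(start)
          .setMaxRows(count)
          .findPagedList();
    }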
4 changes: 4 additions & 0 deletions docker/datahub-upgrade/README.md
@@ -15,6 +15,10 @@ to metadata_aspect_v2 table. Arguments:
 2. **NoCodeDataMigrationCleanup**: Cleanses graph index, search index, and key-value store of legacy DataHub data (metadata_aspect table) once
 the No Code Data Migration has completed successfully. No arguments.

+3. **RestoreIndices**: Restores indices by fetching the latest version of each aspect and producing MAE.
+
+4. **RestoreBackup**: Restores the storage stack from a backup of the local database.
+
 ## Environment Variables

 To run the `datahub-upgrade` container, some environment variables must be provided in order to tell the upgrade CLI
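
For context, each upgrade task is selected with the `-u` flag when invoking the upgrade CLI, the same convention the restore-indices doc below uses for `-a` arguments. A typical invocation of the new task might look like this (image tag and env-file path are illustrative, not pinned down by this PR):

    docker pull acryldata/datahub-upgrade:head
    docker run --env-file docker/datahub-upgrade/env/docker.env acryldata/datahub-upgrade:head -u RestoreIndices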
3 changes: 3 additions & 0 deletions docs/how/restore-indices.md
@@ -17,6 +17,9 @@ Run the following command from root to send MAE for each aspect in the Local DB.

 If you need to clear the search and graph indices before restoring, add `-a clean` to the end of the command.

+Refer to this [doc](../../docker/datahub-upgrade/README.md#environment-variables) on how to set environment variables
+for your environment.
+
 ## Kubernetes

 Run `kubectl get cronjobs` to see if the restoration job template has been deployed. If you see results like below, you

Collaborator: Consider adding a note about the default env variables used and how to change them to suit your deployment.
Contributor Author: Added a link to the datahub-upgrade readme section on env variables. Also updated that doc.

Collaborator: Good stuff!!!
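
Following the `kubectl get cronjobs` check above, a one-off restore can be kicked off from the deployed template; the exact CronJob name depends on your Helm release, so the `datahub-datahub-...` name below is an assumption:

    kubectl create job --from=cronjob/datahub-datahub-restore-indices-job-template datahub-restore-indices-adhoc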
@@ -1,6 +1,7 @@
 package com.linkedin.metadata.graph.elastic;

 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.ImmutableList;
 import com.linkedin.common.urn.Urn;
@@ -72,8 +73,7 @@ private String toDocument(@Nonnull final Edge edge) {

   private String toDocId(@Nonnull final Edge edge) {
     String rawDocId =
-        edge.getSource().toString() + DOC_DELIMETER + edge.getRelationshipType() + DOC_DELIMETER + edge.getDestination()
-            .toString();
+        edge.getSource().toString() + DOC_DELIMETER + edge.getRelationshipType() + DOC_DELIMETER + edge.getDestination().toString();

     try {
       byte[] bytesOfRawDocID = rawDocId.getBytes("UTF-8");
@@ -93,25 +93,36 @@ public void addEdge(@Nonnull final Edge edge) {
   }

   @Nonnull
-  public List<String> findRelatedUrns(@Nullable final String sourceType, @Nonnull final Filter sourceEntityFilter,
-      @Nullable final String destinationType, @Nonnull final Filter destinationEntityFilter,
-      @Nonnull final List<String> relationshipTypes, @Nonnull final RelationshipFilter relationshipFilter,
-      final int offset, final int count) {
+  public List<String> findRelatedUrns(
+      @Nullable final String sourceType,
+      @Nonnull final Filter sourceEntityFilter,
+      @Nullable final String destinationType,
+      @Nonnull final Filter destinationEntityFilter,
+      @Nonnull final List<String> relationshipTypes,
+      @Nonnull final RelationshipFilter relationshipFilter,
+      final int offset,
+      final int count) {

     final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();
     String destinationNode = relationshipDirection == RelationshipDirection.OUTGOING ? "destination" : "source";

-    SearchResponse response =
-        _graphReadDAO.getSearchResponse(sourceType, sourceEntityFilter, destinationType, destinationEntityFilter,
-            relationshipTypes, relationshipFilter, offset, count);
+    SearchResponse response = _graphReadDAO.getSearchResponse(
+        sourceType,
+        sourceEntityFilter,
+        destinationType,
+        destinationEntityFilter,
+        relationshipTypes,
+        relationshipFilter,
+        offset,
+        count
+    );

     if (response == null) {
       return ImmutableList.of();
     }

     return Arrays.stream(response.getHits().getHits())
-        .map(hit -> ((HashMap<String, String>) hit.getSourceAsMap()
-            .getOrDefault(destinationNode, EMPTY_HASH)).getOrDefault("urn", null))
+        .map(hit -> ((HashMap<String, String>) hit.getSourceAsMap().getOrDefault(destinationNode, EMPTY_HASH)).getOrDefault("urn", null))
         .filter(Objects::nonNull)
         .collect(Collectors.toList());
   }
@@ -137,29 +148,52 @@ public void removeNode(@Nonnull final Urn urn) {
     RelationshipFilter outgoingFilter = new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING);
     RelationshipFilter incomingFilter = new RelationshipFilter().setDirection(RelationshipDirection.INCOMING);

-    _graphWriteDAO.deleteByQuery(null, urnFilter, null, emptyFilter, relationshipTypes, outgoingFilter);
-
-    _graphWriteDAO.deleteByQuery(null, urnFilter, null, emptyFilter, relationshipTypes, incomingFilter);
+    _graphWriteDAO.deleteByQuery(
+        null,
+        urnFilter,
+        null,
+        emptyFilter,
+        relationshipTypes,
+        outgoingFilter
+    );
+
+    _graphWriteDAO.deleteByQuery(
+        null,
+        urnFilter,
+        null,
+        emptyFilter,
+        relationshipTypes,
+        incomingFilter
+    );

     return;
   }

-  public void removeEdgesFromNode(@Nonnull final Urn urn, @Nonnull final List<String> relationshipTypes,
+  public void removeEdgesFromNode(
+      @Nonnull final Urn urn,
+      @Nonnull final List<String> relationshipTypes,
       @Nonnull final RelationshipFilter relationshipFilter) {

     Filter urnFilter = createUrnFilter(urn);
     Filter emptyFilter = new Filter().setCriteria(new CriterionArray());

-    _graphWriteDAO.deleteByQuery(null, urnFilter, null, emptyFilter, relationshipTypes, relationshipFilter);
+    _graphWriteDAO.deleteByQuery(
+        null,
+        urnFilter,
+        null,
+        emptyFilter,
+        relationshipTypes,
+        relationshipFilter
+    );
   }

   @Override
   public void configure() {
     log.info("Setting up elastic graph index");
     boolean exists = false;
     try {
-      exists = searchClient.indices()
-          .exists(new GetIndexRequest(_indexConvention.getIndexName(INDEX_NAME)), RequestOptions.DEFAULT);
+      exists = searchClient.indices().exists(
+          new GetIndexRequest(_indexConvention.getIndexName(INDEX_NAME)), RequestOptions.DEFAULT);
     } catch (IOException e) {
       log.error("ERROR: Failed to set up elasticsearch graph index. Could not check if the index exists");
       e.printStackTrace();
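
To make the reformatted findRelatedUrns signature above concrete, a hypothetical call that walks outgoing DownstreamOf edges might look like the following; the graphService variable, the filter construction, and the relationship name are illustrative, not from this PR:

    // Sketch only: find up to 20 urns related to `urn` via outgoing DownstreamOf edges.
    Filter urnFilter = new Filter().setCriteria(new CriterionArray(
        ImmutableList.of(new Criterion().setField("urn").setValue(urn.toString()))));
    Filter emptyFilter = new Filter().setCriteria(new CriterionArray());
    RelationshipFilter outgoing = new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING);

    List<String> related = graphService.findRelatedUrns(
        null,                               // sourceType: match any entity type
        urnFilter,                          // source must match the given urn
        null,                               // destinationType: match any entity type
        emptyFilter,                        // no destination constraints
        ImmutableList.of("DownstreamOf"),   // relationship types to traverse
        outgoing,                           // direction of traversal
        0,                                  // offset
        20);                                // count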
@@ -34,6 +34,7 @@ public void configure() {

   @Override
   public void clear() {
+    esWriteDAO.clear();
   }

   @Override

Collaborator: Do we need to do something here?
Contributor Author: good catch