Skip to content

fix(entity-query-service): revert bulk update #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,23 @@
import com.google.protobuf.ServiceException;
import com.typesafe.config.Config;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.hypertrace.core.documentstore.Collection;
import org.hypertrace.core.documentstore.Datastore;
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.JSONDocument;
import org.hypertrace.core.documentstore.Key;
import org.hypertrace.core.documentstore.SingleValueKey;
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
import org.hypertrace.core.grpcutils.context.RequestContext;
import org.hypertrace.entity.data.service.DocumentParser;
import org.hypertrace.entity.data.service.v1.AttributeValue;
import org.hypertrace.entity.data.service.v1.Entity;
import org.hypertrace.entity.data.service.v1.Query;
import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest;
import org.hypertrace.entity.query.service.v1.BulkEntityUpdateRequest.EntityUpdateInfo;
import org.hypertrace.entity.query.service.v1.ColumnIdentifier;
import org.hypertrace.entity.query.service.v1.ColumnMetadata;
import org.hypertrace.entity.query.service.v1.EntityQueryRequest;
Expand All @@ -41,7 +37,6 @@
import org.hypertrace.entity.query.service.v1.SetAttribute;
import org.hypertrace.entity.query.service.v1.TotalEntitiesRequest;
import org.hypertrace.entity.query.service.v1.TotalEntitiesResponse;
import org.hypertrace.entity.query.service.v1.UpdateOperation;
import org.hypertrace.entity.query.service.v1.Value;
import org.hypertrace.entity.query.service.v1.ValueType;
import org.hypertrace.entity.service.constants.EntityServiceConstants;
Expand Down Expand Up @@ -240,8 +235,8 @@ Row convertToEntityQueryResult(
public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> responseObserver) {
// Validations
RequestContext requestContext = RequestContext.CURRENT.get();
Optional<String> maybeTenantId = requestContext.getTenantId();
if (maybeTenantId.isEmpty()) {
Optional<String> tenantId = requestContext.getTenantId();
if (tenantId.isEmpty()) {
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
return;
}
Expand All @@ -261,9 +256,14 @@ public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> r
doUpdate(requestContext, request);

// Finally return the selections
List<Entity> entities =
getProjectedEntities(
request.getEntityIdsList(), request.getSelectionList(), requestContext);
Query entitiesQuery = Query.newBuilder().addAllEntityId(request.getEntityIdsList()).build();
List<String> docStoreSelections =
entityQueryConverter.convertSelectionsToDocStoreSelections(
requestContext, request.getSelectionList());
Iterator<Document> documentIterator =
entitiesCollection.search(
DocStoreConverter.transform(tenantId.get(), entitiesQuery, docStoreSelections));
List<Entity> entities = convertDocsToEntities(documentIterator);
responseObserver.onNext(
convertEntitiesToResultSetChunk(requestContext, entities, request.getSelectionList()));
responseObserver.onCompleted();
Expand All @@ -274,7 +274,7 @@ public void update(EntityUpdateRequest request, StreamObserver<ResultSetChunk> r
}

private void doUpdate(RequestContext requestContext, EntityUpdateRequest request)
throws Exception {
throws IOException {
if (request.getOperation().hasSetAttribute()) {
SetAttribute setAttribute = request.getOperation().getSetAttribute();
String attributeId = setAttribute.getAttribute().getColumnName();
Expand All @@ -289,128 +289,20 @@ private void doUpdate(RequestContext requestContext, EntityUpdateRequest request
AttributeValue attributeValue =
EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build();
String jsonValue = DocStoreJsonFormat.printer().print(attributeValue);
JSONDocument jsonDocument = new JSONDocument(jsonValue);

Map<Key, Map<String, Document>> entitiesUpdateMap = new HashMap<>();
for (String entityId : request.getEntityIdsList()) {
SingleValueKey key =
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId);
if (entitiesUpdateMap.containsKey(key)) {
entitiesUpdateMap.get(key).put(subDocPath, jsonDocument);
} else {
Map<String, Document> subDocument = new HashMap<>();
subDocument.put(subDocPath, jsonDocument);
entitiesUpdateMap.put(key, subDocument);
// TODO better error reporting once doc store exposes the,
if (!entitiesCollection.updateSubDoc(key, subDocPath, new JSONDocument(jsonValue))) {
LOG.warn(
"Failed to update entity {}, subDocPath {}, with new doc {}.",
key,
subDocPath,
jsonValue);
}
}
try {
entitiesCollection.bulkUpdateSubDocs(entitiesUpdateMap);
} catch (Exception e) {
LOG.error(
"Failed to update entities {}, subDocPath {}, with new doc {}.",
entitiesUpdateMap,
subDocPath,
jsonValue,
e);
throw e;
}
}
}

@Override
public void bulkUpdate(
BulkEntityUpdateRequest request, StreamObserver<ResultSetChunk> responseObserver) {
// Validations
RequestContext requestContext = RequestContext.CURRENT.get();
Optional<String> maybeTenantId = requestContext.getTenantId();
if (maybeTenantId.isEmpty()) {
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
return;
}
if (StringUtils.isEmpty(request.getEntityType())) {
responseObserver.onError(new ServiceException("Entity type is missing in the request."));
return;
}
if (request.getEntitiesCount() == 0) {
responseObserver.onError(new ServiceException("Entities are missing in the request."));
}
Map<String, EntityUpdateInfo> entitiesMap = request.getEntitiesMap();
try {
doBulkUpdate(requestContext, entitiesMap);
responseObserver.onCompleted();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will try to look through this more carefully at some point today, but one thing that jumps out at me off the bat - we never actually onNext a result set chunk here that I can see.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed a bit offline. There may not be a bug here, but it has a very unintuitive api that can easily lead to corruption - a Value going in looks like a oneof style object but is not, so both the field and type need to be set. If they're not both set, we may be changing data types of persisted data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the API isn't supposed to send any response, as of now. We can just change the API to just send an empty response as of now, instead of result set chunk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to break the new api by changing its return type anyway, perhaps it makes sense to fix the value type there?

} catch (Exception e) {
responseObserver.onError(
new ServiceException("Error occurred while executing " + request, e));
}
}

private List<Entity> getProjectedEntities(
Iterable<String> entityIdsList,
List<Expression> selectionList,
RequestContext requestContext) {
Query entitiesQuery = Query.newBuilder().addAllEntityId(entityIdsList).build();
List<String> docStoreSelections =
entityQueryConverter.convertSelectionsToDocStoreSelections(requestContext, selectionList);
Iterator<Document> documentIterator =
entitiesCollection.search(
DocStoreConverter.transform(
requestContext.getTenantId().orElseThrow(), entitiesQuery, docStoreSelections));
return convertDocsToEntities(documentIterator);
}

private void doBulkUpdate(
RequestContext requestContext, Map<String, EntityUpdateInfo> entitiesMap) throws Exception {
Map<Key, Map<String, Document>> entitiesUpdateMap = new HashMap<>();
for (String entityId : entitiesMap.keySet()) {
Map<String, Document> transformedUpdateOperations =
transformUpdateOperations(
entitiesMap.get(entityId).getUpdateOperationList(), requestContext);
if (transformedUpdateOperations.isEmpty()) {
continue;
}
entitiesUpdateMap.put(
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId),
transformedUpdateOperations);
}

if (entitiesUpdateMap.isEmpty()) {
LOG.error("There are no entities to update!");
return;
}

try {
entitiesCollection.bulkUpdateSubDocs(entitiesUpdateMap);
} catch (Exception e) {
LOG.error("Failed to update entities {}", entitiesMap, e);
throw e;
}
}

private Map<String, Document> transformUpdateOperations(
List<UpdateOperation> updateOperationList, RequestContext requestContext) throws Exception {
Map<String, Document> documentMap = new HashMap<>();
for (UpdateOperation updateOperation : updateOperationList) {
if (!updateOperation.hasSetAttribute()) {
continue;
}
SetAttribute setAttribute = updateOperation.getSetAttribute();
String attributeId = setAttribute.getAttribute().getColumnName();
String subDocPath =
entityAttributeMapping
.getDocStorePathByAttributeId(requestContext, attributeId)
.orElseThrow(
() -> new IllegalArgumentException("Unknown attribute FQN " + attributeId));
AttributeValue attributeValue =
EntityQueryConverter.convertToAttributeValue(setAttribute.getValue()).build();
try {
String jsonValue = DocStoreJsonFormat.printer().print(attributeValue);
documentMap.put(subDocPath, new JSONDocument(jsonValue));
} catch (Exception e) {
LOG.error("Failed to put update corresponding to {} in the documentMap", subDocPath, e);
throw e;
}
}
return Collections.unmodifiableMap(documentMap);
}

@Override
Expand Down
Loading