Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add text embedding processor to neural search #18

Merged
merged 5 commits into from
Oct 20, 2022

Conversation

zane-neo
Copy link
Collaborator

@zane-neo zane-neo commented Oct 12, 2022

Signed-off-by: Zan Niu zaniu@amazon.com

Description

[Describe what this change achieves]

Issues Resolved

[List any issues this PR will resolve]

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed as per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@zane-neo zane-neo requested review from a team, navneet1v and ylwu-amzn October 12, 2022 02:34
build.gradle Outdated Show resolved Hide resolved
} else {
this.fieldMap = fieldMap;
}
this.mlCommonsClientAccessor = new MLCommonsClientAccessor(new MachineLearningNodeClient(client));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to create this, you can use @Inject Annotation to inject the MLCommonsClientAccessor. We are creating this via CreateComponents function from the Plugin class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New Class is not a best practice indeed, but Processors are created by Factory instead of injection, also the NeuralSearch plugin needs an implementation of IngestionPlugin, check below code in NeuralSearch:

@Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Collections.singletonMap(TextEmbeddingProcessor.TYPE, new TextEmbeddingProcessor.Factory(parameters.client));
    }

We need to return a factory map, and the instance creation happens out of our code by invoking the factory.create automatically, and a @inject field won't be initialized correctly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even with the

@Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Collections.singletonMap(TextEmbeddingProcessor.TYPE, new TextEmbeddingProcessor.Factory(parameters.client));
    }

You can use the same instance that we have created using createComponents. I would say rather than passing client in the new TextEmbeddingProcessor.Factory(parameters.client) pass the MLCommonsAccessor.

Comment on lines 64 to 86
ActionListener<List<List<Float>>> internalListener = ActionListener.wrap(
responseConsumer(ingestDocument, knnMap),
exceptionConsumer()
);

mlCommonsClientAccessor.inferenceSentences(this.modelId, buildMLInput(knnMap), internalListener);
return ingestDocument;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am little confused here, if we are returning the ingestDocument back just after calling the mlCommonsClientAccessor.inferenceSentences, how does the document is getting updated? as the call is async call.

shouldn't we be doing the internalListener.onResponse kind of thing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please check the responseConsumer, it's a CheckedConsumer and being passed to the ActionListener, the onResponse invocation will invoke the function created in responseConsumer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the question was around, does this function execution will be stopped until we get the inference response?

Comment on lines 83 to 94
Consumer<Exception> exceptionConsumer() {
return exception -> log.error(exception.getMessage(), exception);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are logging the exception, shouldn't we fail the ingestion request for that document.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

User has the flexibility to determine what to do when error arise, please refer to this: https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest.html#handling-pipeline-failures. And a processor itself doesn't have the capability to prevent document ingestion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use can set the tags of ignore_failure to proceed. But at least we need to announce the failure if they choose not to ignore_failure.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we want to announce the failures, we should put a proper message instead of saying exception.getMessage().

}

@SuppressWarnings({ "unchecked" })
private List<String> buildMLInput(Map<String, Object> knnMap) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename this function to represent what it is doing, I can see that it is creating the sentences list which needs to be inferenced

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@VisibleForTesting
CheckedConsumer<List<List<Float>>, Exception> responseConsumer(IngestDocument ingestDocument, Map<String, Object> knnMap) {
return res -> {
Objects.requireNonNull(res, "embedding failed!");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not enough context in the message.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to check inference return value, null without exception is a rare case. Added a little more info to make the error more clear.

return TYPE;
}

public static final class Factory implements Processor.Factory {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can move this class out to a different file and rename the class to TextEmbeddingProcessorFactory.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the opensearch style, because the factory code is little and the logic of processor creation is extremely related to the ingestion logic.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a quick comment its not an OpenSearch style. Given the size of Processor class which you have provided as example it might be fine to put in the same file, but the current TextEmbeddingProcessor class is big enough which already makes it difficult to read. So, as best practice let's move Factory class out of this file.

You can put the class under src/main/java/org/opensearch/neuralsearch/processor/factory.


@VisibleForTesting
Consumer<Exception> exceptionConsumer() {
return exception -> log.error(exception.getMessage(), exception);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should enhance the log message

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@navneet1v
Copy link
Collaborator

Signed-off-by: Zan Niu zaniu@amazon.com

Description

[Describe what this change achieves]

Issues Resolved

[List any issues this PR will resolve]

Check List

  • New functionality includes testing.

    • All tests pass
  • New functionality has been documented.

    • New functionality has javadoc added
  • Commits are signed as per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Update the description and issue link.

@navneet1v
Copy link
Collaborator

navneet1v commented Oct 13, 2022

Signed-off-by: Zan Niu zaniu@amazon.com

Description

[Describe what this change achieves]

Issues Resolved

[List any issues this PR will resolve]

Check List

  • New functionality includes testing.

    • All tests pass
  • New functionality has been documented.

    • New functionality has javadoc added
  • Commits are signed as per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Update the description and issue link.

@zane-neo please fix this

build.gradle Outdated Show resolved Hide resolved
Comment on lines 28 to 31

@Log4j2
public class TextEmbeddingProcessor extends AbstractProcessor {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add Java Doc on all the public functions and public classes in main/java.


@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(TextEmbeddingProcessor.TYPE, new TextEmbeddingProcessor.Factory(parameters.client));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than passing client here, pass the MLCommonsClientAccessor which we are creating via createComponents. You can achieve this by creating a class variable for MLCommonsClientAccessor and then passing the same variable while creating the TextEmbeddingProcessor


public TextEmbeddingProcessor(String tag, String description, String modelId, Map<String, Object> fieldMap, Client client) {
super(tag, description);
this.modelId = Objects.requireNonNull(modelId, "model_id is null, can not process it");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should validate modelId for null and empty string.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

super(tag, description);
this.modelId = Objects.requireNonNull(modelId, "model_id is null, can not process it");
if (fieldMap == null || fieldMap.size() == 0 || checkEmbeddingConfigNotValid(fieldMap)) {
throw new IllegalArgumentException("filed_map is null, can not process it");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we say something like this in error message:

Unable to create the TextEmbedding processor as field_map is null or empty.

I would recommend putting more thoughts on each and every error message that is getting generated as those messages are directly read by users.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to debug level.

return state(new ClusterName("test"), indexName, mapping, clusterManagerNode, clusterManagerNode, allNodes);
}

public static ClusterState setupTestClusterState() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this to a base class, as suggested for upload and loadModel functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 166 to 175
public static RestRequest getStatsRestRequest() {
RestRequest request = new FakeRestRequest.Builder(getXContentRegistry()).build();
return request;
}

public static RestRequest getStatsRestRequest(String nodeId, String stat) {
RestRequest request = new FakeRestRequest.Builder(getXContentRegistry()).withParams(ImmutableMap.of("nodeId", nodeId, "stat", stat))
.build();
return request;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not able to see the usage of these 2 functions

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

return makeRequest(client, method, endpoint, params, entity, headers, false);
}

public static Response makeRequest(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are already created functions in OpenSearchRestTestCase class, try to look into them. If you are still not able to find it, move these functions to base class as suggested for upload and loadModel functions, so that all IT can take benefit of.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't found the function similar to this makeRequest, moved upload and loadModel and this function to BaseNeuralSearchIT.class

Comment on lines 151 to 153
public static RestStatus restStatus(Response response) {
return RestStatus.fromCode(response.getStatusLine().getStatusCode());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not able to see usage of this function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

Comment on lines 130 to 140
Response response = TestHelper.makeRequest(
client(),
"POST",
indexName + "/_doc",
null,
TestHelper.toHttpEntity(
FileUtils.readFileToString(new File(classLoader.getResource("processor/IngestDocument.json").getFile()), "utf-8")
),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
JsonNode node = objectMapper.readTree(EntityUtils.toString(response.getEntity()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check other Integration tests on how to read the responses.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to XContent approach.

* @throws ExecutionException If the underlying task failed, this exception will be thrown in the future.get().
* @throws InterruptedException If the thread is interrupted, this will be thrown.
*/
public List<List<Float>> blockingInferenceSentences(@NonNull final String modelId, @NonNull final List<String> inputText)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why explicitly call this "blockingInferenceSentences"? From what I have seen in OpenSearch, blocking versus non-blocking distinction is made by whether a listener is passed as an argument.

Checkout OpenSearchClient

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the naming.

build.gradle Outdated Show resolved Hide resolved
this.mlCommonsClientAccessor = clientAccessor;
}

private boolean checkEmbeddingConfigNotValid(Map<String, Object> fieldMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private static final Locale locale = Locale.getDefault();

public void test_text_embedding_processor() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Camelcase for each component in test name (suggestion from Google style guide) -> testTextEmbeddingProcessor

If there are multiple tests, then we can separate components with _. For example: testTextEmbeddingProcessor_whenInputInvalid_thenThrowException

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed tests method names to google style.

return uploadModel(requestBody);
}

private void createPipelineProcessor(String modelId) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved to BaseClass so other ITs can use it?

package org.opensearch.neuralsearch.utils;

public class TestHelper {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats this for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

"framework_type": "sentence_transformers",
"all_config": "{\"architectures\":[\"BertModel\"],\"max_position_embeddings\":512,\"model_type\":\"bert\",\"num_attention_heads\":12,\"num_hidden_layers\":6}"
},
"url": "https://api.quip-amazon.com/2/blob/MdZ9AAsfqat/y-6nBQpg6Ma_UEE3pYt2NQ?name=all-MiniLM-L6-v2.zip&oauth_token=TUhMOU1BV1gwWUE%3D%7C1695445530%7CNN8X0Y0SQ0NfJvMxNZCnumpJaurxCDaT%2FdK70Al%2Bgh0%3D&s=YkW8AVqTosiF"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this link available outside of amazon? If no, we cannot embed it here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ylwu-amzn , please chime in and offer help on this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a public link. Suggest put the model inside your test resource folder, then use local file url.

src/test/resources/processor/UploadModelRequestBody.json Outdated Show resolved Hide resolved
@ylwu-amzn
Copy link

How about adding more details in description? Suggest adding some examples there so people know what feature this PR is building.

Comment on lines 29 to 31
private static final String indexName = "text_embedding_index";

private static final ObjectMapper objectMapper = new ObjectMapper();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make static final variable UPPER_CASE, that is a general convention for all static final variables

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


private static final ObjectMapper objectMapper = new ObjectMapper();

private static final Locale locale = Locale.getDefault();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is not used.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -95,6 +103,25 @@ public void testInferenceSentences_whenExceptionFromMLClient_thenFailure() {
Mockito.verifyNoMoreInteractions(resultListener);
}

public void test_blockingInferenceSentences() throws ExecutionException, InterruptedException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return numbers;
}

private static void validateEmbeddingFieldsType(IngestDocument ingestDocument, Map<String, Object> embeddingFields) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for making this and other functions static? if no please make them no static.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static functions means this method do not depends on any stateful data, so other methods can use this without concerns of the state changing. Please let me know your thoughts on making it no static. Thanks.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this function is not dependent on any stateful correct that is correct, but it is always used in the context of a Stateful and that too only in this class.

So, seems like with your assumption if a function is not dependent on a state it should always be static. That seems to be an overkill. Static function lives in JVM heap and generally created to keep data in heap and make them run faster. We don't have any of the case here.

If this function was present in another class then we might want to make this function static as we don't want to create objects at runtime as object creation is expensive. But that is not the case here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static function lives in JVM heap and generally created to keep data in heap and make them run faster. We don't have any of the case here.
If this function was present in another class then we might want to make this function static as we don't want to create objects at runtime as object creation is expensive. But that is not the case here.

The reducing of time complexity is the benefit we can get from a static method instead of the unnecessary of object creation, please refer to: https://stackoverflow.com/a/135038. This running faster itself is a benefit for us.

knnMap.entrySet().stream().filter(knnMapEntry -> knnMapEntry.getValue() != null).forEach(knnMapEntry -> {
Object sourceValue = knnMapEntry.getValue();
if (sourceValue instanceof List) {
((List<String>) sourceValue).stream().filter(StringUtils::isNotBlank).forEach(texts::add);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment here, which tells what when we will be building the output vector list for this list, we need to add empty vector list to define the string was empty hence no text embeddings.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final decision is: null or empty string value in a list will cause exception.

Comment on lines 214 to 216
if (StringUtils.isNotBlank(strSourceValue)) {
numbers.add(ImmutableMap.of(LIST_TYPE_NESTED_MAP_KEY, modelTensorList.get(indexWrapper.index++)));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we remove blank strings here, then the order vector lists will be wrong. Example:

Input
["This is"," ","Hello world"]

Output vectors in document:
[{"vector":[1.0,2.0]},{"vector":[4.0,3.0]}]

This seems to be wrong. I also think we add an empty vector list here that will cause the exception. Can we go with vector list of 0.0 values ?

@jmazanec15 do we see any issue with 0.0 value? Will is cause any impact on the K-NN?

@ylwu-amzn if we pass a blank string or a string with spaces to ML Algorithms what will happen? will it fail? I think ans is no. want to confirm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think this method should not skip strings - instead, it should get what was passed to it. k-NN may have issues with an all 0.0 vector for cosine similarity type, but I think it should work. Remember:

Cos(x, y) = x . y / ||x|| * ||y||

Comment on lines 80 to 98
while (!isComplete) {
taskQueryResult = getTaskQueryResponse(taskId);
isComplete = checkComplete(taskQueryResult);
Copy link
Collaborator

@navneet1v navneet1v Oct 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very brute force way of checking if model is uploaded or not. This is very resource intensive plus there is no time out which makes can make it run forever and just increase the build time.

Suggestion:

  1. We should move this checking of model upload to another thread.
  2. Add a timeout(3 times than the actual time, or let say 1 min) after which we will fail the test saying that model is not uploaded, and provide proper response why the tests failed and add things like increase the time out and other things.
  3. To avoid the resource intensive work, we should also provide some sleep time in the thread which is checking the model upload if we apply "1" suggestion.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add max retry time and sleep time in the thread.

Comment on lines 51 to 61
builder.startObject()
.startObject("index")
.field("knn", true)
.field("knn.algo_param.ef_search", 100)
.field("refresh_interval", "30s")
.field("default_pipeline", pipelineName)
.endObject()
.field("number_of_shards", 1)
.field("number_of_replicas", 0)
.endObject()
.endObject()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried running this test it failed at the line 61. I see that we are adding 1 more extra endObject which is not required.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Command to rerun the test:

./gradlew ':integTest' --tests "org.opensearch.neuralsearch.processor.TextEmbeddingProcessorIT.test_text_embedding_processor" -Dtests.seed=173A58A0D4C0E3A0 -Dtests.security.manager=false -Dtests.locale=es-SV -Dtests.timezone=Asia/Seoul

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

createIndex(
indexName,
Settings.builder().loadFromSource(settings, XContentType.JSON).build(),
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we are reading a JSON object from file, while reading the object you might want to remove "{" and "}" from start and end. I tried removing then it worked for.

Example reference: https://github.com/opensearch-project/k-NN/blob/48d2303ad6964d386709ab5ae5fdbb0965420cb8/src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java#L638-L639

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method encapsulation is really not friendly to use, I prefer to create our own method.

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
final MachineLearningNodeClient machineLearningNodeClient = new MachineLearningNodeClient(parameters.client);
mlCommonsClientAccessor = new MLCommonsClientAccessor(machineLearningNodeClient);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map<String, Object> config
) throws Exception {
String modelId = readStringProperty(TYPE, processorTag, config, MODEL_ID_FIELD);
Map<String, Object> filedMap = readOptionalMap(TYPE, processorTag, config, FIELD_MAP_FIELD);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why optional? If this doesnt exist, what is the purpose of the processor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to non optional method, this will throw a configuration exception when this map is missing.

this.mlCommonsClientAccessor = clientAccessor;
}

private boolean isEmbeddingConfigValid(Map<String, Object> fieldMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should return true if it is valid. False otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why not make this a validation method (i.e. validateEmbeddingConfig or checkEmbeddingConfig) and have it throw an IllegalArgumentException, similar to the ones at the end of this class (checkListElementsType, validateEmbeddingFieldsType)

We can add the following to the method

fieldMap == null || fieldMap.size() == 0

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 214 to 216
if (StringUtils.isNotBlank(strSourceValue)) {
numbers.add(ImmutableMap.of(LIST_TYPE_NESTED_MAP_KEY, modelTensorList.get(indexWrapper.index++)));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think this method should not skip strings - instead, it should get what was passed to it. k-NN may have issues with an all 0.0 vector for cosine similarity type, but I think it should work. Remember:

Cos(x, y) = x . y / ||x|| * ||y||

}

private void validateEmbeddingConfiguration(Map<String, Object> fieldMap) {
if (fieldMap == null || fieldMap.size() == 0 || fieldMap.entrySet()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if a user can create an arbitrarily large fieldMap? If so, should this be limited, from a security perspective?

String sourceKey = embeddingFieldsEntry.getKey();
Class<?> sourceValueClass = sourceValue.getClass();
if (List.class.isAssignableFrom(sourceValueClass) || Map.class.isAssignableFrom(sourceValueClass)) {
validateNestedTypeValue(sourceKey, sourceValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to above comment: are there any limits on the depth of nested parameter that could be passed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, there's no limits. Nested type will not be explicitly mentioned in the doc, so users use only raw string or list type. If we receive feedback on supporting nested type, we can then tell user how.

}

@VisibleForTesting
Map<String, Object> buildMapWithKnnKeyAndOriginalValue(IngestDocument ingestDocument, Map<String, Object> fieldMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private method: do we need to pass fieldMap if its a member?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this from parameter.

# This is the 1st commit message:

Add text embedding processor to neural search

Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#2 will be skipped:

# Code format
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#3 will be skipped:

# Address review comments
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#4 will be skipped:

# Add blocking text embedding method for pipeline processor
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#5 will be skipped:

# Add BaseNeuralSearchIT and address other review comments
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#6 will be skipped:

# Add BaseNeuralSearchIT and address other review comments
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#7 will be skipped:

# Add BaseNeuralSearchIT and address other review comments
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#8 will be skipped:

# Fix naming convention and IT function move to base
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#9 will be skipped:

# Fix naming convention and IT function move to base
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#10 will be skipped:

# Update src/main/java/org/opensearch/neuralsearch/ml/MLCommonsClientAccessor.java
#
# Co-authored-by: Navneet Verma <vermanavneet003@gmail.com>

# The commit message opensearch-project#11 will be skipped:

# Update src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
#
# Co-authored-by: Navneet Verma <vermanavneet003@gmail.com>

# The commit message opensearch-project#12 will be skipped:

# Fix code review comments
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#13 will be skipped:

# Fix text embedding processor NPE
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>

# The commit message opensearch-project#14 will be skipped:

# Remove jackson dependencies and fix tests with XCoontent
#
# Signed-off-by: Zan Niu <zaniu@amazon.com>
Signed-off-by: Zan Niu <zaniu@amazon.com>
@zane-neo zane-neo force-pushed the text-embedding-processor branch from 117249c to e3cca7b Compare October 20, 2022 01:26
Signed-off-by: Zan Niu <zaniu@amazon.com>
Signed-off-by: Zan Niu <zaniu@amazon.com>
Signed-off-by: Zan Niu <zaniu@amazon.com>
Copy link
Collaborator

@model-collapse model-collapse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@zane-neo zane-neo merged commit 799c402 into opensearch-project:main Oct 20, 2022
@jmazanec15 jmazanec15 added the Features Introduces a new unit of functionality that satisfies a requirement label Nov 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Features Introduces a new unit of functionality that satisfies a requirement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants