-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add text embedding processor to neural search #18
Add text embedding processor to neural search #18
Conversation
} else { | ||
this.fieldMap = fieldMap; | ||
} | ||
this.mlCommonsClientAccessor = new MLCommonsClientAccessor(new MachineLearningNodeClient(client)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to create this, you can use @Inject Annotation to inject the MLCommonsClientAccessor. We are creating this via CreateComponents function from the Plugin class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New Class is not a best practice indeed, but Processors
are created by Factory
instead of injection, also the NeuralSearch
plugin needs an implementation of IngestionPlugin
, check below code in NeuralSearch
:
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(TextEmbeddingProcessor.TYPE, new TextEmbeddingProcessor.Factory(parameters.client));
}
We need to return a factory map, and the instance creation happens out of our code by invoking the factory.create automatically, and a @inject
field won't be initialized correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even with the
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(TextEmbeddingProcessor.TYPE, new TextEmbeddingProcessor.Factory(parameters.client));
}
You can use the same instance that we have created using createComponents. I would say rather than passing client in the new TextEmbeddingProcessor.Factory(parameters.client)
pass the MLCommonsAccessor.
ActionListener<List<List<Float>>> internalListener = ActionListener.wrap( | ||
responseConsumer(ingestDocument, knnMap), | ||
exceptionConsumer() | ||
); | ||
|
||
mlCommonsClientAccessor.inferenceSentences(this.modelId, buildMLInput(knnMap), internalListener); | ||
return ingestDocument; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am little confused here, if we are returning the ingestDocument back just after calling the mlCommonsClientAccessor.inferenceSentences, how does the document is getting updated? as the call is async call.
shouldn't we be doing the internalListener.onResponse kind of thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, please check the responseConsumer
, it's a CheckedConsumer
and being passed to the ActionListener, the onResponse invocation will invoke the function created in responseConsumer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the question was around, does this function execution will be stopped until we get the inference response?
Consumer<Exception> exceptionConsumer() { | ||
return exception -> log.error(exception.getMessage(), exception); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are logging the exception, shouldn't we fail the ingestion request for that document.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
User has the flexibility to determine what to do when error arise, please refer to this: https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest.html#handling-pipeline-failures. And a processor itself doesn't have the capability to prevent document ingestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use can set the tags of ignore_failure to proceed. But at least we need to announce the failure if they choose not to ignore_failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if we want to announce the failures, we should put a proper message instead of saying exception.getMessage().
} | ||
|
||
@SuppressWarnings({ "unchecked" }) | ||
private List<String> buildMLInput(Map<String, Object> knnMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename this function to represent what it is doing, I can see that it is creating the sentences list which needs to be inferenced
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@VisibleForTesting | ||
CheckedConsumer<List<List<Float>>, Exception> responseConsumer(IngestDocument ingestDocument, Map<String, Object> knnMap) { | ||
return res -> { | ||
Objects.requireNonNull(res, "embedding failed!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not enough context in the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to check inference return value, null without exception is a rare case. Added a little more info to make the error more clear.
return TYPE; | ||
} | ||
|
||
public static final class Factory implements Processor.Factory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can move this class out to a different file and rename the class to TextEmbeddingProcessorFactory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this is a standard style of processor factory, this is an example: https://github.com/opensearch-project/OpenSearch/blob/d15795a7aca488c1fadb04b3c8d9f1a3b02e4056/modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java#L115
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the opensearch style, because the factory code is little and the logic of processor creation is extremely related to the ingestion logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a quick comment its not an OpenSearch style. Given the size of Processor class which you have provided as example it might be fine to put in the same file, but the current TextEmbeddingProcessor class is big enough which already makes it difficult to read. So, as best practice let's move Factory class out of this file.
You can put the class under src/main/java/org/opensearch/neuralsearch/processor/factory
.
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Show resolved
Hide resolved
|
||
@VisibleForTesting | ||
Consumer<Exception> exceptionConsumer() { | ||
return exception -> log.error(exception.getMessage(), exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should enhance the log message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Update the description and issue link. |
@zane-neo please fix this |
|
||
@Log4j2 | ||
public class TextEmbeddingProcessor extends AbstractProcessor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add Java Doc on all the public functions and public classes in main/java.
|
||
@Override | ||
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) { | ||
return Collections.singletonMap(TextEmbeddingProcessor.TYPE, new TextEmbeddingProcessor.Factory(parameters.client)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than passing client here, pass the MLCommonsClientAccessor which we are creating via createComponents. You can achieve this by creating a class variable for MLCommonsClientAccessor and then passing the same variable while creating the TextEmbeddingProcessor
|
||
public TextEmbeddingProcessor(String tag, String description, String modelId, Map<String, Object> fieldMap, Client client) { | ||
super(tag, description); | ||
this.modelId = Objects.requireNonNull(modelId, "model_id is null, can not process it"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should validate modelId for null and empty string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
super(tag, description); | ||
this.modelId = Objects.requireNonNull(modelId, "model_id is null, can not process it"); | ||
if (fieldMap == null || fieldMap.size() == 0 || checkEmbeddingConfigNotValid(fieldMap)) { | ||
throw new IllegalArgumentException("filed_map is null, can not process it"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we say something like this in error message:
Unable to create the TextEmbedding processor as field_map is null or empty.
I would recommend putting more thoughts on each and every error message that is getting generated as those messages are directly read by users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to debug level.
return state(new ClusterName("test"), indexName, mapping, clusterManagerNode, clusterManagerNode, allNodes); | ||
} | ||
|
||
public static ClusterState setupTestClusterState() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this to a base class, as suggested for upload and loadModel functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public static RestRequest getStatsRestRequest() { | ||
RestRequest request = new FakeRestRequest.Builder(getXContentRegistry()).build(); | ||
return request; | ||
} | ||
|
||
public static RestRequest getStatsRestRequest(String nodeId, String stat) { | ||
RestRequest request = new FakeRestRequest.Builder(getXContentRegistry()).withParams(ImmutableMap.of("nodeId", nodeId, "stat", stat)) | ||
.build(); | ||
return request; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not able to see the usage of these 2 functions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
return makeRequest(client, method, endpoint, params, entity, headers, false); | ||
} | ||
|
||
public static Response makeRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are already created functions in OpenSearchRestTestCase class, try to look into them. If you are still not able to find it, move these functions to base class as suggested for upload and loadModel functions, so that all IT can take benefit of.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't found the function similar to this makeRequest, moved upload and loadModel and this function to BaseNeuralSearchIT.class
public static RestStatus restStatus(Response response) { | ||
return RestStatus.fromCode(response.getStatusLine().getStatusCode()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not able to see usage of this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
Response response = TestHelper.makeRequest( | ||
client(), | ||
"POST", | ||
indexName + "/_doc", | ||
null, | ||
TestHelper.toHttpEntity( | ||
FileUtils.readFileToString(new File(classLoader.getResource("processor/IngestDocument.json").getFile()), "utf-8") | ||
), | ||
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) | ||
); | ||
JsonNode node = objectMapper.readTree(EntityUtils.toString(response.getEntity())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check other Integration tests on how to read the responses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to XContent approach.
* @throws ExecutionException If the underlying task failed, this exception will be thrown in the future.get(). | ||
* @throws InterruptedException If the thread is interrupted, this will be thrown. | ||
*/ | ||
public List<List<Float>> blockingInferenceSentences(@NonNull final String modelId, @NonNull final List<String> inputText) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why explicitly call this "blockingInferenceSentences"? From what I have seen in OpenSearch, blocking versus non-blocking distinction is made by whether a listener is passed as an argument.
Checkout OpenSearchClient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change the naming.
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
this.mlCommonsClientAccessor = clientAccessor; | ||
} | ||
|
||
private boolean checkEmbeddingConfigNotValid(Map<String, Object> fieldMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: isEmbeddingConfigValid (negate output) -> related post https://softwareengineering.stackexchange.com/questions/196830/boolean-method-naming-affirmative-vs-negative
|
||
private static final Locale locale = Locale.getDefault(); | ||
|
||
public void test_text_embedding_processor() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Camelcase for each component in test name (suggestion from Google style guide) -> testTextEmbeddingProcessor
If there are multiple tests, then we can separate components with _. For example: testTextEmbeddingProcessor_whenInputInvalid_thenThrowException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed tests method names to google style.
return uploadModel(requestBody); | ||
} | ||
|
||
private void createPipelineProcessor(String modelId) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be moved to BaseClass so other ITs can use it?
package org.opensearch.neuralsearch.utils; | ||
|
||
public class TestHelper { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whats this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
"framework_type": "sentence_transformers", | ||
"all_config": "{\"architectures\":[\"BertModel\"],\"max_position_embeddings\":512,\"model_type\":\"bert\",\"num_attention_heads\":12,\"num_hidden_layers\":6}" | ||
}, | ||
"url": "https://api.quip-amazon.com/2/blob/MdZ9AAsfqat/y-6nBQpg6Ma_UEE3pYt2NQ?name=all-MiniLM-L6-v2.zip&oauth_token=TUhMOU1BV1gwWUE%3D%7C1695445530%7CNN8X0Y0SQ0NfJvMxNZCnumpJaurxCDaT%2FdK70Al%2Bgh0%3D&s=YkW8AVqTosiF" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this link available outside of amazon? If no, we cannot embed it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ylwu-amzn , please chime in and offer help on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a public link. Suggest put the model inside your test resource folder, then use local file url.
How about adding more details in description? Suggest adding some examples there so people know what feature this PR is building. |
private static final String indexName = "text_embedding_index"; | ||
|
||
private static final ObjectMapper objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make static final variable UPPER_CASE, that is a general convention for all static final variables
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
private static final ObjectMapper objectMapper = new ObjectMapper(); | ||
|
||
private static final Locale locale = Locale.getDefault(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This variable is not used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
@@ -95,6 +103,25 @@ public void testInferenceSentences_whenExceptionFromMLClient_thenFailure() { | |||
Mockito.verifyNoMoreInteractions(resultListener); | |||
} | |||
|
|||
public void test_blockingInferenceSentences() throws ExecutionException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use @SneakyThrows.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
src/main/java/org/opensearch/neuralsearch/ml/MLCommonsClientAccessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Show resolved
Hide resolved
return numbers; | ||
} | ||
|
||
private static void validateEmbeddingFieldsType(IngestDocument ingestDocument, Map<String, Object> embeddingFields) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason for making this and other functions static? if no please make them no static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Static functions means this method do not depends on any stateful data, so other methods can use this without concerns of the state changing. Please let me know your thoughts on making it no static. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this function is not dependent on any stateful correct that is correct, but it is always used in the context of a Stateful and that too only in this class.
So, seems like with your assumption if a function is not dependent on a state it should always be static. That seems to be an overkill. Static function lives in JVM heap and generally created to keep data in heap and make them run faster. We don't have any of the case here.
If this function was present in another class then we might want to make this function static as we don't want to create objects at runtime as object creation is expensive. But that is not the case here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Static function lives in JVM heap and generally created to keep data in heap and make them run faster. We don't have any of the case here.
If this function was present in another class then we might want to make this function static as we don't want to create objects at runtime as object creation is expensive. But that is not the case here.
The reducing of time complexity is the benefit we can get from a static method instead of the unnecessary of object creation, please refer to: https://stackoverflow.com/a/135038. This running faster itself is a benefit for us.
knnMap.entrySet().stream().filter(knnMapEntry -> knnMapEntry.getValue() != null).forEach(knnMapEntry -> { | ||
Object sourceValue = knnMapEntry.getValue(); | ||
if (sourceValue instanceof List) { | ||
((List<String>) sourceValue).stream().filter(StringUtils::isNotBlank).forEach(texts::add); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a comment here, which tells what when we will be building the output vector list for this list, we need to add empty vector list to define the string was empty hence no text embeddings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The final decision is: null or empty string value in a list will cause exception.
if (StringUtils.isNotBlank(strSourceValue)) { | ||
numbers.add(ImmutableMap.of(LIST_TYPE_NESTED_MAP_KEY, modelTensorList.get(indexWrapper.index++))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we remove blank strings here, then the order vector lists will be wrong. Example:
Input
["This is"," ","Hello world"]
Output vectors in document:
[{"vector":[1.0,2.0]},{"vector":[4.0,3.0]}]
This seems to be wrong. I also think we add an empty vector list here that will cause the exception. Can we go with vector list of 0.0 values ?
@jmazanec15 do we see any issue with 0.0 value? Will is cause any impact on the K-NN?
@ylwu-amzn if we pass a blank string or a string with spaces to ML Algorithms what will happen? will it fail? I think ans is no. want to confirm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I think this method should not skip strings - instead, it should get what was passed to it. k-NN may have issues with an all 0.0 vector for cosine similarity type, but I think it should work. Remember:
Cos(x, y) = x . y / ||x|| * ||y||
src/main/java/org/opensearch/neuralsearch/ml/MLCommonsClientAccessor.java
Outdated
Show resolved
Hide resolved
while (!isComplete) { | ||
taskQueryResult = getTaskQueryResponse(taskId); | ||
isComplete = checkComplete(taskQueryResult); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very brute force way of checking if model is uploaded or not. This is very resource intensive plus there is no time out which makes can make it run forever and just increase the build time.
Suggestion:
- We should move this checking of model upload to another thread.
- Add a timeout(3 times than the actual time, or let say 1 min) after which we will fail the test saying that model is not uploaded, and provide proper response why the tests failed and add things like increase the time out and other things.
- To avoid the resource intensive work, we should also provide some sleep time in the thread which is checking the model upload if we apply "1" suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add max retry time and sleep time in the thread.
builder.startObject() | ||
.startObject("index") | ||
.field("knn", true) | ||
.field("knn.algo_param.ef_search", 100) | ||
.field("refresh_interval", "30s") | ||
.field("default_pipeline", pipelineName) | ||
.endObject() | ||
.field("number_of_shards", 1) | ||
.field("number_of_replicas", 0) | ||
.endObject() | ||
.endObject() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried running this test it failed at the line 61. I see that we are adding 1 more extra endObject which is not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Command to rerun the test:
./gradlew ':integTest' --tests "org.opensearch.neuralsearch.processor.TextEmbeddingProcessorIT.test_text_embedding_processor" -Dtests.seed=173A58A0D4C0E3A0 -Dtests.security.manager=false -Dtests.locale=es-SV -Dtests.timezone=Asia/Seoul
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
createIndex( | ||
indexName, | ||
Settings.builder().loadFromSource(settings, XContentType.JSON).build(), | ||
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as we are reading a JSON object from file, while reading the object you might want to remove "{" and "}" from start and end. I tried removing then it worked for.
Example reference: https://github.com/opensearch-project/k-NN/blob/48d2303ad6964d386709ab5ae5fdbb0965420cb8/src/testFixtures/java/org/opensearch/knn/KNNRestTestCase.java#L638-L639
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method encapsulation is really not friendly to use, I prefer to create our own method.
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
@Override | ||
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) { | ||
final MachineLearningNodeClient machineLearningNodeClient = new MachineLearningNodeClient(parameters.client); | ||
mlCommonsClientAccessor = new MLCommonsClientAccessor(machineLearningNodeClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to create the MLCommonsClientAccessor in the getProcessors method since this method executes before createComponents
, check this line: https://github.com/opensearch-project/OpenSearch/blob/e44158d4d10d4f8905895ffa50bf9398b8550667/server/src/main/java/org/opensearch/node/Node.java#L515, and this line: https://github.com/opensearch-project/OpenSearch/blob/e44158d4d10d4f8905895ffa50bf9398b8550667/server/src/main/java/org/opensearch/node/Node.java#L711.
Map<String, Object> config | ||
) throws Exception { | ||
String modelId = readStringProperty(TYPE, processorTag, config, MODEL_ID_FIELD); | ||
Map<String, Object> filedMap = readOptionalMap(TYPE, processorTag, config, FIELD_MAP_FIELD); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why optional? If this doesnt exist, what is the purpose of the processor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to non optional method, this will throw a configuration exception when this map is missing.
this.mlCommonsClientAccessor = clientAccessor; | ||
} | ||
|
||
private boolean isEmbeddingConfigValid(Map<String, Object> fieldMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should return true if it is valid. False otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, why not make this a validation method (i.e. validateEmbeddingConfig or checkEmbeddingConfig) and have it throw an IllegalArgumentException, similar to the ones at the end of this class (checkListElementsType, validateEmbeddingFieldsType)
We can add the following to the method
fieldMap == null || fieldMap.size() == 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (StringUtils.isNotBlank(strSourceValue)) { | ||
numbers.add(ImmutableMap.of(LIST_TYPE_NESTED_MAP_KEY, modelTensorList.get(indexWrapper.index++))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I think this method should not skip strings - instead, it should get what was passed to it. k-NN may have issues with an all 0.0 vector for cosine similarity type, but I think it should work. Remember:
Cos(x, y) = x . y / ||x|| * ||y||
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private void validateEmbeddingConfiguration(Map<String, Object> fieldMap) { | ||
if (fieldMap == null || fieldMap.size() == 0 || fieldMap.entrySet() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if a user can create an arbitrarily large fieldMap? If so, should this be limited, from a security perspective?
String sourceKey = embeddingFieldsEntry.getKey(); | ||
Class<?> sourceValueClass = sourceValue.getClass(); | ||
if (List.class.isAssignableFrom(sourceValueClass) || Map.class.isAssignableFrom(sourceValueClass)) { | ||
validateNestedTypeValue(sourceKey, sourceValue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to above comment: are there any limits on the depth of nested parameter that could be passed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, there's no limits. Nested type will not be explicitly mentioned in the doc, so users use only raw string or list type. If we receive feedback on supporting nested type, we can then tell user how.
} | ||
|
||
@VisibleForTesting | ||
Map<String, Object> buildMapWithKnnKeyAndOriginalValue(IngestDocument ingestDocument, Map<String, Object> fieldMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private method: do we need to pass fieldMap if its a member?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this from parameter.
# This is the 1st commit message: Add text embedding processor to neural search Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#2 will be skipped: # Code format # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#3 will be skipped: # Address review comments # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#4 will be skipped: # Add blocking text embedding method for pipeline processor # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#5 will be skipped: # Add BaseNeuralSearchIT and address other review comments # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#6 will be skipped: # Add BaseNeuralSearchIT and address other review comments # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#7 will be skipped: # Add BaseNeuralSearchIT and address other review comments # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#8 will be skipped: # Fix naming convention and IT function move to base # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#9 will be skipped: # Fix naming convention and IT function move to base # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#10 will be skipped: # Update src/main/java/org/opensearch/neuralsearch/ml/MLCommonsClientAccessor.java # # Co-authored-by: Navneet Verma <vermanavneet003@gmail.com> # The commit message opensearch-project#11 will be skipped: # Update src/main/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessor.java # # Co-authored-by: Navneet Verma <vermanavneet003@gmail.com> # The commit message opensearch-project#12 will be skipped: # Fix code review comments # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#13 will be skipped: # Fix text embedding processor NPE # # Signed-off-by: Zan Niu <zaniu@amazon.com> # The commit message opensearch-project#14 will be skipped: # Remove jackson dependencies and fix tests with XCoontent # # Signed-off-by: Zan Niu <zaniu@amazon.com>
Signed-off-by: Zan Niu <zaniu@amazon.com>
117249c
to
e3cca7b
Compare
Signed-off-by: Zan Niu <zaniu@amazon.com>
Signed-off-by: Zan Niu <zaniu@amazon.com>
Signed-off-by: Zan Niu <zaniu@amazon.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Signed-off-by: Zan Niu zaniu@amazon.com
Description
[Describe what this change achieves]
Issues Resolved
[List any issues this PR will resolve]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.