Skip to content

Commit

Permalink
[Feature][Milvus] Support Milvus source & sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-HuWei committed Jul 10, 2024
1 parent 476d492 commit 99390d3
Show file tree
Hide file tree
Showing 40 changed files with 2,968 additions and 1 deletion.
39 changes: 39 additions & 0 deletions docs/en/connector-v2/sink/Mivlus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Milvus

> Milvus sink connector
## Description

Write data to Milvus or Zilliz Cloud

## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)

## Sink Options

| Name | Type | Required | Default | Description |
|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | Authentication token, in the form `username:password`. |
| database | String | No | - | The database to write to; defaults to the source database. |
| schema_save_mode | enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Automatically create the table when it does not exist. |
| enable_auto_id | boolean | No | false | Enable autoId on the primary key column. |
| enable_upsert | boolean | No | false | Upsert data instead of inserting it. |
| enable_dynamic_field | boolean | No | true | Create the table with the dynamic field enabled. |
| batch_size | int | No | 1000 | Write batch size. |

## Task Example

```bash
sink {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
batch_size = 1000
}
}
```

35 changes: 35 additions & 0 deletions docs/en/connector-v2/source/Mivlus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Milvus

> Milvus source connector
## Description

Read data from Milvus or Zilliz Cloud

## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)

## Source Options

| Name | Type | Required | Default | Description |
|------------|--------|----------|---------|--------------------------------------------------------------------------------------------|
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | Authentication token, in the form `username:password`. |
| database | String | Yes | default | The database to read data from. |
| collection | String | No | - | If set, only this collection is read; otherwise all collections in the database are read. |

## Task Example

```bash
source {
Milvus {
url = "http://127.0.0.1:19530"
token = "username:password"
database = "default"
}
}
```

2 changes: 2 additions & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,5 @@ seatunnel.source.Oracle-CDC = connector-cdc-oracle
seatunnel.sink.Pulsar = connector-pulsar
seatunnel.source.ObsFile = connector-file-obs
seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,30 @@ public class PrimaryKey implements Serializable {

private final List<String> columnNames;

private Boolean enableAutoId;

/**
 * Creates a primary key from a name and its column names, leaving the auto-id flag unset.
 *
 * @param primaryKey value stored in the {@code primaryKey} field
 * @param columnNames value stored in the {@code columnNames} field; the list is kept by
 *     reference, not copied
 */
public PrimaryKey(String primaryKey, List<String> columnNames) {
this.primaryKey = primaryKey;
this.columnNames = columnNames;
// null (rather than false) marks the flag as "not specified" for this constructor.
this.enableAutoId = null;
}

/**
 * Static factory that forwards to the three-argument constructor.
 *
 * @param primaryKey name passed through to the constructor
 * @param columnNames column names passed through to the constructor (not copied here)
 * @param autoId auto-id flag passed through to the constructor; may be {@code null}
 * @return a new {@code PrimaryKey}
 */
public static PrimaryKey of(String primaryKey, List<String> columnNames, Boolean autoId) {
return new PrimaryKey(primaryKey, columnNames, autoId);
}

/**
 * Static factory that forwards to the two-argument constructor, which leaves
 * {@code enableAutoId} as {@code null}.
 *
 * @param primaryKey name passed through to the constructor
 * @param columnNames column names passed through to the constructor (not copied here)
 * @return a new {@code PrimaryKey}
 */
public static PrimaryKey of(String primaryKey, List<String> columnNames) {
return new PrimaryKey(primaryKey, columnNames);
}

/**
 * Returns a copy of this primary key.
 *
 * <p>The column names are copied into a fresh {@link ArrayList}, so changes to the copy's list
 * do not affect this instance. The {@code enableAutoId} flag is carried over as well; going
 * through the two-argument factory would silently reset it to {@code null} on every copy.
 *
 * @return a new {@code PrimaryKey} with the same name, column names and auto-id flag
 */
public PrimaryKey copy() {
    return PrimaryKey.of(primaryKey, new ArrayList<>(columnNames), enableAutoId);
}

/**
 * Tells whether {@code fieldName} is one of the column names of {@code primaryKey}.
 *
 * @param primaryKey the key to inspect; may be {@code null}
 * @param fieldName the column name to look for
 * @return {@code true} only when the key and its column-name list are both non-null and the
 *     list contains {@code fieldName}; {@code false} otherwise
 */
public static boolean isPrimaryKeyField(PrimaryKey primaryKey, String fieldName) {
    // Short-circuit evaluation keeps the null checks ahead of the lookup.
    return primaryKey != null
            && primaryKey.getColumnNames() != null
            && primaryKey.getColumnNames().contains(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.common.exception.CommonError;

public class SeaTunnelDataTypeConvertorUtil {
Expand Down Expand Up @@ -80,6 +81,16 @@ public static SeaTunnelDataType<?> deserializeSeaTunnelDataType(
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case MAP:
return parseMapType(field, columnType);
case BINARY_VECTOR:
return VectorType.VECTOR_BINARY_TYPE;
case FLOAT_VECTOR:
return VectorType.VECTOR_FLOAT_TYPE;
case FLOAT16_VECTOR:
return VectorType.VECTOR_FLOAT16_TYPE;
case BFLOAT16_VECTOR:
return VectorType.VECTOR_BFLOAT16_TYPE;
case SPARSE_FLOAT_VECTOR:
return VectorType.VECTOR_SPARSE_FLOAT_TYPE;
default:
throw CommonError.unsupportedDataType("SeaTunnel", columnType, field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public final class TableSchema implements Serializable {

private final List<ConstraintKey> constraintKeys;

private List<VectorIndex> vectorIndexes;

private Boolean enableDynamicField;

/**
 * Entry point for assembling a {@code TableSchema} step by step.
 *
 * @return a new, empty {@link Builder}
 */
public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -68,6 +72,10 @@ public static final class Builder {

private final List<ConstraintKey> constraintKeys = new ArrayList<>();

private final List<VectorIndex> vectorIndexes = new ArrayList<>();

private Boolean enableDynamicField;

public Builder columns(List<Column> columns) {
this.columns.addAll(columns);
return this;
Expand All @@ -83,6 +91,16 @@ public Builder primaryKey(PrimaryKey primaryKey) {
return this;
}

/**
 * Sets the dynamic-field flag that {@code build()} passes to the schema.
 *
 * @param enableDynamicField the flag value; stored as given, so {@code null} is kept as
 *     {@code null}
 * @return this builder, for chaining
 */
public Builder enableDynamicField(Boolean enableDynamicField) {
this.enableDynamicField = enableDynamicField;
return this;
}

/**
 * Appends the given vector indexes to the ones already collected by this builder.
 *
 * <p>A {@code null} argument is treated as "nothing to add". The {@code vectorIndexes} field
 * of {@code TableSchema} has no initializer and {@code TableSchema.copy()} hands it straight
 * to this method, so without the guard copying a schema whose vector indexes were never set
 * would fail with a {@link NullPointerException} from {@code addAll}.
 *
 * @param vectorIndexes indexes to append; may be {@code null}
 * @return this builder, for chaining
 */
public Builder vectorIndexes(List<VectorIndex> vectorIndexes) {
    if (vectorIndexes != null) {
        this.vectorIndexes.addAll(vectorIndexes);
    }
    return this;
}

public Builder constraintKey(ConstraintKey constraintKey) {
this.constraintKeys.add(constraintKey);
return this;
Expand All @@ -94,7 +112,8 @@ public Builder constraintKey(List<ConstraintKey> constraintKeys) {
}

/**
 * Creates the {@code TableSchema} from the values collected so far.
 *
 * <p>The builder's own list instances are passed to the constructor as-is (no copy is made
 * here), together with the primary key and the dynamic-field flag.
 *
 * @return the new {@code TableSchema}
 */
public TableSchema build() {
return new TableSchema(
columns, primaryKey, constraintKeys, vectorIndexes, enableDynamicField);
}
}

Expand All @@ -106,6 +125,8 @@ public TableSchema copy() {
.constraintKey(copyConstraintKeys)
.columns(copyColumns)
.primaryKey(primaryKey == null ? null : primaryKey.copy())
.enableDynamicField(enableDynamicField)
.vectorIndexes(vectorIndexes)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.catalog;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Serializable description of a vector index attached to a table schema.
 *
 * <p>All four attributes are plain strings. Lombok generates the accessors,
 * {@code equals}/{@code hashCode}/{@code toString} ({@code @Data}), a builder
 * ({@code @Builder}) and both a no-args and an all-args constructor. The all-args constructor
 * takes the fields in declaration order, so reordering the fields below changes its signature.
 */
@AllArgsConstructor
@Builder
@NoArgsConstructor
@Data
public class VectorIndex implements Serializable {

// Name of the index itself.
private String indexName;

// Name of the field the index is defined on (presumably a vector column; verify against the
// connector that fills this in).
private String fieldName;

// Index type, kept as a free-form string; the valid values are not defined in this class.
private String indexType;

// Metric type, kept as a free-form string; the valid values are not defined in this class.
private String metricType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
}

return getBytesForArray(v, ((ArrayType) dataType).getElementType());
case FLOAT_VECTOR:
return getArrayNotNullSize((Object[]) v) * 4;
case MAP:
int size = 0;
MapType<?, ?> mapType = ((MapType<?, ?>) dataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@ public enum SqlType {
TIME,
TIMESTAMP,
ROW,
BINARY_VECTOR,
FLOAT_VECTOR,
FLOAT16_VECTOR,
BFLOAT16_VECTOR,
SPARSE_FLOAT_VECTOR,
MULTIPLE_ROW;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;

/**
 * {@link SeaTunnelDataType} implementation for vector columns.
 *
 * <p>Each instance pairs a physical Java class with one of the vector {@link SqlType}
 * constants. The supported combinations are exposed as the {@code VECTOR_*} constants below;
 * the constructor is {@code protected}, so new combinations can only be added here or in a
 * subclass.
 *
 * @param <T> the physical Java type reported by {@link #getTypeClass()}
 */
public class VectorType<T> implements SeaTunnelDataType<T> {
    private static final long serialVersionUID = 2L;

    /** Float vector, reported as {@link Float}. */
    public static final VectorType<Float> VECTOR_FLOAT_TYPE =
            new VectorType<>(Float.class, SqlType.FLOAT_VECTOR);

    /** Sparse float vector, reported as a (raw) {@link Map}. */
    public static final VectorType<Map> VECTOR_SPARSE_FLOAT_TYPE =
            new VectorType<>(Map.class, SqlType.SPARSE_FLOAT_VECTOR);

    /** Binary vector, reported as {@link Byte}. */
    public static final VectorType<Byte> VECTOR_BINARY_TYPE =
            new VectorType<>(Byte.class, SqlType.BINARY_VECTOR);

    /** Float16 vector, reported as {@link ByteBuffer}. */
    public static final VectorType<ByteBuffer> VECTOR_FLOAT16_TYPE =
            new VectorType<>(ByteBuffer.class, SqlType.FLOAT16_VECTOR);

    /** BFloat16 vector, reported as {@link ByteBuffer}. */
    public static final VectorType<ByteBuffer> VECTOR_BFLOAT16_TYPE =
            new VectorType<>(ByteBuffer.class, SqlType.BFLOAT16_VECTOR);

    // ---------------------------------------------------------------------------------------
    // Instance state
    // ---------------------------------------------------------------------------------------

    /** The physical type class. */
    private final Class<T> typeClass;

    /** The SQL type tag this vector type reports. */
    private final SqlType sqlType;

    /**
     * Binds a physical class to a SQL type tag.
     *
     * @param typeClass the physical type class
     * @param sqlType the SQL type tag
     */
    protected VectorType(Class<T> typeClass, SqlType sqlType) {
        this.sqlType = sqlType;
        this.typeClass = typeClass;
    }

    /** @return the physical type class given at construction */
    @Override
    public Class<T> getTypeClass() {
        return typeClass;
    }

    /** @return the SQL type tag given at construction */
    @Override
    public SqlType getSqlType() {
        return sqlType;
    }

    /**
     * Two vector types are equal when both the physical class and the SQL type tag match.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a {@code VectorType} with the same class and tag
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj instanceof VectorType) {
            VectorType<?> other = (VectorType<?>) obj;
            return Objects.equals(sqlType, other.sqlType)
                    && Objects.equals(typeClass, other.typeClass);
        }
        return false;
    }

    /**
     * Hash over the physical class and the SQL type tag, consistent with
     * {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        // Same 31-based combination that Objects.hash(typeClass, sqlType) produces.
        int result = 31 + Objects.hashCode(typeClass);
        return 31 * result + Objects.hashCode(sqlType);
    }

    /** @return the string form of the SQL type tag */
    @Override
    public String toString() {
        return sqlType.toString();
    }
}
Loading

0 comments on commit 99390d3

Please sign in to comment.