Skip to content

Commit

Permalink
Glue datasource support
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vamsi-amazon committed Sep 8, 2023
1 parent 61d1eb7 commit ff52688
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
public enum DataSourceType {
PROMETHEUS("prometheus"),
OPENSEARCH("opensearch"),
SPARK("spark");
SPARK("spark"),
S3GLUE("s3glue");

private String text;

DataSourceType(String text) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.opensearch.sql.datasources.glue;

import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

@RequiredArgsConstructor
public class GlueDataSourceFactory implements DataSourceFactory {

private final Settings pluginSettings;

// Glue configuration properties
public static final String GLUE_AUTH_TYPE = "glue.auth.type";
public static final String GLUE_ROLE_ARN = "glue.auth.role_arn";
public static final String FLINT_URI = "glue.indexstore.opensearch.uri";
public static final String FLINT_AUTH = "glue.indexstore.opensearch.auth";
public static final String FLINT_REGION = "glue.indexstore.opensearch.region";

@Override
public DataSourceType getDataSourceType() {
return DataSourceType.S3GLUE;
}

@Override
public DataSource createDataSource(DataSourceMetadata metadata) {
try {
validateGlueDataSourceConfiguration(metadata.getProperties());
return new DataSource(
metadata.getName(),
metadata.getConnector(),
(dataSourceSchemaName, tableName) -> {
throw new UnsupportedOperationException("Glue storage engine is not supported.");
});
} catch (URISyntaxException | UnknownHostException e) {
throw new IllegalArgumentException("Invalid flint host in properties.");
}
}

private void validateGlueDataSourceConfiguration(Map<String, String> dataSourceMetadataConfig)
throws URISyntaxException, UnknownHostException {
DatasourceValidationUtils.validateLengthAndRequiredFields(
dataSourceMetadataConfig,
Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH));
DatasourceValidationUtils.validateHost(
dataSourceMetadataConfig.get(FLINT_URI),
pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public static void validateLengthAndRequiredFields(
StringBuilder errorStringBuilder = new StringBuilder();
if (missingFields.size() > 0) {
errorStringBuilder.append(
String.format(
"Missing %s fields in the Prometheus connector properties.", missingFields));
String.format("Missing %s fields in the connector properties.", missingFields));
}

if (invalidLengthFields.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.sql.datasources.glue;

import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

@ExtendWith(MockitoExtension.class)
public class GlueDataSourceFactoryTest {

@Mock private Settings settings;

@Test
void testGetConnectorType() {
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);
Assertions.assertEquals(DataSourceType.S3GLUE, glueDatasourceFactory.getDataSourceType());
}

@Test
@SneakyThrows
void testCreateGLueDatSource() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(Collections.emptyList());
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
DataSource dataSource = glueDatasourceFactory.createDataSource(metadata);
Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType());
UnsupportedOperationException unsupportedOperationException =
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
dataSource
.getStorageEngine()
.getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs"));
Assertions.assertEquals(
"Glue storage engine is not supported.", unsupportedOperationException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHost() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(List.of("127.0.0.0/8"));
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Disallowed hostname in the uri. "
+ "Validate with plugins.query.datasources.uri.hosts.denylist config",
illegalArgumentException.getMessage());
}

@Test
@SneakyThrows
void testCreateGLueDatSourceWithInvalidFlintHostSyntax() {
when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
.thenReturn(List.of("127.0.0.0/8"));
GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings);

DataSourceMetadata metadata = new DataSourceMetadata();
HashMap<String, String> properties = new HashMap<>();
properties.put("glue.auth.type", "iam_role");
properties.put("glue.auth.role_arn", "role_arn");
properties.put(
"glue.indexstore.opensearch.uri",
"http://dummyprometheus.com:9090? paramt::localhost:9200");
properties.put("glue.indexstore.opensearch.auth", "false");
properties.put("glue.indexstore.opensearch.region", "us-west-2");

metadata.setName("my_glue");
metadata.setConnector(DataSourceType.S3GLUE);
metadata.setProperties(properties);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata));
Assertions.assertEquals(
"Invalid flint host in properties.", illegalArgumentException.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testValidateLengthAndRequiredFieldsWithAbsentField() {
DatasourceValidationUtils.validateLengthAndRequiredFields(
config, Set.of("s3.uri", "s3.auth.type")));
Assertions.assertEquals(
"Missing [s3.auth.type] fields in the Prometheus connector properties.",
"Missing [s3.auth.type] fields in the connector properties.",
illegalArgumentException.getMessage());
}

Expand All @@ -77,7 +77,7 @@ public void testValidateLengthAndRequiredFieldsWithInvalidLength() {
DatasourceValidationUtils.validateLengthAndRequiredFields(
config, Set.of("s3.uri", "s3.auth.type")));
Assertions.assertEquals(
"Missing [s3.auth.type] fields in the Prometheus connector properties.Fields "
"Missing [s3.auth.type] fields in the connector properties.Fields "
+ "[s3.uri] exceeds more than 1000 characters.",
illegalArgumentException.getMessage());
}
Expand Down
68 changes: 68 additions & 0 deletions docs/user/ppl/admin/connectors/s3glue_connector.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
.. highlight:: sh

====================
S3Glue Connector
====================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

s3Glue connector provides a way to query s3 files using glue as metadata store and spark as execution engine.
This page covers s3Glue datasource configuration and also how to query and s3Glue datasource.


Required resources for s3 Glue Connector
===================================
* S3: This is where the data lies.
* Spark Execution Engine: Query Execution happens on spark.
* Glue Metadata store: Glue takes care of table metadata.
* Opensearch: Index for s3 data lies in opensearch and also acts as temporary buffer for query results.

We currently only support emr-serverless as spark execution engine and Glue as metadata store. we will add more support in future.

Glue Connector Properties in DataSource Configuration
========================================================
Glue Connector Properties.

* ``glue.auth.type`` [Required]
* This parameters provides the authentication type information required for execution engine to connect to glue.
* S3 Glue connector currently only supports ``iam_role`` authentication and the below parameters is required.
* ``glue.auth.role_arn``
* ``glue.indexstore.opensearch.*`` [Required]
* This parameters provides the Opensearch domain host information for glue connector. This opensearch instance is used for writing index data back and also
* ``glue.indexstore.opensearch.uri`` [Required]
* ``glue.indexstore.opensearch.auth`` [Required]
* Default value for auth is ``false``.
* ``glue.indexstore.opensearch.region`` [Required]
* Default value for auth is ``us-west-2``.

Sample Glue dataSource configuration
========================================

Glue datasource configuration::

[{
"name" : "my_glue",
"connector": "s3glue",
"properties" : {
"glue.auth.type": "iam_role",
"glue.auth.role_arn": "role_arn",
"glue.indexstore.opensearch.uri": "http://localhost:9200",
"glue.indexstore.opensearch.auth" :"false",
"glue.indexstore.opensearch.region": "us-west-2"
}
}]


Sample s3Glue datasource queries
================================
<To Be Added>


File renamed without changes.
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
Expand Down Expand Up @@ -241,6 +242,7 @@ private DataSourceServiceImpl createDataSourceService() {
new OpenSearchNodeClient(this.client), pluginSettings))
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDataSourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void testGetStorageEngineWithMissingURI() {
IllegalArgumentException.class,
() -> prometheusStorageFactory.getStorageEngine(properties));
Assertions.assertEquals(
"Missing [prometheus.uri] fields " + "in the Prometheus connector properties.",
"Missing [prometheus.uri] fields " + "in the connector properties.",
exception.getMessage());
}

Expand All @@ -99,7 +99,7 @@ void testGetStorageEngineWithMissingRegionInAWS() {
IllegalArgumentException.class,
() -> prometheusStorageFactory.getStorageEngine(properties));
Assertions.assertEquals(
"Missing [prometheus.auth.region] fields in the " + "Prometheus connector properties.",
"Missing [prometheus.auth.region] fields in the connector properties.",
exception.getMessage());
}

Expand All @@ -118,7 +118,7 @@ void testGetStorageEngineWithLongConfigProperties() {
() -> prometheusStorageFactory.getStorageEngine(properties));
Assertions.assertEquals(
"Missing [prometheus.auth.region] fields in the "
+ "Prometheus connector properties."
+ "connector properties."
+ "Fields [prometheus.uri] exceeds more than 1000 characters.",
exception.getMessage());
}
Expand Down

0 comments on commit ff52688

Please sign in to comment.