Skip to content

Commit

Permalink
Describe Metric and List Metrics Changes
Browse files Browse the repository at this point in the history
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vamsi-amazon committed Oct 21, 2022
1 parent 863f751 commit 125b140
Show file tree
Hide file tree
Showing 34 changed files with 608 additions and 121 deletions.
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/CatalogSchemaName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public class CatalogSchemaName {

private final String catalogName;

private final String schemaName;

}
5 changes: 4 additions & 1 deletion core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.analysis.model.CatalogSchemaIdentifierName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
Expand Down Expand Up @@ -136,7 +137,9 @@ public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
TypeEnvironment curEnv = context.peek();
Table table = catalogService
.getStorageEngine(catalogSchemaIdentifierName.getCatalogName())
.getTable(catalogSchemaIdentifierName.getIdentifierName());
.getTable(new CatalogSchemaName(catalogSchemaIdentifierName.getCatalogName(),
catalogSchemaIdentifierName.getSchemaName()),
catalogSchemaIdentifierName.getIdentifierName());
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));

// Put index name or its alias in index namespace on type environment so qualifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Collection;
import java.util.Collections;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.expression.function.FunctionResolver;

/**
Expand All @@ -18,14 +19,14 @@ public interface StorageEngine {
/**
* Get {@link Table} from storage engine.
*/
Table getTable(String name);
Table getTable(CatalogSchemaName catalogSchemaName, String tableName);

/**
* Get list of catalog related functions.
*
* @return FunctionResolvers of catalog functions.
*/
default Collection<FunctionResolver> getFunctions() {
default Collection<FunctionResolver> getFunctions(CatalogSchemaName catalogSchemaName) {
return Collections.emptyList();
}

Expand Down
30 changes: 16 additions & 14 deletions core/src/main/java/org/opensearch/sql/utils/SystemIndexUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,31 @@
*/
@UtilityClass
public class SystemIndexUtils {

public static final String TABLE_NAME_FOR_TABLES_INFO = "tables";
/**
* The prefix of all the system tables.
* The suffix of all the system tables.
*/
private static final String SYS_TABLES_PREFIX = "_ODFE_SYS_TABLE";
private static final String SYS_TABLES_SUFFIX = "ODFE_SYS_TABLE";

/**
* The prefix of all the meta tables.
* The suffix of all the meta tables.
*/
private static final String SYS_META_PREFIX = SYS_TABLES_PREFIX + "_META";
private static final String SYS_META_SUFFIX = "META_" + SYS_TABLES_SUFFIX;

/**
* The prefix of all the table mappings.
* The suffix of all the table mappings.
*/
private static final String SYS_MAPPINGS_PREFIX = SYS_TABLES_PREFIX + "_MAPPINGS";
private static final String SYS_MAPPINGS_SUFFIX = "MAPPINGS_" + SYS_TABLES_SUFFIX;

/**
* The _ODFE_SYS_TABLE_META.ALL contain all the table info.
* The ALL.META_ODFE_SYS_TABLE contain all the table info.
*/
public static final String TABLE_INFO = SYS_META_PREFIX + ".ALL";
public static final String TABLE_INFO = "ALL." + SYS_META_SUFFIX;


public static Boolean isSystemIndex(String indexName) {
return indexName.startsWith(SYS_TABLES_PREFIX);
return indexName.endsWith(SYS_TABLES_SUFFIX);
}

/**
Expand All @@ -47,7 +49,7 @@ public static Boolean isSystemIndex(String indexName) {
* @return system mapping table.
*/
public static String mappingTable(String indexName) {
return String.join(".", SYS_MAPPINGS_PREFIX, indexName);
return String.join(".", indexName, SYS_MAPPINGS_SUFFIX);
}

/**
Expand All @@ -57,13 +59,13 @@ public static String mappingTable(String indexName) {
*/
public static SystemTable systemTable(String indexName) {
final int lastDot = indexName.indexOf(".");
String prefix = indexName.substring(0, lastDot);
String tableName = indexName.substring(lastDot + 1)
String suffix = indexName.substring(lastDot + 1);
String tableName = indexName.substring(0, lastDot)
.replace("%", "*");

if (prefix.equalsIgnoreCase(SYS_META_PREFIX)) {
if (suffix.equalsIgnoreCase(SYS_META_SUFFIX)) {
return new SystemInfoTable(tableName);
} else if (prefix.equalsIgnoreCase(SYS_MAPPINGS_PREFIX)) {
} else if (suffix.equalsIgnoreCase(SYS_MAPPINGS_SUFFIX)) {
return new MetaInfoTable(tableName);
} else {
throw new IllegalStateException("Invalid system index name: " + indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.swing.JTable;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.analysis.symbol.SymbolTable;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.config.TestConfig;
Expand Down Expand Up @@ -51,12 +50,7 @@ protected Map<String, ExprType> typeMapping() {

@Bean
protected StorageEngine storageEngine() {
return new StorageEngine() {
@Override
public Table getTable(String name) {
return table;
}
};
return (catalogSchemaName, tableName) -> table;
}

@Bean
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/java/org/opensearch/sql/config/TestConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
import org.opensearch.sql.analysis.symbol.Symbol;
import org.opensearch.sql.analysis.symbol.SymbolTable;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class TestConfig {
protected StorageEngine storageEngine() {
return new StorageEngine() {
@Override
public Table getTable(String name) {
public Table getTable(CatalogSchemaName catalogSchemaName, String name) {
return new Table() {
@Override
public Map<String, ExprType> getFieldTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.planner.logical.LogicalAggregation;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class PlannerTest extends PhysicalPlanTestBase {

@BeforeEach
public void setUp() {
when(storageEngine.getTable(any())).thenReturn(new MockTable());
when(storageEngine.getTable(any(), any())).thenReturn(new MockTable());
}

@Test
Expand All @@ -77,7 +78,9 @@ public void planner_test() {
LogicalPlanDSL.rename(
LogicalPlanDSL.aggregation(
LogicalPlanDSL.filter(
LogicalPlanDSL.relation("schema", storageEngine.getTable("schema")),
LogicalPlanDSL.relation("schema",
storageEngine.getTable(new CatalogSchemaName(".opensearch", "default"),
"schema")),
dsl.equal(DSL.ref("response", INTEGER), DSL.literal(10))
),
ImmutableList.of(DSL.named("avg(response)", dsl.avg(DSL.ref("response", INTEGER)))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,16 @@
import java.util.Collections;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.CatalogSchemaName;

public class StorageEngineTest {


@Test
void testFunctionsMethod() {
StorageEngine k = new StorageEngine() {
@Override
public Table getTable(String name) {
return null;
}
};
Assertions.assertEquals(Collections.emptyList(), k.getFunctions());
StorageEngine k = (catalogSchemaName, tableName) -> null;
Assertions.assertEquals(Collections.emptyList(),
k.getFunctions(new CatalogSchemaName(".opensearch", "default")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ class SystemIndexUtilsTest {

@Test
void test_system_index() {
assertTrue(isSystemIndex("_ODFE_SYS_TABLE_META.ALL"));
assertTrue(isSystemIndex("ALL.META_ODFE_SYS_TABLE"));
assertFalse(isSystemIndex(".opensearch_dashboards"));
}

@Test
void test_compose_mapping_table() {
assertEquals("_ODFE_SYS_TABLE_MAPPINGS.employee", mappingTable("employee"));
assertEquals("employee.MAPPINGS_ODFE_SYS_TABLE", mappingTable("employee"));
}

@Test
void test_system_info_table() {
final SystemIndexUtils.SystemTable table = systemTable("_ODFE_SYS_TABLE_META.ALL");
final SystemIndexUtils.SystemTable table = systemTable("ALL.META_ODFE_SYS_TABLE");

assertTrue(table.isSystemInfoTable());
assertFalse(table.isMetaInfoTable());
Expand All @@ -40,7 +40,7 @@ void test_system_info_table() {

@Test
void test_mapping_info_table() {
final SystemIndexUtils.SystemTable table = systemTable("_ODFE_SYS_TABLE_MAPPINGS.employee");
final SystemIndexUtils.SystemTable table = systemTable("employee.MAPPINGS_ODFE_SYS_TABLE");

assertTrue(table.isMetaInfoTable());
assertFalse(table.isSystemInfoTable());
Expand All @@ -50,7 +50,7 @@ void test_mapping_info_table() {
@Test
void throw_exception_for_invalid_index() {
final IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> systemTable("_ODFE_SYS_TABLE.employee"));
assertEquals("Invalid system index name: _ODFE_SYS_TABLE.employee", exception.getMessage());
assertThrows(IllegalStateException.class, () -> systemTable("employee._ODFE_SYS_TABLE"));
assertEquals("Invalid system index name: employee._ODFE_SYS_TABLE", exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,48 @@
import static org.opensearch.sql.utils.SystemIndexUtils.isSystemIndex;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.analysis.model.CatalogSchemaIdentifierName;
import org.opensearch.sql.analysis.model.SchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.storage.system.OpenSearchSystemIndex;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.utils.SystemIndexUtils;

/** OpenSearch storage engine implementation. */
/**
* OpenSearch storage engine implementation.
*/
@RequiredArgsConstructor
public class OpenSearchStorageEngine implements StorageEngine {

/** OpenSearch client connection. */
/**
* OpenSearch client connection.
*/
private final OpenSearchClient client;

private final Settings settings;

@Override
public Table getTable(String name) {
if (isSystemIndex(name)) {
return new OpenSearchSystemIndex(client, name);
public Table getTable(CatalogSchemaName catalogSchemaName, String tableName) {
if (isSystemIndex(tableName)) {
return new OpenSearchSystemIndex(client, tableName);
} else if (SchemaName.INFORMATION_SCHEMA_NAME.equals(catalogSchemaName.getSchemaName())) {
return resolveInformationSchemaTable(catalogSchemaName, tableName);
} else {
return new OpenSearchIndex(client, settings, tableName);
}
}

private Table resolveInformationSchemaTable(CatalogSchemaName catalogSchemaName,
String tableName) {
if (SystemIndexUtils.TABLE_NAME_FOR_TABLES_INFO.equals(tableName)) {
return new OpenSearchSystemIndex(client, SystemIndexUtils.TABLE_INFO);
} else {
return new OpenSearchIndex(client, settings, name);
throw new SemanticCheckException(
String.format("Information Schema doesn't contain %s table", tableName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@

package org.opensearch.sql.opensearch.storage;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.opensearch.sql.utils.SystemIndexUtils.TABLE_INFO;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.storage.system.OpenSearchSystemIndex;
import org.opensearch.sql.storage.Table;
Expand All @@ -29,15 +33,33 @@ class OpenSearchStorageEngineTest {
@Test
public void getTable() {
OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings);
Table table = engine.getTable("test");
Table table = engine.getTable(new CatalogSchemaName(".opensearch", "default"),"test");
assertNotNull(table);
}

@Test
public void getSystemTable() {
OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings);
Table table = engine.getTable(TABLE_INFO);
Table table = engine.getTable(new CatalogSchemaName(".opensearch", "default"), TABLE_INFO);
assertNotNull(table);
assertTrue(table instanceof OpenSearchSystemIndex);
}


@Test
public void getSystemTableForAllTablesInfo() {
OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings);
Table table
= engine.getTable(new CatalogSchemaName(".opensearch", "information_schema"), "tables");
assertNotNull(table);
assertTrue(table instanceof OpenSearchSystemIndex);
}

@Test
public void getSystemTableWithWrongInformationSchemaTable() {
OpenSearchStorageEngine engine = new OpenSearchStorageEngine(client, settings);
SemanticCheckException exception = assertThrows(SemanticCheckException.class,
() -> engine.getTable(new CatalogSchemaName(".opensearch", "information_schema"), "test"));
assertEquals("Information Schema doesn't contain test table", exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.opensearch.sql.ppl.config;

import org.opensearch.sql.CatalogSchemaName;
import org.opensearch.sql.catalog.CatalogService;
import org.opensearch.sql.catalog.model.ConnectorType;
import org.opensearch.sql.executor.ExecutionEngine;
Expand Down Expand Up @@ -41,7 +42,7 @@ public class PPLServiceConfig {
public PPLService pplService() {
catalogService.getCatalogs()
.forEach(catalog -> catalogService.getStorageEngine(catalog)
.getFunctions()
.getFunctions(new CatalogSchemaName(catalog, "default"))
.forEach(functionResolver -> functionRepository.register(catalog, functionResolver)));
return new PPLService(new PPLSyntaxParser(), executionEngine,
functionRepository, catalogService);
Expand Down
Loading

0 comments on commit 125b140

Please sign in to comment.