Skip to content

Commit fef20f8

Browse files
authored
Catalog to Datasource changes (opensearch-project#1086)
Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
1 parent 81c9285 commit fef20f8

File tree

74 files changed

+745
-739
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+745
-739
lines changed

core/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ jacocoTestReport {
7373
afterEvaluate {
7474
classDirectories.setFrom(files(classDirectories.files.collect {
7575
fileTree(dir: it,
76-
exclude: ['**/ast/**', '**/catalog/model/**'])
76+
exclude: ['**/ast/**', '**/datasource/model/**'])
7777
}))
7878
}
7979
}
@@ -85,7 +85,7 @@ jacocoTestCoverageVerification {
8585
excludes = [
8686
'org.opensearch.sql.utils.MLCommonsConstants',
8787
'org.opensearch.sql.utils.Constants',
88-
'org.opensearch.sql.catalog.model.*'
88+
'org.opensearch.sql.datasource.model.*'
8989
]
9090
limit {
9191
counter = 'LINE'

core/src/main/java/org/opensearch/sql/CatalogSchemaName.java renamed to core/src/main/java/org/opensearch/sql/DataSourceSchemaName.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212

1313
@Getter
1414
@RequiredArgsConstructor
15-
public class CatalogSchemaName {
15+
public class DataSourceSchemaName {
1616

17-
private final String catalogName;
17+
private final String dataSourceName;
1818

1919
private final String schemaName;
2020

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,11 @@
1111
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
1212
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
1313
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
14-
import static org.opensearch.sql.utils.MLCommonsConstants.ACTION;
15-
import static org.opensearch.sql.utils.MLCommonsConstants.MODELID;
16-
import static org.opensearch.sql.utils.MLCommonsConstants.PREDICT;
1714
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
1815
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
1916
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
20-
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_TIMESTAMP;
21-
import static org.opensearch.sql.utils.MLCommonsConstants.STATUS;
22-
import static org.opensearch.sql.utils.MLCommonsConstants.TASKID;
2317
import static org.opensearch.sql.utils.MLCommonsConstants.TIME_FIELD;
24-
import static org.opensearch.sql.utils.MLCommonsConstants.TRAIN;
25-
import static org.opensearch.sql.utils.MLCommonsConstants.TRAINANDPREDICT;
26-
import static org.opensearch.sql.utils.SystemIndexUtils.CATALOGS_TABLE_NAME;
18+
import static org.opensearch.sql.utils.SystemIndexUtils.DATASOURCES_TABLE_NAME;
2719

2820
import com.google.common.collect.ImmutableList;
2921
import com.google.common.collect.ImmutableList.Builder;
@@ -37,7 +29,7 @@
3729
import java.util.stream.Collectors;
3830
import org.apache.commons.lang3.tuple.ImmutablePair;
3931
import org.apache.commons.lang3.tuple.Pair;
40-
import org.opensearch.sql.CatalogSchemaName;
32+
import org.opensearch.sql.DataSourceSchemaName;
4133
import org.opensearch.sql.analysis.symbol.Namespace;
4234
import org.opensearch.sql.analysis.symbol.Symbol;
4335
import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -69,10 +61,10 @@
6961
import org.opensearch.sql.ast.tree.TableFunction;
7062
import org.opensearch.sql.ast.tree.UnresolvedPlan;
7163
import org.opensearch.sql.ast.tree.Values;
72-
import org.opensearch.sql.catalog.CatalogService;
73-
import org.opensearch.sql.catalog.model.Catalog;
7464
import org.opensearch.sql.data.model.ExprMissingValue;
7565
import org.opensearch.sql.data.type.ExprCoreType;
66+
import org.opensearch.sql.datasource.DataSourceService;
67+
import org.opensearch.sql.datasource.model.DataSource;
7668
import org.opensearch.sql.exception.SemanticCheckException;
7769
import org.opensearch.sql.expression.DSL;
7870
import org.opensearch.sql.expression.Expression;
@@ -101,7 +93,7 @@
10193
import org.opensearch.sql.planner.logical.LogicalRename;
10294
import org.opensearch.sql.planner.logical.LogicalSort;
10395
import org.opensearch.sql.planner.logical.LogicalValues;
104-
import org.opensearch.sql.planner.physical.catalog.CatalogTable;
96+
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
10597
import org.opensearch.sql.storage.Table;
10698
import org.opensearch.sql.utils.ParseUtils;
10799

@@ -117,7 +109,7 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
117109

118110
private final NamedExpressionAnalyzer namedExpressionAnalyzer;
119111

120-
private final CatalogService catalogService;
112+
private final DataSourceService dataSourceService;
121113

122114
private final BuiltinFunctionRepository repository;
123115

@@ -126,10 +118,10 @@ public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext>
126118
*/
127119
public Analyzer(
128120
ExpressionAnalyzer expressionAnalyzer,
129-
CatalogService catalogService,
121+
DataSourceService dataSourceService,
130122
BuiltinFunctionRepository repository) {
131123
this.expressionAnalyzer = expressionAnalyzer;
132-
this.catalogService = catalogService;
124+
this.dataSourceService = dataSourceService;
133125
this.selectExpressionAnalyzer = new SelectExpressionAnalyzer(expressionAnalyzer);
134126
this.namedExpressionAnalyzer = new NamedExpressionAnalyzer(expressionAnalyzer);
135127
this.repository = repository;
@@ -142,25 +134,27 @@ public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
142134
@Override
143135
public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
144136
QualifiedName qualifiedName = node.getTableQualifiedName();
145-
Set<String> allowedCatalogNames = catalogService.getCatalogs()
137+
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
146138
.stream()
147-
.map(Catalog::getName)
139+
.map(DataSource::getName)
148140
.collect(Collectors.toSet());
149-
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
150-
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
151-
String tableName = catalogSchemaIdentifierNameResolver.getIdentifierName();
141+
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
142+
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
143+
allowedDataSourceNames);
144+
String tableName = dataSourceSchemaIdentifierNameResolver.getIdentifierName();
152145
context.push();
153146
TypeEnvironment curEnv = context.peek();
154147
Table table;
155-
if (CATALOGS_TABLE_NAME.equals(tableName)) {
156-
table = new CatalogTable(catalogService);
148+
if (DATASOURCES_TABLE_NAME.equals(tableName)) {
149+
table = new DataSourceTable(dataSourceService);
157150
} else {
158-
table = catalogService
159-
.getCatalog(catalogSchemaIdentifierNameResolver.getCatalogName())
151+
table = dataSourceService
152+
.getDataSource(dataSourceSchemaIdentifierNameResolver.getDataSourceName())
160153
.getStorageEngine()
161-
.getTable(new CatalogSchemaName(catalogSchemaIdentifierNameResolver.getCatalogName(),
162-
catalogSchemaIdentifierNameResolver.getSchemaName()),
163-
catalogSchemaIdentifierNameResolver.getIdentifierName());
154+
.getTable(new DataSourceSchemaName(
155+
dataSourceSchemaIdentifierNameResolver.getDataSourceName(),
156+
dataSourceSchemaIdentifierNameResolver.getSchemaName()),
157+
dataSourceSchemaIdentifierNameResolver.getIdentifierName());
164158
}
165159
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
166160

@@ -188,28 +182,29 @@ public LogicalPlan visitRelationSubquery(RelationSubquery node, AnalysisContext
188182
@Override
189183
public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext context) {
190184
QualifiedName qualifiedName = node.getFunctionName();
191-
Set<String> allowedCatalogNames = catalogService.getCatalogs()
185+
Set<String> allowedDataSourceNames = dataSourceService.getDataSources()
192186
.stream()
193-
.map(Catalog::getName)
187+
.map(DataSource::getName)
194188
.collect(Collectors.toSet());
195-
CatalogSchemaIdentifierNameResolver catalogSchemaIdentifierNameResolver
196-
= new CatalogSchemaIdentifierNameResolver(qualifiedName.getParts(), allowedCatalogNames);
189+
DataSourceSchemaIdentifierNameResolver dataSourceSchemaIdentifierNameResolver
190+
= new DataSourceSchemaIdentifierNameResolver(qualifiedName.getParts(),
191+
allowedDataSourceNames);
197192

198193
FunctionName functionName
199-
= FunctionName.of(catalogSchemaIdentifierNameResolver.getIdentifierName());
194+
= FunctionName.of(dataSourceSchemaIdentifierNameResolver.getIdentifierName());
200195
List<Expression> arguments = node.getArguments().stream()
201196
.map(unresolvedExpression -> this.expressionAnalyzer.analyze(unresolvedExpression, context))
202197
.collect(Collectors.toList());
203198
TableFunctionImplementation tableFunctionImplementation
204199
= (TableFunctionImplementation) repository.compile(
205-
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
200+
dataSourceSchemaIdentifierNameResolver.getDataSourceName(), functionName, arguments);
206201
context.push();
207202
TypeEnvironment curEnv = context.peek();
208203
Table table = tableFunctionImplementation.applyArguments();
209204
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
210205
curEnv.define(new Symbol(Namespace.INDEX_NAME,
211-
catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
212-
return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(),
206+
dataSourceSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
207+
return new LogicalRelation(dataSourceSchemaIdentifierNameResolver.getIdentifierName(),
213208
tableFunctionImplementation.applyArguments());
214209
}
215210

core/src/main/java/org/opensearch/sql/analysis/CatalogSchemaIdentifierNameResolver.java renamed to core/src/main/java/org/opensearch/sql/analysis/DataSourceSchemaIdentifierNameResolver.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,51 +10,53 @@
1010
import java.util.List;
1111
import java.util.Set;
1212

13-
public class CatalogSchemaIdentifierNameResolver {
13+
public class DataSourceSchemaIdentifierNameResolver {
1414

15-
public static final String DEFAULT_CATALOG_NAME = "@opensearch";
15+
public static final String DEFAULT_DATASOURCE_NAME = "@opensearch";
1616
public static final String DEFAULT_SCHEMA_NAME = "default";
1717
public static final String INFORMATION_SCHEMA_NAME = "information_schema";
1818

19-
private String catalogName = DEFAULT_CATALOG_NAME;
19+
private String dataSourceName = DEFAULT_DATASOURCE_NAME;
2020
private String schemaName = DEFAULT_SCHEMA_NAME;
2121
private String identifierName;
2222

2323
private static final String DOT = ".";
2424

2525
/**
26-
* Data model for capturing catalog, schema and identifier from
26+
* Data model for capturing dataSourceName, schema and identifier from
2727
* fully qualifiedName. In the current state, it is used to capture
28-
* CatalogSchemaTable name and CatalogSchemaFunction in case of table
28+
* DataSourceSchemaTable name and DataSourceSchemaFunction in case of table
2929
* functions.
3030
*
3131
* @param parts parts of qualifiedName.
32-
* @param allowedCatalogs allowedCatalogs.
32+
* @param allowedDataSources allowedDataSources.
3333
*/
34-
public CatalogSchemaIdentifierNameResolver(List<String> parts, Set<String> allowedCatalogs) {
35-
List<String> remainingParts = captureSchemaName(captureCatalogName(parts, allowedCatalogs));
34+
public DataSourceSchemaIdentifierNameResolver(List<String> parts,
35+
Set<String> allowedDataSources) {
36+
List<String> remainingParts
37+
= captureSchemaName(captureDataSourceName(parts, allowedDataSources));
3638
identifierName = String.join(DOT, remainingParts);
3739
}
3840

3941
public String getIdentifierName() {
4042
return identifierName;
4143
}
4244

43-
public String getCatalogName() {
44-
return catalogName;
45+
public String getDataSourceName() {
46+
return dataSourceName;
4547
}
4648

4749
public String getSchemaName() {
4850
return schemaName;
4951
}
5052

5153

52-
// Capture catalog name and return remaining parts(schema name and table name)
54+
// Capture datasource name and return remaining parts(schema name and table name)
5355
// from the fully qualified name.
54-
private List<String> captureCatalogName(List<String> parts, Set<String> allowedCatalogs) {
55-
if (parts.size() > 1 && allowedCatalogs.contains(parts.get(0))
56-
|| DEFAULT_CATALOG_NAME.equals(parts.get(0))) {
57-
catalogName = parts.get(0);
56+
private List<String> captureDataSourceName(List<String> parts, Set<String> allowedDataSources) {
57+
if (parts.size() > 1 && allowedDataSources.contains(parts.get(0))
58+
|| DEFAULT_DATASOURCE_NAME.equals(parts.get(0))) {
59+
dataSourceName = parts.get(0);
5860
return parts.subList(1, parts.size());
5961
} else {
6062
return parts;

core/src/main/java/org/opensearch/sql/ast/tree/Relation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public String getAlias() {
7373

7474
/**
7575
* Get Qualified name preservs parts of the user given identifiers.
76-
* This can later be utilized to determine Catalog,Schema and Table Name during
76+
* This can later be utilized to determine DataSource,Schema and Table Name during
7777
* Analyzer stage. So Passing QualifiedName directly to Analyzer Stage.
7878
*
7979
* @return TableQualifiedName.

core/src/main/java/org/opensearch/sql/catalog/CatalogService.java

Lines changed: 0 additions & 40 deletions
This file was deleted.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.datasource;
7+
8+
import java.util.Set;
9+
import org.opensearch.sql.datasource.model.DataSource;
10+
import org.opensearch.sql.storage.StorageEngine;
11+
12+
/**
13+
* DataSource Service manages datasources.
14+
*/
15+
public interface DataSourceService {
16+
17+
/**
18+
* Returns all datasource objects.
19+
*
20+
* @return DataSource datasources.
21+
*/
22+
Set<DataSource> getDataSources();
23+
24+
/**
25+
* Returns DataSource with corresponding to the datasource name.
26+
*
27+
* @param dataSourceName Name of the datasource.
28+
* @return DataSource datasource.
29+
*/
30+
DataSource getDataSource(String dataSourceName);
31+
32+
/**
33+
* Default opensearch engine is not defined in datasources config.
34+
* So the registration of default datasource happens separately.
35+
*
36+
* @param storageEngine StorageEngine.
37+
*/
38+
void registerDefaultOpenSearchDataSource(StorageEngine storageEngine);
39+
40+
}

core/src/main/java/org/opensearch/sql/catalog/model/ConnectorType.java renamed to core/src/main/java/org/opensearch/sql/datasource/model/ConnectorType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.catalog.model;
6+
package org.opensearch.sql.datasource.model;
77

88
public enum ConnectorType {
99
PROMETHEUS,OPENSEARCH

core/src/main/java/org/opensearch/sql/catalog/model/Catalog.java renamed to core/src/main/java/org/opensearch/sql/datasource/model/DataSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*
66
*/
77

8-
package org.opensearch.sql.catalog.model;
8+
package org.opensearch.sql.datasource.model;
99

1010
import lombok.EqualsAndHashCode;
1111
import lombok.Getter;
@@ -15,7 +15,7 @@
1515
@Getter
1616
@RequiredArgsConstructor
1717
@EqualsAndHashCode
18-
public class Catalog {
18+
public class DataSource {
1919

2020
private final String name;
2121

core/src/main/java/org/opensearch/sql/catalog/model/CatalogMetadata.java renamed to core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.catalog.model;
6+
package org.opensearch.sql.datasource.model;
77

88
import com.fasterxml.jackson.annotation.JsonFormat;
99
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -15,7 +15,7 @@
1515
@JsonIgnoreProperties(ignoreUnknown = true)
1616
@Getter
1717
@Setter
18-
public class CatalogMetadata {
18+
public class DataSourceMetadata {
1919

2020
@JsonProperty(required = true)
2121
private String name;

0 commit comments

Comments
 (0)