Skip to content

[CDAP-21172]Added cdap-metadata-ext-spanner Module with ExtensionLoader #15959

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2114,9 +2114,13 @@ public static final class Metadata {
public static final String STORAGE_PROVIDER_IMPLEMENTATION = "metadata.storage.implementation";
public static final String STORAGE_PROVIDER_NOSQL = "nosql";
public static final String STORAGE_PROVIDER_ELASTICSEARCH = "elastic";
public static final String STORAGE_PROVIDER_SPANNER = "gcp-spanner";

public static final String METADATA_WRITER_SUBSCRIBER = "metadata.writer";
public static final String METADATA_CONSUMER_WRITER_SUBSCRIBER = "metadata.consumer.writer";

// Metadata configs
public static final String METADATA_STORAGE_EXT_DIR = "metadata.storage.extensions.dir";
}

/**
Expand Down
5 changes: 5 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2845,6 +2845,11 @@
<value>/opt/cdap/master/ext/log-publisher</value>
</property>

<property>
<name>metadata.storage.extensions.dir</name>
<value>/opt/cdap/master/ext/metadata-storage</value>
</property>

<!-- Metrics Configuration -->

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.cdap.cdap.data2.registry.UsageWriter;
import io.cdap.cdap.metadata.elastic.ElasticsearchMetadataStorage;
import io.cdap.cdap.security.impersonation.OwnerStore;
import io.cdap.cdap.spi.metadata.DelegatingMetadataStorage;
import io.cdap.cdap.spi.metadata.MetadataStorage;
import io.cdap.cdap.spi.metadata.dataset.DatasetMetadataStorage;
import io.cdap.cdap.spi.metadata.noop.NoopMetadataStorage;
Expand Down Expand Up @@ -178,11 +179,21 @@ public MetadataStorage get() {
if (Constants.Metadata.STORAGE_PROVIDER_NOSQL.equalsIgnoreCase(config)) {
return injector.getInstance(DatasetMetadataStorage.class);
}

// TODO (CDAP-21173): Load elastic search implementation using DelegatingMetadataStorage
if (Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH.equalsIgnoreCase(config)) {
return injector.getInstance(ElasticsearchMetadataStorage.class);
}
throw new IllegalArgumentException("Unsupported MetadataStorage '" + config + "'. Only '"
+ Constants.Metadata.STORAGE_PROVIDER_NOSQL + "' and '"
+ Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH + "' are allowed.");
if (Constants.Metadata.STORAGE_PROVIDER_SPANNER.equalsIgnoreCase(config)) {
return injector.getInstance(DelegatingMetadataStorage.class);
}
String errorMessage = String.format(
"Unsupported MetadataStorage '%s'. Only '%s', '%s' and '%s' are allowed.",
config,
Constants.Metadata.STORAGE_PROVIDER_NOSQL,
Constants.Metadata.STORAGE_PROVIDER_SPANNER,
Constants.Metadata.STORAGE_PROVIDER_ELASTICSEARCH
);
throw new IllegalArgumentException(errorMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public void setAuditPublisher(AuditPublisher auditPublisher) {
this.auditPublisher = auditPublisher;
}

@Override
public String getName() {
return getClass().getSimpleName();
}

@Override
public void createIndex() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.spi.metadata;

import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import java.util.Collections;
import java.util.Map;

// TODO(CDAP-21174): Create metadata storage specify properties
/**
* Default implementation of the {@link MetadataStorageContext}.
*/
public class DefaultMetadataStorageContext implements MetadataStorageContext {

private static final String storageImpl = "gcp-spanner";
private final Map<String, String> properties;

protected DefaultMetadataStorageContext(CConfiguration cConf, String storageName) {
String propertiesPrefix =
Constants.Dataset.STORAGE_EXTENSION_PROPERTY_PREFIX + storageImpl + ".";
this.properties = Collections.unmodifiableMap(cConf.getPropsWithPrefix(propertiesPrefix));
}

@Override
public Map<String, String> getProperties() {
return properties;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.spi.metadata;

import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import java.io.IOException;
import java.util.List;

/**
* Delegates {@link io.cdap.cdap.spi.metadata.MetadataStorage} based on configured extension.
*/
public class DelegatingMetadataStorage implements MetadataStorage {
private final CConfiguration cConf;
private final MetadataStorage delegate;

@Inject
DelegatingMetadataStorage(CConfiguration cConf, MetadataStorageExtensionLoader extensionLoader) throws Exception {
this.cConf = cConf;

this.delegate = extensionLoader.get(getName());

if (this.delegate == null) {
throw new IllegalArgumentException("Unsupported MetadataProvider type: " + getName());
}
this.delegate.initialize(new DefaultMetadataStorageContext(cConf, getName()));
}

@Override
public void createIndex() throws IOException {
delegate.createIndex();
}

@Override
public void close() {
if (delegate != null) {
delegate.close();
}
}

@Override
public String getName() {
return cConf.get(Constants.Metadata.STORAGE_PROVIDER_IMPLEMENTATION);
}

@Override
public void dropIndex() throws IOException {
delegate.dropIndex();
}

@Override
public MetadataChange apply(MetadataMutation mutation, MutationOptions options)
throws IOException {
return delegate.apply(mutation, options);
}

@Override
public List<MetadataChange> batch(List<? extends MetadataMutation> mutations,
MutationOptions options) throws IOException {
return delegate.batch(mutations, options);
}

@Override
public Metadata read(Read read) throws IOException {
return delegate.read(read);
}

@Override
public SearchResponse search(SearchRequest request)
throws IOException {
return delegate.search(request);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.spi.metadata;

import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.lang.ClassPathResources;
import io.cdap.cdap.common.lang.FilterClassLoader;
import io.cdap.cdap.extension.AbstractExtensionLoader;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Extension loader for {@link MetadataStorage} implementations.
*/
public class MetadataStorageExtensionLoader extends AbstractExtensionLoader<String, MetadataStorage> {

private static final Logger LOG = LoggerFactory.getLogger(MetadataStorageExtensionLoader.class);
private static final Set<String> ALLOWED_RESOURCES = createAllowedResources();
private static final Set<String> ALLOWED_PACKAGES = createPackageSets(ALLOWED_RESOURCES);

@Inject
public MetadataStorageExtensionLoader(CConfiguration cConf) {
super(cConf.get(Constants.Metadata.METADATA_STORAGE_EXT_DIR));
}

private static Set<String> createAllowedResources() {
try {
return ClassPathResources.getResourcesWithDependencies(MetadataStorage.class.getClassLoader(),
MetadataStorage.class);
} catch (IOException e) {
throw new RuntimeException("Failed to trace dependencies for MetadataStorage extension.", e);
}
}

@Override
protected Set<String> getSupportedTypesForProvider(MetadataStorage metadataStorage) {
return Collections.singleton(metadataStorage.getName());
}

@Override
protected FilterClassLoader.Filter getExtensionParentClassLoaderFilter() {
return new FilterClassLoader.Filter() {
@Override
public boolean acceptResource(String resource) {
return ALLOWED_RESOURCES.contains(resource);
}

@Override
public boolean acceptPackage(String packageName) {
return ALLOWED_PACKAGES.contains(packageName);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class DatasetMetadataStorage extends SearchHelper implements MetadataStor
super(txClient, tableDefinition);
}

@Override
public String getName(){
return getClass().getSimpleName();
}

@Override
public void createIndex() throws IOException {
createDatasets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
*/
public class NoopMetadataStorage implements MetadataStorage {

@Override
public String getName() {
return getClass().getSimpleName();
}

@Override
public void createIndex() throws IOException {
// no-op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

package io.cdap.cdap.metadata.elastic;

import io.cdap.cdap.spi.metadata.ChangeRequest;
import io.cdap.cdap.spi.metadata.MetadataChange;
import org.elasticsearch.action.support.WriteRequest;

/**
* A simple class to pass around an Elasticsearch index write request, along with the metadata
* change that it effects.
*/
public class RequestAndChange {
public class ElasticChangeRequest implements ChangeRequest {

private final WriteRequest<?> request;
private final MetadataChange change;

public RequestAndChange(WriteRequest<?> request, MetadataChange change) {
public ElasticChangeRequest(WriteRequest<?> request, MetadataChange change) {
this.request = request;
this.change = change;
}
Expand All @@ -37,6 +38,7 @@ public WriteRequest<?> getRequest() {
return request;
}

@Override
public MetadataChange getChange() {
return change;
}
Expand Down
Loading
Loading