Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support prefetching plugins metadata from an http api #5887

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package nextflow.plugin

import com.google.gson.Gson
import dev.failsafe.Failsafe
import dev.failsafe.FailsafeExecutor
import dev.failsafe.Fallback
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.BuildInfo
import org.pf4j.update.FileDownloader
import org.pf4j.update.FileVerifier
import org.pf4j.update.PluginInfo
import org.pf4j.update.SimpleFileDownloader
import org.pf4j.update.verifier.CompoundVerifier

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

/**
* Represents an update repository served via an HTTP api.
*
* It implements PrefetchUpdateRepository so that all relevant
* plugin metadata can be loaded with a single HTTP request, rather
* than a request-per-plugin.
*
* Metadata is prefetched into memory when Nextflow starts and expires
* upon termination (or when 'refresh()' is called).
*/
@Slf4j
@CompileStatic
class HttpPluginRepository implements PrefetchUpdateRepository {
private final HttpClient client = HttpClient.newHttpClient()
private final String id
private final URI url

private Map<String, PluginInfo> plugins = new HashMap<>()

HttpPluginRepository(String id, URI url) {
this.id = id
this.url = url
}

// NOTE ON PREFETCHING
//
// The prefetch mechanism is used to work around a limitation in the
// UpdateRepository interface from pf4j.
//
// Specifically, p4fj expects that getPlugins() returns a Map<> of all
// metadata about all plugins. To implement this for an HTTP repository
// would require either downloading the entire contents of the remote
// repository or implementing a lazy map and making an HTTP request for
// each required plugin.
//
// Instead we can use the list of configured plugins to load all relevant
// metadata in a single HTTP request at startup, and use this to populate
// the map. Once the prefetch is complete, this repository will behave
// like any other implementation of UpdateRepository.
@Override
void prefetch(List<PluginSpec> plugins) {
if (plugins && !plugins.isEmpty()) {
this.plugins = fetchMetadata(plugins)
}
}

@Override
String getId() {
return id
}

@Override
URL getUrl() {
return url.toURL()
}

@Override
Map<String, PluginInfo> getPlugins() {
if (plugins.isEmpty()) {
log.warn "getPlugins() called before prefetch() - plugins map will be empty"
return Map.of()
}
return Collections.unmodifiableMap(plugins)
}

@Override
PluginInfo getPlugin(String id) {
return plugins.computeIfAbsent(id) { key -> fetchMetadataByIds([key]).get(key) }
}

@Override
void refresh() {
plugins = fetchMetadataByIds(plugins.keySet())
}

@Override
FileDownloader getFileDownloader() {
return new SimpleFileDownloader()
}

@Override
FileVerifier getFileVerifier() {
return new CompoundVerifier()
}

// ----------------------------------------------------------------------------
// http handling

private Map<String, PluginInfo> fetchMetadataByIds(Collection<String> ids) {
def specs = ids.collect(id -> new PluginSpec(id, null))
return fetchMetadata(specs)
}

private Map<String, PluginInfo> fetchMetadata(Collection<PluginSpec> specs) {
final ordered = specs.sort(false)
final CheckedSupplier<Map<String, PluginInfo>> supplier = () -> fetchMetadata0(ordered)
return retry().get(supplier)
}

private Map<String, PluginInfo> fetchMetadata0(List<PluginSpec> specs) {
final gson = new Gson()

def reqBody = gson.toJson([
'nextflowVersion': BuildInfo.version,
'plugins' : specs
])

def req = HttpRequest.newBuilder()
.uri(url.resolve("plugins/collect"))
.POST(HttpRequest.BodyPublishers.ofString(reqBody))
.build()

def rep = client.send(req, HttpResponse.BodyHandlers.ofString())
def repBody = gson.fromJson(rep.body(), FetchResponse)
return repBody.plugins.collectEntries { p -> Map.entry(p.id, p) }
}

// create a retry executor using failsafe
private static FailsafeExecutor retry() {
EventListener<ExecutionAttemptedEvent> logAttempt = (ExecutionAttemptedEvent attempt) -> {
log.debug("Retrying download of plugins metadata - attempt ${attempt.attemptCount}, ${attempt.lastFailure.message}", attempt.lastFailure)
}
Fallback fallback = Fallback.ofException { e ->
new ConnectException("Failed to download plugins metadata")
}
final policy = RetryPolicy.builder()
.withMaxAttempts(3)
.handle(ConnectException)
.onRetry(logAttempt)
.build()
return Failsafe.with(fallback, policy)
}

// ---------------------

/**
* Response format object expected from repository
*/
private static class FetchResponse {
List<PluginInfo> plugins
}
}
18 changes: 17 additions & 1 deletion modules/nf-commons/src/main/nextflow/plugin/PluginUpdater.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ class PluginUpdater extends UpdateManager {
result.add(new LocalUpdateRepository('downloaded', local))
}
else {
result.add(new DefaultUpdateRepository('nextflow.io', remote))
def remoteRepo = remote.path.endsWith('.json')
? new DefaultUpdateRepository('nextflow.io', remote)
: new HttpPluginRepository('registry', remote.toURI())

result.add(remoteRepo)
result.addAll(customRepos())
}
return result
Expand Down Expand Up @@ -138,6 +142,18 @@ class PluginUpdater extends UpdateManager {
return new DefaultUpdateRepository('uri', new URL(uri), fileName)
}

/**
* Prefetch metadata for plugins. This gives an opportunity for certain
* repository types to perform some data-loading optimisations.
*/
void prefetchMetadata(List<PluginSpec> plugins) {
for( def repo : this.@repositories ) {
if( repo instanceof PrefetchUpdateRepository ) {
repo.prefetch(plugins)
}
}
}

/**
* Resolve a plugin installing or updating the dependencies if necessary
* and start the plugin
Expand Down
57 changes: 38 additions & 19 deletions modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,25 @@ class PluginsFacade implements PluginStateListener {
private PluginUpdater updater
private CustomPluginManager manager
private DefaultPlugins defaultPlugins = DefaultPlugins.INSTANCE
private String indexUrl = Plugins.DEFAULT_PLUGINS_REPO
private String indexUrl
private boolean embedded

PluginsFacade() {
mode = getPluginsMode()
root = getPluginsDir()
indexUrl = getPluginsIndexUrl()
offline = env.get('NXF_OFFLINE') == 'true'
if( mode==DEV_MODE && root.toString()=='plugins' && !isRunningFromDistArchive() )
root = detectPluginsDevRoot()
System.setProperty('pf4j.mode', mode)
}

PluginsFacade(Path root, String mode=PROD_MODE, boolean offline=false) {
PluginsFacade(Path root, String mode=PROD_MODE, boolean offline=false,
String indexUrl=Plugins.DEFAULT_PLUGINS_REPO) {
this.mode = mode
this.root = root
this.offline = offline
this.indexUrl = indexUrl
System.setProperty('pf4j.mode', mode)
}

Expand Down Expand Up @@ -95,6 +98,28 @@ class PluginsFacade implements PluginStateListener {
}
}

protected String getPluginsIndexUrl() {
final url = env.get('NXF_PLUGINS_INDEX_URL')
if( url ) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally there should not be any warning if the url is the current (github) index or the uri host is *.seqera.io

log.trace "Detected NXF_PLUGINS_INDEX_URL=$url"
// warn that this is experimental behaviour
log.warn """\
=======================================================================
= WARNING =
= You are running this script using a custom plugins index url. =
= =
= ${url}
= =
= This is an experimental feature and should not be used in production. =
=============================================================================
""".stripIndent(true)
return url
} else {
log.trace "Using default plugins url"
return Plugins.DEFAULT_PLUGINS_REPO
}
}

private boolean isNextflowDevRoot(File file) {
file.name=='nextflow' && file.isDirectory() && new File(file, 'settings.gradle').isFile()
}
Expand Down Expand Up @@ -320,27 +345,21 @@ class PluginsFacade implements PluginStateListener {
new DefaultPluginManager()
}

void start( String pluginId ) {
if( isEmbedded() && defaultPlugins.hasPlugin(pluginId) ) {
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugin: $pluginId"
return
}

start(PluginSpec.parse(pluginId, defaultPlugins))
void start(String pluginId) {
start([PluginSpec.parse(pluginId, defaultPlugins)])
}

void start(PluginSpec plugin) {
if( isEmbedded() && defaultPlugins.hasPlugin(plugin.id) ) {
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugin: $plugin.id"
return
void start(List<PluginSpec> specs) {
def split = specs.split { plugin -> isEmbedded() && defaultPlugins.hasPlugin(plugin.id) }
def (skip, toStart) = [split[0], split[1]]
if( !skip.isEmpty() ) {
def skippedIds = skip.collect{ plugin -> plugin.id }
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugins: $skippedIds"
}

updater.prepareAndStart(plugin.id, plugin.version)
}

void start(List<PluginSpec> specs) {
for( PluginSpec it : specs ) {
start(it)
updater.prefetchMetadata(toStart)
for( PluginSpec plugin : toStart ) {
updater.prepareAndStart(plugin.id, plugin.version)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package nextflow.plugin

import groovy.transform.CompileStatic
import org.pf4j.update.UpdateRepository

/**
* Extension to pf4j's UpdateRepository which supports pre-fetching
* metadata for a specified set of plugins.
*
* This gives the ability to avoid downloading metadata for unused
* plugins.
*/
@CompileStatic
interface PrefetchUpdateRepository extends UpdateRepository {
/**
* This will be called when Nextflow starts, before
* initialising the plugins.
*/
void prefetch(List<PluginSpec> plugins)
}