-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation #9413
Optimize built-in source/sink startup by eliminating redundant NAR unpacking and checksum calculation #9413
Conversation
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work !
I left some minor comments
|
||
@Test | ||
@Test(timeOut = 20000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure we need these timeouts,
if the test breaks then it is not likely that other tests will run.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The individual timeout is in place so we don't just block for 1.5-2 hour which is the absolute test timeout and consume resources for that time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We centrally set 300 seconds timeouts for all the tests that don't have a timeout
@@ -354,6 +377,8 @@ private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration conf | |||
workerConfig.setAuthenticationEnabled(true); | |||
workerConfig.setAuthorizationEnabled(true); | |||
|
|||
workerConfig.setConnectorsDirectory(Files.createTempDirectory("test").toFile().getAbsolutePath()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about naming the directory "tempconnectorsdir" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
try { | ||
sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader); | ||
sinkClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) sinkClassLoader); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we are blindly casting to NarClassLoader
can we change the type of the argument ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sinkClassLoader
may not be an instance of NarClassLoader. In the line of code, if the sinkClassName
is not specified, we are only trying to see if we can find a className specified in the NAR.
LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except some minor comments
public static List<ConfigFieldDefinition> getConnectorConfigDefinition(String narPath, | ||
String configClassName, | ||
String narExtractionDirectory) throws Exception { | ||
public static List<ConfigFieldDefinition> getConnectorConfigDefinition(ClassLoader narClassLoader, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean NarClassLoader narClassLoader
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a NarClassLoader. A ClassLoader should suffice. I will rename the variable
throws IOException, ClassNotFoundException { | ||
try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet(), workerConfig.getNarExtractionDirectory())) { | ||
String typeArg = getSourceType(className, ncl).getName(); | ||
private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method and the following fillSinkTypeClass
method looks very similar, wondering if we could implement them into one single method to reduce code duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how much code we can save here because fillSourceTypeClass() sets the typeClassName for the source and checks if a sinkSpec exists and set the typeClassName for that as well. The logic for fillSinkTypeClass() is the reverse.
Also, since this refactor is already large, I don't want to make more changes in the code. If we want to refactor more, let's do it in a subsequent PR.
} | ||
|
||
} else { | ||
// if connector class name is provided, we need to try to load it as a JAR and as a NAR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/and/or
File packageFile, | ||
String narExtractionDirectory) { | ||
String connectorClassName = className; | ||
ClassLoader classLoader; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of having this variable and return it at the end. I suggest return the classLoader at the end of each if/else clause. Easier to read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
if (jarClassLoaderException != null) { | ||
errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you want to add some linebreak or delimiter between these two error message for better reading?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add a space
/pulsarbot run-failure-checks |
dd2082a
to
a5b7f41
Compare
…packing and checksum calculation (apache#9413) Co-authored-by: Jerry Peng <jerryp@splunk.com>
…y usage - Pulsar loads all nar files to memory after apache/pulsar#9413
I reported an issue #12974 since Functions worker now consume more resident memory after these changes. |
* Reduce CI memory usage * Limit MaxDirectMemorySize to 256m on bookkeeper, broker and function worker * Reduce limits even further because of 143 exit code * Add logging to loading of images to kind * Use single worker node for CI kind cluster * Use datastax/lunastreaming image for functions worker to reduce memory usage - Pulsar loads all nar files to memory after apache/pulsar#9413 * Add solution for skipping yq installation (for local development) * Bump k8s version used in kind * Add instructions how to run locally * Allow less diskspace for CI test run
Motivation
Currently, when a user submits a built-in source or sink to be run, the locally stored NAR file of the built-in source and sink is unpacked or MD5 checksums calculated and loaded into a classloader up to 5 times during the startup and execution process. This is redundant and unnecessary waste of resources. We should just unpack one and cache the results. I have noticed multi-second instance start up latencies because of this.
Modifications
Verifying this change
Add multiple tests for built-in sources and sinks