Skip to content

Commit

Permalink
fix: remove unnecessary polling from exec call and cache configManage…
Browse files Browse the repository at this point in the history
…r for cli args

Signed-off-by: Lenin Mehedy <lenin.mehedy@swirldslabs.com>
  • Loading branch information
leninmehedy committed Jan 17, 2024
1 parent d51cdfa commit 02fdaa8
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 139 deletions.
171 changes: 61 additions & 110 deletions fullstack-network-manager/src/core/kubectl2.mjs
Original file line number Diff line number Diff line change
@@ -1,71 +1,29 @@
import * as k8s from '@kubernetes/client-node'
import net from 'net'
import path from 'path'
import { flags } from '../commands/index.mjs'
import { FullstackTestingError, MissingArgumentError } from './errors.mjs'
import * as sb from 'stream-buffers'
import * as helpers from './helpers.mjs'

/**
* A kubectl wrapper class providing custom functionalities required by fsnetman
*
* This class isn't threadsafe
*/
export class Kubectl2 {
constructor () {
this._kubeConfig = new k8s.KubeConfig()
this._kubeConfig.loadFromDefault()
}
constructor (configManager, logger) {
if (!configManager) throw new MissingArgumentError('An instance of core/ConfigManager is required')
if (!logger) throw new MissingArgumentError('An instance of core/Logger is required')

/**
* Get current context object
*/
getCurrentContext () {
const name = this._kubeConfig.getCurrentContext()
return this._kubeConfig.getContextObject(name)
}
this.configManager = configManager
this.logger = logger

/**
* Set current context
* @param contextName context name
*/
setCurrentContext (contextName) {
if (!this._kubeConfig.getContextObject(contextName)) throw new FullstackTestingError(`context not found with name: ${name}`)

// set current context
this._kubeConfig.setCurrentContext(contextName)

this.kubeClient = null // reset client
this.init()
}

setCurrentNamespace (namespace) {
this._namespace = namespace
}

getCurrentNamespace () {
return this._namespace
}

_initChecks () {
if (!this._kubeConfig.getCurrentContext()) throw new FullstackTestingError('context is not set')
if (!this._namespace) throw new FullstackTestingError('namespace is not set')
}

_initKubeClient () {
this._initChecks()
if (!this._kubeClient) {
this._kubeClient = this._kubeConfig.makeApiClient(k8s.CoreV1Api)
}

return this._kubeClient
}

_initKubeCopy () {
this._initChecks()
if (!this._kubeCopy) {
this._kubeCopy = new k8s.Cp(this._kubeConfig)
}

return this._kubeCopy
init () {
this.kubeConfig = new k8s.KubeConfig()
this.kubeConfig.loadFromDefault()
this.kubeClient = this.kubeConfig.makeApiClient(k8s.CoreV1Api)
this.kubeCopy = new k8s.Cp(this.kubeConfig)
}

/**
Expand Down Expand Up @@ -125,7 +83,7 @@ export class Kubectl2 {
}
}

const resp = await this._initKubeClient().createNamespace(payload)
const resp = await this.kubeClient.createNamespace(payload)
return resp.response.statusCode === 201
}

Expand All @@ -135,7 +93,7 @@ export class Kubectl2 {
* @return {Promise<boolean>}
*/
async deleteNamespace (name) {
const resp = await this._initKubeClient().deleteNamespace(name)
const resp = await this.kubeClient.deleteNamespace(name)
return resp.response.statusCode === 200.0
}

Expand All @@ -144,7 +102,7 @@ export class Kubectl2 {
* @return {Promise<[string]>} list of namespaces
*/
async getNamespaces () {
const resp = await this._initKubeClient().listNamespace()
const resp = await this.kubeClient.listNamespace()
if (resp.body && resp.body.items) {
const namespaces = []
resp.body.items.forEach(item => {
Expand All @@ -163,9 +121,12 @@ export class Kubectl2 {
* @return {Promise<{}>} k8s.V1Pod object
*/
async getPodByName (name) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const fieldSelector = `metadata.name=${name}`
const resp = await this._initKubeClient().listNamespacedPod(
this.getCurrentNamespace(),
const resp = await this.kubeClient.listNamespacedPod(
ns,
undefined,
undefined,
undefined,
Expand All @@ -181,9 +142,9 @@ export class Kubectl2 {
* @returns {Promise<string>} podName IP
*/
async getPodIP (podNameName) {
const podName = await this.getPodByName(podNameName)
if (podName && podName.status && podName.status.hostIP) {
return podName.status.hostIP
const pod = await this.getPodByName(podNameName)
if (pod && pod.status && pod.status.hostIP) {
return pod.status.hostIP
}

throw new FullstackTestingError(`unable to find host IP of podName: ${podNameName}`)
Expand All @@ -195,9 +156,12 @@ export class Kubectl2 {
* @return {Promise<{}>} k8s.V1Service object
*/
async getSvcByName (name) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const fieldSelector = `metadata.name=${name}`
const resp = await this._initKubeClient().listNamespacedService(
this.getCurrentNamespace(),
const resp = await this.kubeClient.listNamespacedService(
ns,
undefined,
undefined,
undefined,
Expand Down Expand Up @@ -227,7 +191,7 @@ export class Kubectl2 {
*/
async getClusters () {
const clusters = []
for (const cluster of this._kubeConfig.getClusters()) {
for (const cluster of this.kubeConfig.getClusters()) {
clusters.push(cluster.name)
}

Expand All @@ -240,7 +204,7 @@ export class Kubectl2 {
*/
async getContexts () {
const contexts = []
for (const context of this._kubeConfig.getContexts()) {
for (const context of this.kubeConfig.getContexts()) {
contexts.push(context.name)
}

Expand Down Expand Up @@ -355,10 +319,13 @@ export class Kubectl2 {
* @returns {Promise<boolean>}
*/
async copyTo (podName, containerName, srcPath, destDir) {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

try {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
await this._initKubeCopy().cpToPod(this.getCurrentNamespace(), podName, containerName, srcFile, destDir, srcDir)
await this.kubeCopy.cpToPod(ns, podName, containerName, srcFile, destDir, srcDir)
return true
} catch (e) {
throw new FullstackTestingError(`failed to copy file to container [pod: ${podName} container:${containerName}]: ${srcPath} -> ${destDir}: ${e.message}`, e)
Expand All @@ -377,10 +344,14 @@ export class Kubectl2 {
* @returns {Promise<boolean>}
*/
async copyFrom (podName, containerName, srcPath, destDir) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)

try {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
await this._initKubeCopy().cpFromPod(this.getCurrentNamespace(), podName, containerName, srcFile, destDir, srcDir)
await this.kubeCopy.cpFromPod(ns, podName, containerName, srcFile, destDir, srcDir)
return true
} catch (e) {
throw new FullstackTestingError(`failed to copy file from container [pod: ${podName} container:${containerName}]: ${srcPath} -> ${destDir}: ${e.message}`, e)
Expand All @@ -397,17 +368,18 @@ export class Kubectl2 {
* @returns {Promise<string>} console output as string
*/
async getExecOutput (podName, containerName, command = [], timeoutMs = 1000) {
try {
if (!command) return ''
if (timeoutMs < 0 || timeoutMs === 0) throw MissingArgumentError('timeout cannot be negative or zero')
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')
if (!command) return ''
if (timeoutMs < 0 || timeoutMs === 0) throw MissingArgumentError('timeout cannot be negative or zero')

const execInstance = new k8s.Exec(this._kubeConfig)
return new Promise((resolve, reject) => {
const execInstance = new k8s.Exec(this.kubeConfig)
const outStream = new sb.WritableStreamBuffer()
const errStream = new sb.WritableStreamBuffer()
let execStatus = 'Failure'

await execInstance.exec(
this.getCurrentNamespace(),
execInstance.exec(
ns,
podName,
containerName,
command,
Expand All @@ -416,38 +388,15 @@ export class Kubectl2 {
null,
false,
({ status }) => {
execStatus = status
if (status === 'Failure' || errStream.size()) {
throw new FullstackTestingError(`Error - details: \n ${errStream.getContentsAsString()}`)
}
}
)

// we need to poll to get the output contents
return new Promise((resolve, reject) => {
let resolved = false

const pollFunc = () => {
if (execStatus === 'Success') {
resolved = true
resolve(outStream.getContentsAsString())
return true
reject(new FullstackTestingError(`Exec error:
'exec ${podName} -c ${containerName} -- ${command.join(' ')}'\n${errStream.getContentsAsString()}`))
}

return false
resolve(outStream.getContentsAsString())
}

const timeoutFunc = () => {
if (!resolved) {
reject(new FullstackTestingError(`timeout occurred during exec in '${podName}:${containerName}': ${command.join[' ']}`))
}
}

helpers.poll(pollFunc, timeoutFunc, 100, timeoutMs)
})
} catch (e) {
throw new FullstackTestingError(`error occurred during exec in '${podName}:${containerName}': ${command.join[' ']}`)
}
)
})
}

/**
Expand All @@ -459,12 +408,14 @@ export class Kubectl2 {
* @param podName pod name
* @param localPort local port
* @param podPort port of the pod
* @returns {Promise<net.Server>} return the instance of the server that caller must close
*/
async portForward (podName, localPort, podPort) {
const forwarder = new k8s.PortForward(this._kubeConfig, true)
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const forwarder = new k8s.PortForward(this.kubeConfig, true)
const server = net.createServer((socket) => {
forwarder.portForward(this.getCurrentNamespace(), podName, [podPort], socket, null, socket)
forwarder.portForward(ns, podName, [podPort], socket, null, socket)
})

return server.listen(localPort, '127.0.0.1')
Expand Down
49 changes: 20 additions & 29 deletions fullstack-network-manager/test/e2e/core/kubectl_e2e.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import os from 'os'
import path from 'path'
import { v4 as uuid4 } from 'uuid'
import { FullstackTestingError } from '../../../src/core/errors.mjs'
import * as helpers from '../../../src/core/helpers.mjs'
import { constants, Templates } from '../../../src/core/index.mjs'
import { ConfigManager, constants, logging, Templates } from '../../../src/core/index.mjs'
import { Kubectl2 } from '../../../src/core/kubectl2.mjs'

describe('Kubectl', () => {
const kubectl = new Kubectl2()
// kubectl.setCurrentContext(constants.CONTEXT_NAME)
kubectl.setCurrentNamespace(constants.NAMESPACE_NAME)
const testLogger = logging.NewLogger('debug')
const configManager = new ConfigManager(testLogger)
const kubectl = new Kubectl2(configManager, testLogger)
// configManager.load({namespace: constants.NAMESPACE_NAME})

it('should be able to list clusters', async () => {
const clusters = await kubectl.getClusters()
Expand Down Expand Up @@ -76,36 +76,27 @@ describe('Kubectl', () => {
fs.rmdirSync(tmpDir2, { recursive: true })
}, 10000)

it('should be able to port forward gossip port', async () => {
it('should be able to port forward gossip port', (done) => {
const podName = Templates.renderNetworkPodName('node0')
const localPort = constants.HEDERA_NODE_INTERNAL_GOSSIP_PORT
const server = await kubectl.portForward(podName, localPort, constants.HEDERA_NODE_INTERNAL_GOSSIP_PORT)
expect(server).not.toBeNull()

// client
const client = new net.Socket()
let connected = false
client.connect(localPort).on('connection', () => {
connected = true
})
kubectl.portForward(podName, localPort, constants.HEDERA_NODE_INTERNAL_GOSSIP_PORT).then((server) => {
expect(server).not.toBeNull()

const pollFunc = () => {
if (connected) {
client.end()
// client
const client = new net.Socket()
client.on('ready', () => {
client.destroy()
server.close()
}
done()
})

return connected // if connected return true to stop polling
}
const timeoutFunc = () => {
if (!connected) {
client.end()
client.on('error', (e) => {
client.destroy()
server.close()
}

expect(connected).toBeTruthy()
}
done(new FullstackTestingError(`could not connect to local port '${localPort}': ${e.message}`, e))
})

helpers.poll(pollFunc, timeoutFunc, 100, 1000)
client.connect(localPort)
})
})
})

0 comments on commit 02fdaa8

Please sign in to comment.