Skip to content

Commit

Permalink
feat: implement wait for pod and add check for copy methods
Browse files Browse the repository at this point in the history
Signed-off-by: Lenin Mehedy <lenin.mehedy@swirldslabs.com>
  • Loading branch information
leninmehedy committed Jan 18, 2024
1 parent 02fdaa8 commit 82a80f4
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 116 deletions.
2 changes: 2 additions & 0 deletions fullstack-network-manager/src/core/constants.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ export const DEFAULT_CHART_REPO = new Map()
export const OPERATOR_ID = process.env.FST_OPERATOR_ID || '0.0.2'
export const OPERATOR_KEY = process.env.FST_OPERATOR_KEY || '302e020100300506032b65700422042091132178e72057a1d7528025956fe39b0b847f200ab59b2fdd367017f3087137'

export const POD_STATUS_RUNNING = 'Running'

// Listr related
export const LISTR_DEFAULT_RENDERER_TIMER_OPTION = {
...PRESET_TIMER,
Expand Down
31 changes: 1 addition & 30 deletions fullstack-network-manager/src/core/helpers.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { MissingArgumentError } from './errors.mjs'
import { FullstackTestingError, MissingArgumentError } from './errors.mjs'

export function sleep (ms) {
return new Promise((resolve) => {
Expand All @@ -9,32 +9,3 @@ export function sleep (ms) {
export function cloneArray (arr) {
return JSON.parse(JSON.stringify(arr))
}

/**
* Utility function to poll
* @param pollFunc a function that should return true if polling should be stopped
* @param timeoutFunc an optional function that would be invoked after the timeout
* @param delay polling delay in milliseconds
* @param timeout timeout in milliseconds
*/
export function poll (pollFunc, timeoutFunc = null, delay = 100, timeout = 5000) {
if (!pollFunc) throw new MissingArgumentError('polling function is required')
if (delay <= 0) throw new MissingArgumentError('polling delay cannot be negative or zero')
if (timeout <= 0) throw new MissingArgumentError('timeout cannot be negative or zero')

// poll
const timerId = setInterval(() => {
if (pollFunc()) {
clearInterval(timerId) // stop polling
}
}, delay)

// timeout polling
setTimeout(() => {
if (timeoutFunc) {
timeoutFunc()
}

clearInterval(timerId)
}, timeout)
}
166 changes: 86 additions & 80 deletions fullstack-network-manager/src/core/kubectl2.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import * as k8s from '@kubernetes/client-node'
import fs from 'fs'
import net from 'net'
import path from 'path'
import { flags } from '../commands/index.mjs'
import { FullstackTestingError, MissingArgumentError } from './errors.mjs'
import * as sb from 'stream-buffers'
import { sleep } from './helpers.mjs'
import * as helpers from './helpers.mjs'

/**
* A kubectl wrapper class providing custom functionalities required by fsnetman
Expand Down Expand Up @@ -121,9 +124,7 @@ export class Kubectl2 {
* @return {Promise<{}>} k8s.V1Pod object
*/
async getPodByName (name) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const ns = this._getNamespace()
const fieldSelector = `metadata.name=${name}`
const resp = await this.kubeClient.listNamespacedPod(
ns,
Expand Down Expand Up @@ -156,9 +157,7 @@ export class Kubectl2 {
* @return {Promise<{}>} k8s.V1Service object
*/
async getSvcByName (name) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const ns = this._getNamespace()
const fieldSelector = `metadata.name=${name}`
const resp = await this.kubeClient.listNamespacedService(
ns,
Expand Down Expand Up @@ -316,19 +315,29 @@ export class Kubectl2 {
* @param containerName container name
* @param srcPath source file path in the local
* @param destDir destination directory in the container
* @returns {Promise<boolean>}
* @returns {Promise<>}
*/
async copyTo (podName, containerName, srcPath, destDir) {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')
const ns = this._getNamespace()

try {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const destPath = `${destDir}/${srcFile}`

await this.kubeCopy.cpToPod(ns, podName, containerName, srcFile, destDir, srcDir)
return true

// check if the file is copied successfully or not
for (let attempt = 0; attempt < 10; attempt++) {
if (await this.hasFile(podName, containerName, destPath)) {
return true
}
await sleep(200)
}

throw new FullstackTestingError(`failed to find file after invoking copy: ${destPath}`)
} catch (e) {
throw new FullstackTestingError(`failed to copy file to container [pod: ${podName} container:${containerName}]: ${srcPath} -> ${destDir}: ${e.message}`, e)
throw new FullstackTestingError(`failed to copy file to ${podName}:${containerName} [${srcPath} -> ${destDir}]: ${e.message}`, e)
}
}

Expand All @@ -344,17 +353,27 @@ export class Kubectl2 {
* @returns {Promise<boolean>}
*/
async copyFrom (podName, containerName, srcPath, destDir) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const ns = this._getNamespace()

try {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const destPath = `${destDir}/${srcFile}`
if (!fs.existsSync(destDir)) throw new Error(`invalid destination dir: ${destDir}`)

await this.kubeCopy.cpFromPod(ns, podName, containerName, srcFile, destDir, srcDir)
return true

// check if the file is copied successfully or not
for (let attempt = 0; attempt < 10; attempt++) {
if (fs.existsSync(destPath)) {
return true
}
await sleep(200)
}

throw new FullstackTestingError(`failed to find file after invoking copy: ${destPath}`)
} catch (e) {
throw new FullstackTestingError(`failed to copy file from container [pod: ${podName} container:${containerName}]: ${srcPath} -> ${destDir}: ${e.message}`, e)
throw new FullstackTestingError(`failed to copy file from ${podName}:${containerName} [${srcPath} -> ${destDir}]: ${e.message}`, e)
}
}

Expand All @@ -367,11 +386,10 @@ export class Kubectl2 {
* @param timeoutMs timout in milliseconds
* @returns {Promise<string>} console output as string
*/
async getExecOutput (podName, containerName, command = [], timeoutMs = 1000) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')
if (!command) return ''
async getExecOutput (podName, containerName, command, timeoutMs = 1000) {
const ns = this._getNamespace()
if (timeoutMs < 0 || timeoutMs === 0) throw MissingArgumentError('timeout cannot be negative or zero')
if (!command || !Array.isArray(command)) throw MissingArgumentError('command cannot be empty')

return new Promise((resolve, reject) => {
const execInstance = new k8s.Exec(this.kubeConfig)
Expand Down Expand Up @@ -410,9 +428,7 @@ export class Kubectl2 {
* @param podPort port of the pod
*/
async portForward (podName, localPort, podPort) {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')

const ns = this._getNamespace()
const forwarder = new k8s.PortForward(this.kubeConfig, true)
const server = net.createServer((socket) => {
forwarder.portForward(ns, podName, [podPort], socket, null, socket)
Expand All @@ -422,58 +438,48 @@ export class Kubectl2 {
}

/**
* Invoke `kubectl wait` command
* @param resource a kubernetes resource type (e.g. podName | svc etc.)
* @param args args of the command
* @returns {Promise<Array>} console output as an array of strings
* Wait for pod
* @param status phase of the pod
* @param labels pod labels
* @param timeoutSeconds timeout in seconds
* @return {Promise<boolean>}
*/
// async waitForPod(namespace, phase = 'Running', labels = [], timeoutSeconds = 0.3) {
// // await this.kubectl.wait('podName',
// // '--for=jsonpath=\'{.status.phase}\'=Running',
// // '-l fullstack.hedera.com/type=network-node',
// // `-l fullstack.hedera.com/node-name=${nodeId}`,
// // `--timeout=${timeout}`,
// // `-n "${namespace}"`
// // )
// const self = this
// const delay = 100
// let status = false
//
// const fieldSelector = `status.phase=${phase}`
// const labelSelector = labels.join(',')
//
// const podNames = await self.kubeClient.listPodForAllNamespaces(
// false,
// false,
// fieldSelector,
// labelSelector,
// )
//
// const check = function () {
// console.log(new Date())
// const podNames = self.kubeClient.listPodForAllNamespaces(
// false,
// false,
// fieldSelector,
// labelSelector,
// )
// return false
// }
//
// let timerId = setTimeout( () => {
// status = check()
// if (status) {
// clearTimeout(timerId)
// } else {
// timerId = setTimeout(check, delay)
// }
// }, timeout)
//
// if (!status) {
// throw new FullstackTestingError(`timeout occurred during waiting for podName`)
// }
//
// clearTimeout(timerId)
// return true
// }
async waitForPod (status = 'Running', labels = [], timeoutSeconds = 1) {
const ns = this._getNamespace()
const fieldSelector = `status.phase=${status}`
const labelSelector = labels.join(',')

const delay = 200
const maxAttempts = Math.round(timeoutSeconds * 1000 / delay)
if (maxAttempts <= 0) {
throw new FullstackTestingError(`invalid timeoutSeconds '${timeoutSeconds}'. maxAttempts calculated to be negative or zero`)
}

// wait for the pod to be available with the given status and labels
for (let attempts = 0; attempts < maxAttempts; attempts++) {
const resp = await this.kubeClient.listNamespacedPod(
ns,
false,
false,
undefined,
fieldSelector,
labelSelector
)

const found = resp.body && resp.body.items && resp.body.items.length
if (found) {
return true
}

await sleep(delay)
}

throw new FullstackTestingError('pod not found')
}

_getNamespace () {
const ns = this.configManager.flagValue(flags.namespace)
if (!ns) throw new MissingArgumentError('namespace is not set')
return ns
}
}
22 changes: 16 additions & 6 deletions fullstack-network-manager/test/e2e/core/kubectl_e2e.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('Kubectl', () => {

it('should be able to check if a path is directory inside a container', async () => {
const podName = Templates.renderNetworkPodName('node0')
await expect(kubectl.hasDir(podName, constants.ROOT_CONTAINER, constants.HEDERA_HAPI_PATH)).resolves.toBeTruthy()
await expect(kubectl.hasDir(podName, constants.ROOT_CONTAINER, constants.HEDERA_USER_HOME_DIR)).resolves.toBeTruthy()
})

it('should be able to copy a file to and from a container', async () => {
Expand All @@ -62,18 +62,18 @@ describe('Kubectl', () => {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kubectl-'))
const tmpDir2 = fs.mkdtempSync(path.join(os.tmpdir(), 'kubectl-'))
const tmpFile = path.join(tmpDir, testFileName)
const destDir = constants.HEDERA_HAPI_PATH
const destDir = constants.HEDERA_USER_HOME_DIR
const destPath = `${destDir}/${testFileName}`
fs.writeFileSync(tmpFile, 'TEST')

await expect(kubectl.copyTo(podName, containerName, tmpFile, destDir)).resolves.toBeTruthy()
await expect(kubectl.hasFile(podName, containerName, destPath)).resolves.toBeTruthy()
fs.rmdirSync(tmpDir, { recursive: true })

await expect(kubectl.copyFrom(podName, containerName, destPath, tmpDir2)).resolves.toBeTruthy()
expect(fs.existsSync(`${tmpDir2}/${testFileName}`))

fs.rmdirSync(tmpDir, { recursive: true })
fs.rmdirSync(tmpDir2, { recursive: true })

// rm file inside the container
await expect(kubectl.getExecOutput(podName, containerName, ['rm', '-f', destPath])).resolves
}, 10000)

it('should be able to port forward gossip port', (done) => {
Expand All @@ -99,4 +99,14 @@ describe('Kubectl', () => {
client.connect(localPort)
})
})

it('should be able to run watch for pod', async () => {
const nodeId = 'node0'
const labels = [
'fullstack.hedera.com/type=network-node',
`fullstack.hedera.com/node-name=${nodeId}`
]

await expect(kubectl.waitForPod(constants.POD_STATUS_RUNNING, labels)).resolves.toBeTruthy()
})
})

0 comments on commit 82a80f4

Please sign in to comment.