Skip to content

Commit

Permalink
feat: implement port forward and polling utility function
Browse files Browse the repository at this point in the history
Signed-off-by: Lenin Mehedy <lenin.mehedy@swirldslabs.com>
  • Loading branch information
leninmehedy committed Jan 16, 2024
1 parent fc26a01 commit 2eddad2
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 15 deletions.
31 changes: 31 additions & 0 deletions fullstack-network-manager/src/core/helpers.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { MissingArgumentError } from './errors.mjs'

export function sleep (ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
Expand All @@ -7,3 +9,32 @@ export function sleep (ms) {
export function cloneArray (arr) {
return JSON.parse(JSON.stringify(arr))
}

/**
* Utility function to poll
* @param pollFunc a function that should return true if polling should be stopped
* @param timeoutFunc an optional function that would be invoked after the timeout
* @param delay polling delay in milliseconds
* @param timeout timeout in milliseconds
*/
export function poll (pollFunc, timeoutFunc = null, delay = 100, timeout = 5000) {
if (!pollFunc) throw new MissingArgumentError('polling function is required')
if (delay <= 0) throw new MissingArgumentError('polling delay cannot be negative or zero')
if (timeout <= 0) throw new MissingArgumentError('timeout cannot be negative or zero')

// poll
const timerId = setInterval(() => {
if (pollFunc()) {
clearInterval(timerId) // stop polling
}
}, delay)

// timeout polling
setTimeout(() => {
if (timeoutFunc) {
timeoutFunc()
}

clearInterval(timerId)
}, timeout)
}
45 changes: 30 additions & 15 deletions fullstack-network-manager/src/core/kubectl2.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import * as k8s from '@kubernetes/client-node'
import net from 'net'
import path from 'path'
import { FullstackTestingError, MissingArgumentError } from './errors.mjs'
import * as sb from 'stream-buffers'
import * as helpers from './helpers.mjs'

/**
* A kubectl wrapper class providing custom functionalities required by fsnetman
Expand Down Expand Up @@ -394,9 +396,10 @@ export class Kubectl2 {
* @param timeoutMs timout in milliseconds
* @returns {Promise<string>} console output as string
*/
async getExecOutput (podName, containerName, command = [], timeoutMs = 5000) {
async getExecOutput (podName, containerName, command = [], timeoutMs = 1000) {
try {
if (!command) return ''
if (timeoutMs < 0 || timeoutMs === 0) throw MissingArgumentError('timeout cannot be negative or zero')

const execInstance = new k8s.Exec(this._kubeConfig)
const outStream = new sb.WritableStreamBuffer()
Expand All @@ -421,38 +424,50 @@ export class Kubectl2 {
)

// we need to poll to get the output contents
const pollingDelay = 100
return new Promise((resolve, reject) => {
let resolved = false
const timerId = setInterval(() => {

const pollFunc = () => {
if (execStatus === 'Success') {
clearInterval(timerId)
resolved = true
resolve(outStream.getContentsAsString())
return true
}
}, pollingDelay)

setTimeout(() => {
return false
}

const timeoutFunc = () => {
if (!resolved) {
clearInterval(timerId)
reject(new FullstackTestingError(`timeout occurred during exec in '${podName}:${containerName}': ${command.join[' ']}`))
}
}, timeoutMs)
}

helpers.poll(pollFunc, timeoutFunc, 100, timeoutMs)
})
} catch (e) {
throw new FullstackTestingError(`error occurred during exec in '${podName}:${containerName}': ${command.join[' ']}`)
}
}

/**
* Invoke `kubectl port-forward svc/<svc name>` command
* @param resource name of the service or podName. Must be of the format podName/<podName name> or svc/<service name>
* @param localPort port of the host machine
* @param remotePort port to be forwarded from the service or podName
* @returns {Promise<Array>} console output as an array of strings
* Port forward a port from a pod to localhost
*
* This simple server just forwards traffic from itself to a service running in kubernetes
* -> localhost:localPort -> port-forward-tunnel -> kubernetes-pod:targetPort
*
* @param podName pod name
* @param localPort local port
* @param podPort port of the pod
* @returns {Promise<net.Server>} return the instance of the server that caller must close
*/
async portForward (resource, localPort, remotePort) {
// return this.run(this.prepareCommand(`port-forward ${resource} ${localPort}:${remotePort} &`))
async portForward (podName, localPort, podPort) {
const forwarder = new k8s.PortForward(this._kubeConfig, true)
const server = net.createServer((socket) => {
forwarder.portForward(this.getCurrentNamespace(), podName, [podPort], socket, null, socket)
})

return server.listen(localPort, '127.0.0.1')
}

/**
Expand Down
35 changes: 35 additions & 0 deletions fullstack-network-manager/test/e2e/core/kubectl_e2e.test.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { describe, expect, it } from '@jest/globals'
import fs from 'fs'
import net from 'net'
import os from 'os'
import path from 'path'
import { v4 as uuid4 } from 'uuid'
import { FullstackTestingError } from '../../../src/core/errors.mjs'
import * as helpers from '../../../src/core/helpers.mjs'
import { constants, Templates } from '../../../src/core/index.mjs'
import { Kubectl2 } from '../../../src/core/kubectl2.mjs'

Expand Down Expand Up @@ -73,4 +75,37 @@ describe('Kubectl', () => {
fs.rmdirSync(tmpDir, { recursive: true })
fs.rmdirSync(tmpDir2, { recursive: true })
}, 10000)

it('should be able to port forward gossip port', async () => {
const podName = Templates.renderNetworkPodName('node0')
const localPort = constants.HEDERA_NODE_INTERNAL_GOSSIP_PORT
const server = await kubectl.portForward(podName, localPort, constants.HEDERA_NODE_INTERNAL_GOSSIP_PORT)
expect(server).not.toBeNull()

// client
const client = new net.Socket()
let connected = false
client.connect(localPort).on('connection', () => {
connected = true
})

const pollFunc = () => {
if (connected) {
client.end()
server.close()
}

return connected // if connected return true to stop polling
}
const timeoutFunc = () => {
if (!connected) {
client.end()
server.close()
}

expect(connected).toBeTruthy()
}

helpers.poll(pollFunc, timeoutFunc, 100, 1000)
})
})

0 comments on commit 2eddad2

Please sign in to comment.