Skip to content

Commit

Permalink
fix: do not poll after copyToPod or copyFromPod
Browse files Browse the repository at this point in the history
Signed-off-by: Lenin Mehedy <lenin.mehedy@swirldslabs.com>
  • Loading branch information
leninmehedy committed Jan 24, 2024
1 parent 5a9b96c commit af5e7a4
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 57 deletions.
20 changes: 12 additions & 8 deletions fullstack-network-manager/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fullstack-network-manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"got": "^13.0.0",
"inquirer": "^9.2.11",
"listr2": "^7.0.2",
"tar": "^6.1.11",
"uuid": "^9.0.1",
"winston": "^3.11.0",
"yaml": "^2.3.4",
Expand Down
157 changes: 119 additions & 38 deletions fullstack-network-manager/src/core/kubectl2.mjs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import * as k8s from '@kubernetes/client-node'
import fs from 'fs'
import net from 'net'
import os from 'os'
import path from 'path'
import { flags } from '../commands/index.mjs'
import { FullstackTestingError, MissingArgumentError } from './errors.mjs'
import * as sb from 'stream-buffers'
import { sleep } from './helpers.mjs'
import * as tar from 'tar'
import { v4 as uuid4 } from 'uuid'

/**
* A kubectl wrapper class providing custom functionalities required by fsnetman
Expand Down Expand Up @@ -287,7 +289,7 @@ export class Kubectl2 {

return items
} catch (e) {
throw new FullstackTestingError(`error occurred during listDir operation for path: ${destPath}: ${e.message}`, e)
throw new FullstackTestingError(`unable to check path in '${podName}':${containerName}' - ${destPath}: ${e.message}`, e)
}
}

Expand Down Expand Up @@ -358,31 +360,62 @@ export class Kubectl2 {
* @param containerName container name
* @param srcPath source file path in the local
* @param destDir destination directory in the container
* @param maxAttempts max attempts to check if file is copied successfully or not
* @param delay delay between attempts to check if file is copied successfully or not
* @returns {Promise<>}
*/
async copyTo (podName, containerName, srcPath, destDir, maxAttempts = 100, delay = 500) {
const ns = this._getNamespace()
async copyTo (podName, containerName, srcPath, destDir) {
const namespace = this._getNamespace()

if (!await this.hasDir(podName, containerName, destDir)) {
throw new FullstackTestingError(`invalid destination path: ${destDir}`)
}

if (!fs.existsSync(srcPath)) {
throw new FullstackTestingError(`invalid source path: ${srcPath}`)
}

try {
const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const destPath = `${destDir}/${srcFile}`

await this.kubeCopy.cpToPod(ns, podName, containerName, srcFile, destDir, srcDir)
// zip the source file
const tmpFile = this._tempFileFor(srcFile)
await tar.c({ file: tmpFile, cwd: srcDir }, [srcFile])

// check if the file is copied successfully or not
const fileStat = fs.statSync(srcPath)
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (await this.hasFile(podName, containerName, destPath, { size: fileStat.size })) {
return true
}
await sleep(delay)
}
const self = this
return new Promise((resolve, reject) => {
const execInstance = new k8s.Exec(this.kubeConfig)
const command = ['tar', 'xf', '-', '-C', destDir]
const readStream = fs.createReadStream(tmpFile)
const errStream = new sb.WritableStreamBuffer()
const fileStat = fs.statSync(srcPath)
let statusError = null

this.logger.debug(`File check failed after copy ${podName}:${containerName} [${srcPath} -> ${destDir}]`)
throw new FullstackTestingError(`failed to find file after invoking copy: ${destPath}`)
execInstance.exec(namespace, podName, containerName, command, null, errStream, readStream, false, async ({ status }) => {
if (status === 'Failure' || errStream.size()) {
statusError = new Error(`Error from copyToPod - details: \n ${errStream.getContentsAsString()}`)
}
}).then(conn => {
conn.on('close', async () => {
self._deleteTempFile(tmpFile)

if (statusError) {
return reject(new FullstackTestingError(`failed to copy because of error: ${statusError.message}`, statusError))
}

if (!await self.hasFile(podName, containerName, destPath, { size: fileStat.size })) {
return reject(new FullstackTestingError(`failed to find file after copy: ${destPath}`))
}

return resolve(true)
})

conn.on('error', (e) => {
self._deleteTempFile(tmpFile)
return reject(new FullstackTestingError(`failed to copy file ${destPath} because of connection error: ${e.message}`, e))
})
})
})
} catch (e) {
throw new FullstackTestingError(`failed to copy file to ${podName}:${containerName} [${srcPath} -> ${destDir}]: ${e.message}`, e)
}
Expand All @@ -397,39 +430,76 @@ export class Kubectl2 {
* @param containerName container name
* @param srcPath source file path in the container
* @param destDir destination directory in the local
* @param maxAttempts max attempts to check if file is copied successfully or not
* @param delay delay between attempts to check if file is copied successfully or not
* @returns {Promise<boolean>}
*/
async copyFrom (podName, containerName, srcPath, destDir, maxAttempts = 100, delay = 500) {
const ns = this._getNamespace()
async copyFrom (podName, containerName, srcPath, destDir) {
const namespace = this._getNamespace()

// get stat for source file in the container
const entries = await this.listDir(podName, containerName, srcPath)
if (entries.length !== 1) {
throw new FullstackTestingError(`invalid source path: ${srcPath}`)
}
const srcFileDesc = entries[0] // cache for later comparison after copy

if (!fs.existsSync(destDir)) {
throw new FullstackTestingError(`invalid destination path: ${destDir}`)
}

try {
const srcFileSize = Number.parseInt(srcFileDesc.size)

const srcFile = path.basename(srcPath)
const srcDir = path.dirname(srcPath)
const destPath = `${destDir}/${srcFile}`
if (!fs.existsSync(destDir)) throw new Error(`invalid destination dir: ${destDir}`)

await this.kubeCopy.cpFromPod(ns, podName, containerName, srcFile, destDir, srcDir)
// download the tar file to a temp location
const tmpFile = this._tempFileFor(srcFile)

// check if the file is copied successfully or not
const entries = await this.listDir(podName, containerName, srcPath)
if (entries.length !== 1) throw new FullstackTestingError(`exepected 1 entry, found ${entries.length}`, { entries })
const srcFileDesc = entries[0]
const self = this
return new Promise((resolve, reject) => {
const execInstance = new k8s.Exec(this.kubeConfig)
const command = ['tar', 'zcf', '-', '-C', srcDir, srcFile]
const writerStream = fs.createWriteStream(tmpFile)
const errStream = new sb.WritableStreamBuffer()
let statusError = null

for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (fs.existsSync(destPath)) {
const stat = fs.statSync(destPath)
if (stat && `${stat.size}` === `${srcFileDesc.size}`) {
return true
execInstance.exec(namespace, podName, containerName, command, writerStream, errStream, null, false, async ({ status }) => {
writerStream.close()
if (status === 'Failure' || errStream.size()) {
statusError = new Error(`Error from copyFromPod - details: \n ${errStream.getContentsAsString()}`)
self._deleteTempFile(tmpFile)
}
}
await sleep(delay)
}
}).then(conn => {
conn.on('close', async () => {
if (statusError) {
return reject(new FullstackTestingError(`failed to copy because of error: ${statusError.message}`, statusError))
}

// extract the downloaded file
await tar.x({
file: tmpFile,
cwd: destDir
})

self._deleteTempFile(tmpFile)

throw new FullstackTestingError(`failed to find file after invoking copy: ${destPath}`)
const stat = fs.statSync(destPath)
if (stat && stat.size === srcFileSize) {
return resolve(true)
}

return reject(new FullstackTestingError(`failed to download file completely: ${destPath}`))
})

conn.on('error', (e) => {
self._deleteTempFile(tmpFile)
return reject(new FullstackTestingError(`failed to copy file ${destPath} because of connection error: ${e.message}`, e))
})
})
})
} catch (e) {
throw new FullstackTestingError(`failed to copy file from ${podName}:${containerName} [${srcPath} -> ${destDir}]: ${e.message}`, e)
throw new FullstackTestingError(`failed to download file from ${podName}:${containerName} [${srcPath} -> ${destDir}]: ${e.message}`, e)
}
}

Expand Down Expand Up @@ -520,7 +590,7 @@ export class Kubectl2 {
this.logger.debug(`WaitForPod [${fieldSelector}, ${labelSelector}], maxAttempts: ${maxAttempts}`)

return new Promise((resolve, reject) => {
let attempts = 0
const attempts = 0

const check = async () => {
this.logger.debug(`Checking for pod ${fieldSelector}, ${labelSelector} [attempt: ${attempts}/${maxAttempts}]`)
Expand Down Expand Up @@ -557,4 +627,15 @@ export class Kubectl2 {
if (!ns) throw new MissingArgumentError('namespace is not set')
return ns
}

_tempFileFor (fileName) {
const tmpFile = `${fileName}-${uuid4()}`
return path.join(os.tmpdir(), tmpFile)
}

_deleteTempFile (tmpFile) {
if (fs.existsSync(tmpFile)) {
fs.rmSync(tmpFile)
}
}
}
30 changes: 19 additions & 11 deletions fullstack-network-manager/test/e2e/core/kubectl_e2e.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import os from 'os'
import path from 'path'
import { v4 as uuid4 } from 'uuid'
import { FullstackTestingError } from '../../../src/core/errors.mjs'
import { ConfigManager, constants, logging, Templates } from '../../../src/core/index.mjs'
import { ConfigManager, constants, logging, PackageDownloader, Templates } from '../../../src/core/index.mjs'
import { Kubectl2 } from '../../../src/core/kubectl2.mjs'

describe('Kubectl', () => {
const testLogger = logging.NewLogger('debug')
const configManager = new ConfigManager(testLogger)
const kubectl = new Kubectl2(configManager, testLogger)
const downloader = new PackageDownloader(testLogger)

it('should be able to list clusters', async () => {
const clusters = await kubectl.getClusters()
Expand Down Expand Up @@ -57,23 +58,30 @@ describe('Kubectl', () => {
it('should be able to copy a file to and from a container', async () => {
const podName = Templates.renderNetworkPodName('node0')
const containerName = constants.ROOT_CONTAINER
const testFileName = 'test.txt'

// attempt fetch platform jar as we need to check if a big zip file can be uploaded/downloaded
const testCacheDir = 'test/data/tmp'
const tag = 'v0.42.5'
const releasePrefix = Templates.prepareReleasePrefix(tag)
const pkgPath = `${testCacheDir}/${releasePrefix}/build-${tag}.zip`
await expect(downloader.fetchPlatform(tag, testCacheDir)).resolves.toBe(pkgPath)
expect(fs.existsSync(pkgPath)).toBeTruthy()

const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kubectl-'))
const tmpDir2 = fs.mkdtempSync(path.join(os.tmpdir(), 'kubectl-'))
const tmpFile = path.join(tmpDir, testFileName)
const destDir = constants.HEDERA_USER_HOME_DIR
const destPath = `${destDir}/${testFileName}`
fs.writeFileSync(tmpFile, 'TEST')
const destPath = `${destDir}/build-${tag}.zip`

await expect(kubectl.copyTo(podName, containerName, tmpFile, destDir)).resolves.toBeTruthy()
fs.rmdirSync(tmpDir, { recursive: true })
// upload the file
await expect(kubectl.copyTo(podName, containerName, pkgPath, destDir)).resolves.toBeTruthy()

await expect(kubectl.copyFrom(podName, containerName, destPath, tmpDir2)).resolves.toBeTruthy()
fs.rmdirSync(tmpDir2, { recursive: true })
// download the same file
await expect(kubectl.copyFrom(podName, containerName, destPath, tmpDir)).resolves.toBeTruthy()

// rm file inside the container
await expect(kubectl.execContainer(podName, containerName, ['rm', '-f', destPath])).resolves
}, 10000)

fs.rmdirSync(tmpDir, { recursive: true })
}, 50000)

it('should be able to port forward gossip port', (done) => {
const podName = Templates.renderNetworkPodName('node0')
Expand Down

0 comments on commit af5e7a4

Please sign in to comment.