Skip to content

Commit

Permalink
feat: use docker events for better reliability.
Browse files Browse the repository at this point in the history
Adds a docker event bus that can be used for improving the dockest experience. It is currently used in the following ways: Wait for container start before trying to resolve the container id (which solves possible timeouts due to first downloading the image) and stop resolving the container id in case the container unexpectedly dies. This results in less flakiness. In the future the readiness check API could be based upon docker events (see erikengervall#85 (comment)).
  • Loading branch information
n1ru4l committed Feb 3, 2020
1 parent 2765b1f commit 4e50164
Show file tree
Hide file tree
Showing 17 changed files with 268 additions and 8 deletions.
3 changes: 2 additions & 1 deletion packages/dockest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"chalk": "^3.0.0",
"execa": "^4.0.0",
"is-docker": "^2.0.0",
"js-yaml": "^3.13.1"
"js-yaml": "^3.13.1",
"rxjs": "^6.5.4"
},
"devDependencies": {
"@types/jest": "^24.9.1",
Expand Down
4 changes: 4 additions & 0 deletions packages/dockest/src/@types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Logger } from './Logger'
import { DockerServiceEventStream } from './run/createDockerServiceEventStream'
import { DockerEventEmitter } from './run/createDockerEventEmitter'

type ContainerId = string
type DefaultHealthcheck = () => Promise<void>
Expand Down Expand Up @@ -34,6 +36,7 @@ export interface Runner {
serviceName: ServiceName
host?: string
isBridgeNetworkMode?: boolean
dockerEventStream$: DockerServiceEventStream
}

export interface RunnersObj {
Expand Down Expand Up @@ -76,6 +79,7 @@ export interface MutablesConfig {
/** Jest has finished executing and has returned a result */
jestRanWithResult: boolean
runners: RunnersObj
dockerEventEmitter: DockerEventEmitter
}

export interface DockestOpts {
Expand Down
14 changes: 12 additions & 2 deletions packages/dockest/src/run/bootstrap/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { setupExitHandler } from './setupExitHandler'
import { transformDockestServicesToRunners } from './transformDockestServicesToRunners'
import { writeComposeFile } from './writeComposeFile'
import { DockestConfig, DockestService } from '../../@types'
import { createDockerEventEmitter } from '../createDockerEventEmitter'

export const bootstrap = async ({
composeFile,
Expand All @@ -27,9 +28,18 @@ export const bootstrap = async ({

const { mergedComposeFiles } = await mergeComposeFiles({ composeFile })
const { dockerComposeFile } = getParsedComposeFile(mergedComposeFiles)
writeComposeFile(mergedComposeFiles, dockerComposeFile)
const composeFilePath = writeComposeFile(mergedComposeFiles, dockerComposeFile)

mutables.runners = transformDockestServicesToRunners({ dockerComposeFile, dockestServices, isInsideDockerContainer })
const dockerEventEmitter = createDockerEventEmitter(composeFilePath)

mutables.runners = transformDockestServicesToRunners({
dockerComposeFile,
dockestServices,
isInsideDockerContainer,
dockerEventEmitter,
})

mutables.dockerEventEmitter = dockerEventEmitter

configureLogger(mutables.runners)
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from 'events'
import { transformDockestServicesToRunners } from './transformDockestServicesToRunners'
import { DockestService, DockerComposeFile } from '../../@types'
import { getOpts } from '../../utils/getOpts'
Expand All @@ -20,6 +21,7 @@ describe('transformDockestServicesToRunners', () => {
dockerComposeFile,
dockestServices,
isInsideDockerContainer,
dockerEventEmitter: new EventEmitter() as any,
})

expect(runners).toMatchInlineSnapshot(`
Expand Down Expand Up @@ -63,6 +65,7 @@ describe('transformDockestServicesToRunners', () => {
dockerComposeFile,
dockestServices,
isInsideDockerContainer,
dockerEventEmitter: new EventEmitter() as any,
}),
).toThrow(
`Unable to find compose service "${invalidServiceName}", make sure that the serviceName corresponds with your Compose File's service`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { ConfigurationError } from '../../Errors'
import { DockestConfig, DockerComposeFile, Runner, RunnersObj, DockestService } from '../../@types'
import { Logger } from '../../Logger'
import { DockerEventEmitter } from '../createDockerEventEmitter'
import { createDockerServiceEventStream } from '../createDockerServiceEventStream'

export const transformDockestServicesToRunners = ({
dockerComposeFile,
dockestServices,
isInsideDockerContainer,
dockerEventEmitter,
}: {
dockerComposeFile: DockerComposeFile
dockestServices: DockestService[]
isInsideDockerContainer: DockestConfig['isInsideDockerContainer']
dockerEventEmitter: DockerEventEmitter
}) => {
const createRunner = (dockestService: DockestService) => {
const { commands = [], dependents = [], healthcheck = () => Promise.resolve(), serviceName } = dockestService
Expand All @@ -29,6 +33,7 @@ export const transformDockestServicesToRunners = ({
healthcheck,
logger: new Logger(serviceName),
serviceName,
dockerEventStream$: createDockerServiceEventStream(serviceName, dockerEventEmitter),
}

if (isInsideDockerContainer) {
Expand Down
3 changes: 2 additions & 1 deletion packages/dockest/src/run/bootstrap/writeComposeFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import fs from 'fs'
import { DockerComposeFile } from '../../@types'
import { GENERATED_COMPOSE_FILE_PATH, DOCKEST_ATTACH_TO_PROCESS } from '../../constants'

export const writeComposeFile = (mergedComposeFiles: string, composeFileAsObject: DockerComposeFile) => {
export const writeComposeFile = (mergedComposeFiles: string, composeFileAsObject: DockerComposeFile): string => {
// set environment variable that can be used with the test-helpers
// jest.runCLI will pass this environment variable into the testcase runners
process.env[DOCKEST_ATTACH_TO_PROCESS] = JSON.stringify(composeFileAsObject)

fs.writeFileSync(GENERATED_COMPOSE_FILE_PATH, mergedComposeFiles)
return GENERATED_COMPOSE_FILE_PATH
}
39 changes: 39 additions & 0 deletions packages/dockest/src/run/createContainerDieCheck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { interval, Subject, race } from 'rxjs'
import { takeUntil, tap, first, mapTo } from 'rxjs/operators'
import { Runner } from '../@types'

export const createContainerDieCheck = ({ runner }: { runner: Runner }) => {
const { dockerEventStream$ } = runner
const stop$ = new Subject()
const cancel$ = new Subject()

const info$ = interval(1000).pipe(
takeUntil(stop$),
tap(() => {
runner.logger.info('Container is still running...')
}),
)

const containerDies$ = dockerEventStream$.pipe(
takeUntil(stop$),
first(event => event.action === 'die'),
)

return {
service: runner.serviceName,
done: race(containerDies$, info$, cancel$)
.pipe(
tap({
next: () => {
stop$.next()
stop$.complete()
},
}),
mapTo(undefined),
)
.toPromise(),
cancel: () => {
cancel$.complete()
},
}
}
40 changes: 40 additions & 0 deletions packages/dockest/src/run/createContainerStartCheck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { interval, Subject, race } from 'rxjs'
import { takeUntil, tap, first, mapTo } from 'rxjs/operators'
import { Runner } from '../@types'

export const createContainerStartCheck = ({ runner }: { runner: Runner }) => {
const { dockerEventStream$ } = runner
const stop$ = new Subject()

const cancel$ = new Subject()

const info$ = interval(1000).pipe(
takeUntil(stop$),
tap(() => {
runner.logger.info('Still waiting for start event...')
}),
)

const containerStarts$ = dockerEventStream$.pipe(
takeUntil(stop$),
first(event => event.action === 'start'),
)

return {
service: runner.serviceName,
done: race(containerStarts$, info$, cancel$)
.pipe(
tap({
next: () => {
stop$.next()
stop$.complete()
},
}),
mapTo(undefined),
)
.toPromise(),
cancel: () => {
cancel$.complete()
},
}
}
98 changes: 98 additions & 0 deletions packages/dockest/src/run/createDockerEventEmitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { EventEmitter } from 'events'
import execa from 'execa' /* eslint-disable-line import/default */

const parseJsonSafe = (data: string) => {
try {
return JSON.parse(data)
} catch (err) {
return null
}
}

export interface DockerComposeEventInterface<TActionName extends string, TAdditionalAttributes extends {} = {}> {
time: string
type: 'container'
action: TActionName
id: string
service: string
attributes: {
image: string
name: string
} & TAdditionalAttributes
}

export type CreateDockerComposeEvent = DockerComposeEventInterface<'create'>
export type AttachDockerComposeEvent = DockerComposeEventInterface<'attach'>
export type StartDockerComposeEvent = DockerComposeEventInterface<'start'>
export type HealthStatusDockerComposeEvent = DockerComposeEventInterface<
'health_status',
{ healthStatus: 'healthy' | 'unhealthy' }
>
export type KillDockerComposeEvent = DockerComposeEventInterface<'kill'>
export type DieDockerComposeEvent = DockerComposeEventInterface<'die'>

export type DockerEventType =
| CreateDockerComposeEvent
| AttachDockerComposeEvent
| StartDockerComposeEvent
| HealthStatusDockerComposeEvent
| KillDockerComposeEvent
| DieDockerComposeEvent

export type UnknownDockerComposeEvent = DockerComposeEventInterface<string>

export type DockerEventEmitterListener = (event: DockerEventType) => void

export interface DockerEventEmitter {
addListener(serviceName: string, eventListener: DockerEventEmitterListener): void
removeListener(serviceName: string, eventListener: DockerEventEmitterListener): void
destroy(): void
}

export const createDockerEventEmitter = (composeFilePath: string): DockerEventEmitter => {
const command = ` \
docker-compose \
-f ${composeFilePath} \
events \
--json
`
const childProcess = execa(command, { shell: true, reject: false })

if (!childProcess.stdout) {
childProcess.kill()
throw new Error('Event Process has not output stream.')
}

const emitter = new EventEmitter()

// without this line only the first data event is fired (in some undefinable cases)
// eslint-disable-next-line @typescript-eslint/no-empty-function
childProcess.then(() => {})

childProcess.stdout.addListener('data', chunk => {
const lines: string[] = chunk
.toString()
.split(`\n`)
.filter(Boolean)

for (const line of lines) {
const data: UnknownDockerComposeEvent = parseJsonSafe(line)
if (!data) return

// convert health status to friendlier format
if (data.action.startsWith('health_status: ')) {
const healthStatus = data.action
.replace('health_status: ', '')
.trim() as HealthStatusDockerComposeEvent['attributes']['healthStatus']
data.action = 'health_status'
;(data as HealthStatusDockerComposeEvent).attributes.healthStatus = healthStatus
}

emitter.emit(data.service, data)
}
})

return Object.assign(emitter, {
destroy: () => childProcess.cancel(),
})
}
23 changes: 23 additions & 0 deletions packages/dockest/src/run/createDockerServiceEventStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { fromEventPattern, Observable } from 'rxjs'
import { shareReplay } from 'rxjs/operators'
import { DockerEventEmitter, DockerEventType } from './createDockerEventEmitter'

export type DockerServiceEventStream = Observable<DockerEventType>

export const createDockerServiceEventStream = (
serviceName: string,
eventEmitter: DockerEventEmitter,
): DockerServiceEventStream => {
return (
fromEventPattern<DockerEventType>(
handler => {
eventEmitter.addListener(serviceName, handler)
},
handler => {
eventEmitter.removeListener(serviceName, handler)
},
)
// Every new subscriber should receive access to all previous emitted events, because of this we use shareReplay.
.pipe(shareReplay())
)
}
4 changes: 3 additions & 1 deletion packages/dockest/src/run/teardown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { teardownSingle } from '../utils/teardownSingle'
export const teardown = async ({
hostname,
isInsideDockerContainer,
mutables: { runners },
mutables: { runners, dockerEventEmitter },
perfStart,
}: {
hostname: DockestConfig['hostname']
Expand All @@ -24,5 +24,7 @@ export const teardown = async ({
await removeBridgeNetwork()
}

dockerEventEmitter.destroy()

Logger.measurePerformance(perfStart)
}
5 changes: 3 additions & 2 deletions packages/dockest/src/run/waitForServices/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from 'events'
import { waitForServices } from './index'
import { checkConnection } from './checkConnection'
import { runHealthcheck } from './runHealthcheck'
Expand Down Expand Up @@ -40,7 +41,7 @@ describe('waitForServices', () => {
composeOpts,
hostname,
isInsideDockerContainer,
mutables: { runners, jestRanWithResult: false },
mutables: { runners, jestRanWithResult: false, dockerEventEmitter: new EventEmitter() as any },
runInBand,
})

Expand Down Expand Up @@ -88,7 +89,7 @@ describe('waitForServices', () => {
composeOpts,
hostname,
isInsideDockerContainer,
mutables: { runners, jestRanWithResult: false },
mutables: { runners, jestRanWithResult: false, dockerEventEmitter: new EventEmitter() as any },
runInBand,
})

Expand Down
6 changes: 6 additions & 0 deletions packages/dockest/src/run/waitForServices/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { DOCKEST_HOST_ADDRESS } from '../../constants'
import { DockestConfig, Runner } from '../../@types'
import { joinBridgeNetwork } from '../../utils/network/joinBridgeNetwork'
import { bridgeNetworkExists } from '../../utils/network/bridgeNetworkExists'
import { createContainerStartCheck } from '../createContainerStartCheck'

const LOG_PREFIX = '[Setup]'

Expand Down Expand Up @@ -36,6 +37,11 @@ export const waitForServices = async ({
runner.logger.debug(`${LOG_PREFIX} Initiating...`)

await dockerComposeUp({ composeOpts, serviceName })

// wait until container has started before trying to resolve the container id.
const containerStartCheck = createContainerStartCheck({ runner })
await containerStartCheck.done

await resolveContainerId({ runner })

if (isBridgeNetworkMode) {
Expand Down
Loading

0 comments on commit 4e50164

Please sign in to comment.