Skip to content

Commit

Permalink
feat(server): support deployment rolling update (#1112)
Browse files Browse the repository at this point in the history
* feat: support deployment rolling updates

* feat: formatting code

* feat: add status judgment
  • Loading branch information
skyoct authored May 8, 2023
1 parent d5109ec commit 633b2f3
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 141 deletions.
76 changes: 33 additions & 43 deletions server/src/instance/instance-task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export class InstanceTaskService {

constructor(
private readonly instanceService: InstanceService,
private readonly cronService: CronJobService,
) {}
) { }


@Cron(CronExpression.EVERY_SECOND)
async tick() {
Expand Down Expand Up @@ -47,16 +47,10 @@ export class InstanceTaskService {
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Started` -> `Stopping`
this.handleRestartingStateDown().catch((err) => {
this.logger.error('handleRestartingStateDown error', err)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Stopped` -> `Starting`
this.handleRestartingStateUp().catch((err) => {
this.logger.error('handleRestartingStateUp error', err)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
// Phase `Started` -> `Starting`
this.handleRestartingState().catch((err) => {
this.logger.error('handleRestartingPhase error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
})
}

Expand Down Expand Up @@ -128,6 +122,12 @@ export class InstanceTaskService {

const appid = app.appid
const instance = await this.instanceService.get(app)
const unavailable = instance.deployment?.status?.unavailableReplicas || false
if (unavailable) {
await this.relock(appid, waitingTime)
return
}

const available = isConditionTrue(
'Available',
instance.deployment?.status?.conditions || [],
Expand Down Expand Up @@ -247,42 +247,31 @@ export class InstanceTaskService {
this.logger.log(`Application ${app.appid} updated to phase Stopped`)
}

/**
* State `Restarting`:
* - move phase `Started` to `Stopping`
*/
async handleRestartingStateDown() {

async handleRestartingState() {


const db = SystemDatabase.db

await db.collection<Application>('Application').updateMany(
{
state: ApplicationState.Restarting,
phase: ApplicationPhase.Started,
lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
},
{
$set: {
phase: ApplicationPhase.Stopping,
lockedAt: TASK_LOCK_INIT_TIME,
updatedAt: new Date(),
const res = await db
.collection<Application>('Application')
.findOneAndUpdate(
{
state: ApplicationState.Restarting,
phase: ApplicationPhase.Started,
lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
},
},
)
}
{ $set: { lockedAt: new Date() } },
)

/**
* State `Restarting`:
* - move phase `Stopped` to `Starting`
*/
async handleRestartingStateUp() {
const db = SystemDatabase.db
if (!res.value) return
const app = res.value

await db.collection<Application>('Application').updateMany(
{
state: ApplicationState.Restarting,
phase: ApplicationPhase.Stopped,
lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
},
await this.instanceService.restart(app.appid)

// update application phase to `Starting`
await db.collection<Application>('Application').updateOne(
{ appid: app.appid, phase: ApplicationPhase.Started },
{
$set: {
phase: ApplicationPhase.Starting,
Expand All @@ -291,6 +280,7 @@ export class InstanceTaskService {
},
},
)
this.logger.log(`Application ${app.appid} updated to phase Starting`)
}

/**
Expand Down
228 changes: 130 additions & 98 deletions server/src/instance/instance.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { V1Deployment } from '@kubernetes/client-node'
import { V1Deployment, V1DeploymentSpec } from '@kubernetes/client-node'
import { Injectable, Logger } from '@nestjs/common'
import { GetApplicationNamespaceByAppId } from '../utils/getter'
import {
Expand Down Expand Up @@ -26,7 +26,7 @@ export class InstanceService {
private readonly storageService: StorageService,
private readonly databaseService: DatabaseService,
private readonly prisma: PrismaService,
) {}
) { }

async create(app: Application) {
const appid = app.appid
Expand Down Expand Up @@ -67,6 +67,127 @@ export class InstanceService {
// add bundle label
labels[LABEL_KEY_BUNDLE] = app.bundle.name

// create deployment
const data = new V1Deployment()
data.metadata = { name: app.appid, labels }
data.spec = await this.makeDeploymentSpec(app, labels)

const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
const res = await appsV1Api.createNamespacedDeployment(namespace, data)

this.logger.log(`create k8s deployment ${res.body?.metadata?.name}`)

return res.body
}

async createService(app: ApplicationWithRegion, labels: any) {
const namespace = GetApplicationNamespaceByAppId(app.appid)
const serviceName = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
const res = await coreV1Api.createNamespacedService(namespace, {
metadata: { name: serviceName, labels },
spec: {
selector: labels,
type: 'ClusterIP',
ports: [{ port: 8000, targetPort: 8000, protocol: 'TCP' }],
},
})
this.logger.log(`create k8s service ${res.body?.metadata?.name}`)
return res.body
}

async remove(app: Application) {
const appid = app.appid
const region = await this.regionService.findByAppId(appid)
const { deployment, service } = await this.get(app)

const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) return

const appsV1Api = this.clusterService.makeAppsV1Api(region)
const coreV1Api = this.clusterService.makeCoreV1Api(region)

if (deployment) {
await appsV1Api.deleteNamespacedDeployment(appid, namespace.metadata.name)
}
if (service) {
const name = appid
await coreV1Api.deleteNamespacedService(name, namespace.metadata.name)
}
this.logger.log(`remove k8s deployment ${deployment?.metadata?.name}`)
}

async get(app: Application) {
const region = await this.regionService.findByAppId(app.appid)
const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) {
return { deployment: null, service: null }
}

const appWithRegion = { ...app, region }
const deployment = await this.getDeployment(appWithRegion)
const service = await this.getService(appWithRegion)
return { deployment, service }
}

async getDeployment(app: ApplicationWithRegion) {
const appid = app.appid
const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
try {
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await appsV1Api.readNamespacedDeployment(appid, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
}

async getService(app: ApplicationWithRegion) {
const appid = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)

try {
const serviceName = appid
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await coreV1Api.readNamespacedService(serviceName, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
}

// 修改整个spec
async restart(appid: string) {
const app = await this.prisma.application.findUnique({
where: { appid },
include: {
configuration: true,
bundle: true,
runtime: true,
region: true,
},
})
const { deployment } = await this.get(app)
deployment.spec = await this.makeDeploymentSpec(app, deployment.spec.template.metadata.labels)
const region = await this.regionService.findByAppId(app.appid)
const appsV1Api = this.clusterService.makeAppsV1Api(region)
const namespace = GetApplicationNamespaceByAppId(app.appid)
const res = await appsV1Api.replaceNamespacedDeployment(app.appid, namespace, deployment)

this.logger.log(`restart k8s deployment ${res.body?.metadata?.name}`)

}

async makeDeploymentSpec(app: any, labels: any): Promise<V1DeploymentSpec> {

// prepare params
const limitMemory = app.bundle.resource.limitMemory
const limitCpu = app.bundle.resource.limitCPU
Expand All @@ -78,13 +199,13 @@ export class InstanceService {
const dependencies_string = dependencies.join(' ')

// db connection uri
const database = await this.databaseService.findOne(appid)
const database = await this.databaseService.findOne(app.appid)
const dbConnectionUri = this.databaseService.getInternalConnectionUri(
app.region,
database,
)

const storage = await this.storageService.findOne(appid)
const storage = await this.storageService.findOne(app.appid)

const env = [
{ name: 'DB_URI', value: dbConnectionUri },
Expand All @@ -105,6 +226,9 @@ export class InstanceService {
value: `--max_old_space_size=${max_old_space_size} --max-http-header-size=${max_http_header_size}`,
},
{ name: 'DEPENDENCIES', value: dependencies_string },
{
name: 'RESTART_AT', value: new Date().getTime().toString(),
}
]

// merge env from app configuration, override if exists
Expand All @@ -118,10 +242,7 @@ export class InstanceService {
}
})

// create deployment
const data = new V1Deployment()
data.metadata = { name: app.appid, labels }
data.spec = {
const spec = {
replicas: 1,
selector: { matchLabels: labels },
template: {
Expand Down Expand Up @@ -234,95 +355,6 @@ export class InstanceService {
}, // end of spec {}
}, // end of template {}
}

const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
const res = await appsV1Api.createNamespacedDeployment(namespace, data)

this.logger.log(`create k8s deployment ${res.body?.metadata?.name}`)

return res.body
}

async createService(app: ApplicationWithRegion, labels: any) {
const namespace = GetApplicationNamespaceByAppId(app.appid)
const serviceName = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
const res = await coreV1Api.createNamespacedService(namespace, {
metadata: { name: serviceName, labels },
spec: {
selector: labels,
type: 'ClusterIP',
ports: [{ port: 8000, targetPort: 8000, protocol: 'TCP' }],
},
})
this.logger.log(`create k8s service ${res.body?.metadata?.name}`)
return res.body
}

async remove(app: Application) {
const appid = app.appid
const region = await this.regionService.findByAppId(appid)
const { deployment, service } = await this.get(app)

const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) return

const appsV1Api = this.clusterService.makeAppsV1Api(region)
const coreV1Api = this.clusterService.makeCoreV1Api(region)

if (deployment) {
await appsV1Api.deleteNamespacedDeployment(appid, namespace.metadata.name)
}
if (service) {
const name = appid
await coreV1Api.deleteNamespacedService(name, namespace.metadata.name)
}
}

async get(app: Application) {
const region = await this.regionService.findByAppId(app.appid)
const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) {
return { deployment: null, service: null }
}

const appWithRegion = { ...app, region }
const deployment = await this.getDeployment(appWithRegion)
const service = await this.getService(appWithRegion)
return { deployment, service }
}

async getDeployment(app: ApplicationWithRegion) {
const appid = app.appid
const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
try {
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await appsV1Api.readNamespacedDeployment(appid, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
}

async getService(app: ApplicationWithRegion) {
const appid = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)

try {
const serviceName = appid
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await coreV1Api.readNamespacedService(serviceName, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
return spec
}
}

0 comments on commit 633b2f3

Please sign in to comment.