Skip to content

Commit

Permalink
feat: add docker runtime TTL; better runtime connection & feedback (#135
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lihebi authored Dec 10, 2022
1 parent e4feb41 commit 115cfc9
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 63 deletions.
33 changes: 25 additions & 8 deletions api/src/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import { listAllRuntimes } from "./resolver_runtime";
import {
spawnRuntime as spawnRuntime_docker,
killRuntime as killRuntime_docker,
infoRuntime as infoRuntime_docker,
loopKillInactiveRoutes as loopKillInactiveRoutes_docker,
initRoutes as initRoutes_docker,
} from "./spawner-docker";
import {
spawnRuntime as spawnRuntime_k8s,
killRuntime as killRuntime_k8s,
infoRuntime as infoRuntime_k8s,
} from "./spawner-k8s";

export const resolvers = {
Expand All @@ -44,6 +48,13 @@ export const resolvers = {
pod,
listAllRuntimes,
myCollabRepos,
...(process.env.RUNTIME_SPAWNER === "k8s"
? {
infoRuntime: infoRuntime_k8s,
}
: {
infoRuntime: infoRuntime_docker,
}),
},
Mutation: {
signup,
Expand All @@ -57,13 +68,19 @@ export const resolvers = {
updatePod,
deletePod,
addCollaborator,
spawnRuntime:
process.env.RUNTIME_SPAWNER === "k8s"
? spawnRuntime_k8s
: spawnRuntime_docker,
killRuntime:
process.env.RUNTIME_SPAWNER === "k8s"
? killRuntime_k8s
: killRuntime_docker,
...(process.env.RUNTIME_SPAWNER === "k8s"
? {
spawnRuntime: spawnRuntime_k8s,
killRuntime: killRuntime_k8s,
}
: {
spawnRuntime: spawnRuntime_docker,
killRuntime: killRuntime_docker,
}),
},
};

if (process.env.RUNTIME_SPAWNER !== "k8s") {
initRoutes_docker();
loopKillInactiveRoutes_docker();
}
15 changes: 10 additions & 5 deletions api/src/resolver_runtime.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { ApolloClient, InMemoryCache, gql } from "@apollo/client/core";
import Prisma from "@prisma/client";

import { loadOrCreateContainer, removeContainer } from "./spawner-docker";

const { PrismaClient } = Prisma;

const prisma = new PrismaClient();
Expand All @@ -29,16 +27,23 @@ export async function listAllRuntimes(_, {}, { userId }) {
// `,
query: gql`
query {
getUrls
getUrls {
url
lastActive
}
}
`,
fetchPolicy: "network-only",
});
let res = urls.data.getUrls
.map((url) => {
.map(({ url, lastActive }) => {
let match_res = url.match(/\/user_(.*)_repo_(.*)/);
if (match_res) {
if (`user_${match_res[1]}` === userId) return `repo_${match_res[2]}`;
if (`user_${match_res[1]}` === userId)
return {
sessionId: `user_${match_res[1]}_repo_${match_res[2]}`,
lastActive,
};
}
return false;
})
Expand Down
146 changes: 139 additions & 7 deletions api/src/spawner-docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const apollo_client = new ApolloClient({

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

export async function removeContainer(name) {
async function removeContainer(name) {
return new Promise((resolve, reject) => {
var docker = new Docker();
console.log("remove if already exist");
Expand Down Expand Up @@ -65,12 +65,7 @@ export async function removeContainer(name) {
* @param Env additional optional env for the container
* @returns Boolean for whether a new container is created.
*/
export async function loadOrCreateContainer(
image,
name,
network,
Env: string[] = []
) {
async function loadOrCreateContainer(image, name, network, Env: string[] = []) {
console.log("loading container", name);
let ip = await loadContainer(name, network);
if (ip) return false;
Expand All @@ -81,6 +76,23 @@ export async function loadOrCreateContainer(
return true;
}

async function getContainerInfo(name): Promise<string | null> {
return new Promise((resolve, reject) => {
var docker = new Docker();
let container = docker.getContainer(name);
container.inspect((err, data) => {
if (err) {
console.log("getContainerInfo: container seems not exist.");
return resolve(null);
}
if (data?.State.Running) {
return resolve(data.State.StartedAt);
}
return resolve(null);
});
});
}

async function loadContainer(name, network) {
// if already exists, just return the IP
// else, create and return the IP
Expand Down Expand Up @@ -177,6 +189,24 @@ async function createContainer(image, name, network, Env) {
});
}

async function scanRunningSessions(): Promise<string[]> {
return new Promise((resolve, reject) => {
var docker = new Docker();
docker.listContainers((err, containers) => {
if (err) {
console.log("ERR:", err);
return;
}
let sessionIds = containers
?.filter(
(c) => c.Names[0].startsWith("/cpkernel_") && c.State === "running"
)
.map((c) => c.Names[0].substring("/cpkernel_".length));
return resolve(sessionIds || []);
});
});
}

export async function spawnRuntime(_, { sessionId }) {
// launch the kernel
console.log("Spawning ");
Expand Down Expand Up @@ -260,3 +290,105 @@ export async function killRuntime(_, { sessionId }) {
});
return true;
}

/**
* Get the runtime info.
* @param sessionId the session ID
* @returns {startedAt} the time when the runtime is started.
*/
export async function infoRuntime(_, { sessionId }) {
let zmq_host = `cpkernel_${sessionId}`;
let ws_host = `cpruntime_${sessionId}`;
let startedAt = await getContainerInfo(ws_host);
return {
startedAt: startedAt ? new Date(startedAt).getTime() : null,
};
}

async function killInactiveRoutes() {
let { data } = await apollo_client.query({
query: gql`
query GetUrls {
getUrls {
url
lastActive
}
}
`,
fetchPolicy: "network-only",
});
const now = new Date();
let inactiveRoutes = data.getUrls
.filter(({ url, lastActive }) => {
if (!lastActive) return false;
let d2 = new Date(parseInt(lastActive));
// Prod: 12 hours TTL
let ttl = 1000 * 60 * 60 * 12;
// DEBUG: 1 minute TTL
// let ttl = 1000 * 60 * 3;
let activeTime = now.getTime() - d2.getTime();
return activeTime > ttl;
})
.map(({ url }) => url);
console.log("Inactive routes", inactiveRoutes);
for (let url of inactiveRoutes) {
let sessionId = url.substring(1);
let zmq_host = `cpkernel_${sessionId}`;
let ws_host = `cpruntime_${sessionId}`;
await removeContainer(zmq_host);
await removeContainer(ws_host);
await apollo_client.mutate({
mutation: gql`
mutation deleteRoute($url: String) {
deleteRoute(url: $url)
}
`,
variables: {
url,
},
});
}
}

/**
* Periodically kill inactive routes every minute.
*/
export function loopKillInactiveRoutes() {
setInterval(async () => {
await killInactiveRoutes();
}, 1000 * 60 * 1);
}

/**
* At startup, check all active containers and add them to the table.
*/
export async function initRoutes() {
let sessionIds = await scanRunningSessions();
console.log("initRoutes sessionIds", sessionIds);
for (let id of sessionIds) {
let url = `/${id}`;
let ws_host = `cpruntime_${id}`;
await apollo_client.mutate({
mutation: gql`
mutation addRoute($url: String, $target: String) {
addRoute(url: $url, target: $target)
}
`,
variables: {
url,
// This 4020 is the WS listening port in WS_RUNTIME_IMAGE
target: `${ws_host}:4020`,
},
// refetchQueries: ["getUrls"],
refetchQueries: [
{
query: gql`
query getUrls {
getUrls
}
`,
},
],
});
}
}
13 changes: 13 additions & 0 deletions api/src/spawner-k8s.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,16 @@ export async function killRuntime(_, { sessionId }) {
});
return true;
}

/**
* Get the runtime info.
* @param sessionId the session ID
* @returns {startedAt} the time when the runtime is started.
*/
export async function infoRuntime(_, { sessionId }) {
// TODO implement
throw new Error("Not implemented");
return {
startedAt: null,
};
}
10 changes: 9 additions & 1 deletion api/src/typedefs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ export const typeDefs = gql`
children: [ID]
}
type RuntimeInfo {
startedAt: String
lastActive: String
sessionId: String
ttl: Int
}
type Query {
hello: String
users: [User]
Expand All @@ -85,8 +92,9 @@ export const typeDefs = gql`
pod(id: ID!): Pod
myRepos: [Repo]
activeSessions: [String]
listAllRuntimes: [String]
listAllRuntimes: [RuntimeInfo]
myCollabRepos: [Repo]
infoRuntime(sessionId: String!): RuntimeInfo
}
type Mutation {
Expand Down
23 changes: 21 additions & 2 deletions proxy/src/node-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,18 @@ async function getRouteTarget(req) {
return null;
}

// In memory store for active routes.
const activeTable: Record<string, Date> = {};

async function startAPIServer() {
const apollo = new ApolloServer({
typeDefs: gql`
type RouteInfo {
url: String
lastActive: String
}
type Query {
getUrls: [String]
getUrls: [RouteInfo]
getRoute(url: String): String
}
Expand All @@ -233,7 +240,11 @@ async function startAPIServer() {
getUrls: async () => {
// return all routes
const res = await _routes.getAll();
return Object.keys(res);
let urls = Object.keys(res);
return urls.map((url) => ({
url: url,
lastActive: activeTable[url],
}));
},
getRoute: async (_, { url }) => {
return await _routes.get(url);
Expand All @@ -253,6 +264,9 @@ async function startAPIServer() {
deleteRoute: async (_, { url }) => {
console.log("Delete route", url);
await _routes.remove(url);
if (activeTable[url]) {
delete activeTable[url];
}
return true;
},
},
Expand Down Expand Up @@ -289,7 +303,12 @@ function startProxyServer() {
});

server.on("upgrade", async (req, socket, head) => {
if (!req.url) {
return;
}
console.log("proxy ws req", req.url);
// FIXME why there're two leading slashes? "//user_xxx_repo_xxx"
activeTable[req.url.substring(1)] = new Date();
let match = await getRouteTarget(req);
if (!match) {
return;
Expand Down
Loading

0 comments on commit 115cfc9

Please sign in to comment.