Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add docker runtime TTL; better runtime connection & feedback #135

Merged
merged 1 commit into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions api/src/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import { listAllRuntimes } from "./resolver_runtime";
import {
spawnRuntime as spawnRuntime_docker,
killRuntime as killRuntime_docker,
infoRuntime as infoRuntime_docker,
loopKillInactiveRoutes as loopKillInactiveRoutes_docker,
initRoutes as initRoutes_docker,
} from "./spawner-docker";
import {
spawnRuntime as spawnRuntime_k8s,
killRuntime as killRuntime_k8s,
infoRuntime as infoRuntime_k8s,
} from "./spawner-k8s";

export const resolvers = {
Expand All @@ -44,6 +48,13 @@ export const resolvers = {
pod,
listAllRuntimes,
myCollabRepos,
...(process.env.RUNTIME_SPAWNER === "k8s"
? {
infoRuntime: infoRuntime_k8s,
}
: {
infoRuntime: infoRuntime_docker,
}),
},
Mutation: {
signup,
Expand All @@ -57,13 +68,19 @@ export const resolvers = {
updatePod,
deletePod,
addCollaborator,
spawnRuntime:
process.env.RUNTIME_SPAWNER === "k8s"
? spawnRuntime_k8s
: spawnRuntime_docker,
killRuntime:
process.env.RUNTIME_SPAWNER === "k8s"
? killRuntime_k8s
: killRuntime_docker,
...(process.env.RUNTIME_SPAWNER === "k8s"
? {
spawnRuntime: spawnRuntime_k8s,
killRuntime: killRuntime_k8s,
}
: {
spawnRuntime: spawnRuntime_docker,
killRuntime: killRuntime_docker,
}),
},
};

if (process.env.RUNTIME_SPAWNER !== "k8s") {
initRoutes_docker();
loopKillInactiveRoutes_docker();
}
15 changes: 10 additions & 5 deletions api/src/resolver_runtime.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { ApolloClient, InMemoryCache, gql } from "@apollo/client/core";
import Prisma from "@prisma/client";

import { loadOrCreateContainer, removeContainer } from "./spawner-docker";

const { PrismaClient } = Prisma;

const prisma = new PrismaClient();
Expand All @@ -29,16 +27,23 @@ export async function listAllRuntimes(_, {}, { userId }) {
// `,
query: gql`
query {
getUrls
getUrls {
url
lastActive
}
}
`,
fetchPolicy: "network-only",
});
let res = urls.data.getUrls
.map((url) => {
.map(({ url, lastActive }) => {
let match_res = url.match(/\/user_(.*)_repo_(.*)/);
if (match_res) {
if (`user_${match_res[1]}` === userId) return `repo_${match_res[2]}`;
if (`user_${match_res[1]}` === userId)
return {
sessionId: `user_${match_res[1]}_repo_${match_res[2]}`,
lastActive,
};
}
return false;
})
Expand Down
146 changes: 139 additions & 7 deletions api/src/spawner-docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const apollo_client = new ApolloClient({

const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

export async function removeContainer(name) {
async function removeContainer(name) {
return new Promise((resolve, reject) => {
var docker = new Docker();
console.log("remove if already exist");
Expand Down Expand Up @@ -65,12 +65,7 @@ export async function removeContainer(name) {
* @param Env additional optional env for the container
* @returns Boolean for whether a new container is created.
*/
export async function loadOrCreateContainer(
image,
name,
network,
Env: string[] = []
) {
async function loadOrCreateContainer(image, name, network, Env: string[] = []) {
console.log("loading container", name);
let ip = await loadContainer(name, network);
if (ip) return false;
Expand All @@ -81,6 +76,23 @@ export async function loadOrCreateContainer(
return true;
}

async function getContainerInfo(name): Promise<string | null> {
return new Promise((resolve, reject) => {
var docker = new Docker();
let container = docker.getContainer(name);
container.inspect((err, data) => {
if (err) {
console.log("getContainerInfo: container seems not exist.");
return resolve(null);
}
if (data?.State.Running) {
return resolve(data.State.StartedAt);
}
return resolve(null);
});
});
}

async function loadContainer(name, network) {
// if already exists, just return the IP
// else, create and return the IP
Expand Down Expand Up @@ -177,6 +189,24 @@ async function createContainer(image, name, network, Env) {
});
}

async function scanRunningSessions(): Promise<string[]> {
return new Promise((resolve, reject) => {
var docker = new Docker();
docker.listContainers((err, containers) => {
if (err) {
console.log("ERR:", err);
return;
}
let sessionIds = containers
?.filter(
(c) => c.Names[0].startsWith("/cpkernel_") && c.State === "running"
)
.map((c) => c.Names[0].substring("/cpkernel_".length));
return resolve(sessionIds || []);
});
});
}

export async function spawnRuntime(_, { sessionId }) {
// launch the kernel
console.log("Spawning ");
Expand Down Expand Up @@ -260,3 +290,105 @@ export async function killRuntime(_, { sessionId }) {
});
return true;
}

/**
* Get the runtime info.
* @param sessionId the session ID
* @returns {startedAt} the time when the runtime is started.
*/
export async function infoRuntime(_, { sessionId }) {
let zmq_host = `cpkernel_${sessionId}`;
let ws_host = `cpruntime_${sessionId}`;
let startedAt = await getContainerInfo(ws_host);
return {
startedAt: startedAt ? new Date(startedAt).getTime() : null,
};
}

async function killInactiveRoutes() {
let { data } = await apollo_client.query({
query: gql`
query GetUrls {
getUrls {
url
lastActive
}
}
`,
fetchPolicy: "network-only",
});
const now = new Date();
let inactiveRoutes = data.getUrls
.filter(({ url, lastActive }) => {
if (!lastActive) return false;
let d2 = new Date(parseInt(lastActive));
// Prod: 12 hours TTL
let ttl = 1000 * 60 * 60 * 12;
// DEBUG: 1 minute TTL
// let ttl = 1000 * 60 * 3;
let activeTime = now.getTime() - d2.getTime();
return activeTime > ttl;
})
.map(({ url }) => url);
console.log("Inactive routes", inactiveRoutes);
for (let url of inactiveRoutes) {
let sessionId = url.substring(1);
let zmq_host = `cpkernel_${sessionId}`;
let ws_host = `cpruntime_${sessionId}`;
await removeContainer(zmq_host);
await removeContainer(ws_host);
await apollo_client.mutate({
mutation: gql`
mutation deleteRoute($url: String) {
deleteRoute(url: $url)
}
`,
variables: {
url,
},
});
}
}

/**
* Periodically kill inactive routes every minute.
*/
export function loopKillInactiveRoutes() {
setInterval(async () => {
await killInactiveRoutes();
}, 1000 * 60 * 1);
}

/**
* At startup, check all active containers and add them to the table.
*/
export async function initRoutes() {
let sessionIds = await scanRunningSessions();
console.log("initRoutes sessionIds", sessionIds);
for (let id of sessionIds) {
let url = `/${id}`;
let ws_host = `cpruntime_${id}`;
await apollo_client.mutate({
mutation: gql`
mutation addRoute($url: String, $target: String) {
addRoute(url: $url, target: $target)
}
`,
variables: {
url,
// This 4020 is the WS listening port in WS_RUNTIME_IMAGE
target: `${ws_host}:4020`,
},
// refetchQueries: ["getUrls"],
refetchQueries: [
{
query: gql`
query getUrls {
getUrls
}
`,
},
],
});
}
}
13 changes: 13 additions & 0 deletions api/src/spawner-k8s.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,16 @@ export async function killRuntime(_, { sessionId }) {
});
return true;
}

/**
* Get the runtime info.
* @param sessionId the session ID
* @returns {startedAt} the time when the runtime is started.
*/
export async function infoRuntime(_, { sessionId }) {
// TODO implement
throw new Error("Not implemented");
return {
startedAt: null,
};
}
10 changes: 9 additions & 1 deletion api/src/typedefs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ export const typeDefs = gql`
children: [ID]
}
type RuntimeInfo {
startedAt: String
lastActive: String
sessionId: String
ttl: Int
}
type Query {
hello: String
users: [User]
Expand All @@ -85,8 +92,9 @@ export const typeDefs = gql`
pod(id: ID!): Pod
myRepos: [Repo]
activeSessions: [String]
listAllRuntimes: [String]
listAllRuntimes: [RuntimeInfo]
myCollabRepos: [Repo]
infoRuntime(sessionId: String!): RuntimeInfo
}
type Mutation {
Expand Down
23 changes: 21 additions & 2 deletions proxy/src/node-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,18 @@ async function getRouteTarget(req) {
return null;
}

// In memory store for active routes.
const activeTable: Record<string, Date> = {};

async function startAPIServer() {
const apollo = new ApolloServer({
typeDefs: gql`
type RouteInfo {
url: String
lastActive: String
}
type Query {
getUrls: [String]
getUrls: [RouteInfo]
getRoute(url: String): String
}
Expand All @@ -233,7 +240,11 @@ async function startAPIServer() {
getUrls: async () => {
// return all routes
const res = await _routes.getAll();
return Object.keys(res);
let urls = Object.keys(res);
return urls.map((url) => ({
url: url,
lastActive: activeTable[url],
}));
},
getRoute: async (_, { url }) => {
return await _routes.get(url);
Expand All @@ -253,6 +264,9 @@ async function startAPIServer() {
deleteRoute: async (_, { url }) => {
console.log("Delete route", url);
await _routes.remove(url);
if (activeTable[url]) {
delete activeTable[url];
}
return true;
},
},
Expand Down Expand Up @@ -289,7 +303,12 @@ function startProxyServer() {
});

server.on("upgrade", async (req, socket, head) => {
if (!req.url) {
return;
}
console.log("proxy ws req", req.url);
// FIXME why there're two leading slashes? "//user_xxx_repo_xxx"
activeTable[req.url.substring(1)] = new Date();
let match = await getRouteTarget(req);
if (!match) {
return;
Expand Down
Loading