Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: RPC HTTP Adapter #3630

Merged
merged 15 commits into from
Mar 10, 2023
11 changes: 11 additions & 0 deletions ironfish/src/fileStores/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export const DEFAULT_DISCORD_INVITE = 'https://discord.ironfish.network'
export const DEFAULT_USE_RPC_IPC = true
export const DEFAULT_USE_RPC_TCP = false
export const DEFAULT_USE_RPC_TLS = true
// TODO(daniel): Setting this to false until we can get HTTPS + basic auth
export const DEFAULT_USE_RPC_HTTP = false
export const DEFAULT_POOL_HOST = '::'
export const DEFAULT_POOL_PORT = 9034
export const DEFAULT_NETWORK_ID = 0
Expand All @@ -33,6 +35,7 @@ export type ConfigOptions = {
enableRpcIpc: boolean
enableRpcTcp: boolean
enableRpcTls: boolean
enableRpcHttp: boolean
enableSyncing: boolean
enableTelemetry: boolean
enableMetrics: boolean
Expand Down Expand Up @@ -93,6 +96,8 @@ export type ConfigOptions = {
rpcTcpPort: number
tlsKeyPath: string
tlsCertPath: string
rpcHttpHost: string
rpcHttpPort: number
/**
* The maximum number of peers we can be connected to at a time. Past this number,
* new connections will be rejected.
Expand Down Expand Up @@ -276,6 +281,7 @@ export const ConfigOptionsSchema: yup.ObjectSchema<Partial<ConfigOptions>> = yup
enableRpcIpc: yup.boolean(),
enableRpcTcp: yup.boolean(),
enableRpcTls: yup.boolean(),
enableRpcHttp: yup.boolean(),
enableSyncing: yup.boolean(),
enableTelemetry: yup.boolean(),
enableMetrics: yup.boolean(),
Expand All @@ -298,6 +304,8 @@ export const ConfigOptionsSchema: yup.ObjectSchema<Partial<ConfigOptions>> = yup
rpcTcpPort: YupUtils.isPort,
tlsKeyPath: yup.string().trim(),
tlsCertPath: yup.string().trim(),
rpcHttpHost: yup.string().trim(),
rpcHttpPort: YupUtils.isPort,
maxPeers: YupUtils.isPositiveInteger,
minPeers: YupUtils.isPositiveInteger,
targetPeers: yup.number().integer().min(1),
Expand Down Expand Up @@ -369,6 +377,7 @@ export class Config extends KeyStore<ConfigOptions> {
enableRpcIpc: DEFAULT_USE_RPC_IPC,
enableRpcTcp: DEFAULT_USE_RPC_TCP,
enableRpcTls: DEFAULT_USE_RPC_TLS,
enableRpcHttp: DEFAULT_USE_RPC_HTTP,
enableSyncing: true,
enableTelemetry: false,
enableMetrics: true,
Expand All @@ -388,6 +397,8 @@ export class Config extends KeyStore<ConfigOptions> {
rpcTcpPort: 8020,
tlsKeyPath: files.resolve(files.join(dataDir, 'certs', 'node-key.pem')),
tlsCertPath: files.resolve(files.join(dataDir, 'certs', 'node-cert.pem')),
rpcHttpHost: 'localhost',
NullSoldier marked this conversation as resolved.
Show resolved Hide resolved
rpcHttpPort: 8021,
maxPeers: 50,
confirmations: 2,
minPeers: 1,
Expand Down
219 changes: 219 additions & 0 deletions ironfish/src/rpc/adapters/httpAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
import http from 'http'
import { v4 as uuid } from 'uuid'
import { Assert } from '../../assert'
import { createRootLogger, Logger } from '../../logger'
import { ErrorUtils } from '../../utils'
import { RpcRequest } from '../request'
import { ApiNamespace, Router } from '../routes'
import { RpcServer } from '../server'
import { IRpcAdapter } from './adapter'
import { ERROR_CODES, ResponseError } from './errors'

const MEGABYTES = 1000 * 1000
const MAX_REQUEST_SIZE = 5 * MEGABYTES

export type HttpRpcError = {
status: number
code: string
message: string
stack?: string
}

export class RpcHttpAdapter implements IRpcAdapter {
server: http.Server | null = null
router: Router | null = null

readonly host: string
readonly port: number
readonly logger: Logger
readonly namespaces: ApiNamespace[]
private requests: Map<
string,
{
rpcRequest?: RpcRequest
req: http.IncomingMessage
waitForClose: Promise<void>
}
>

constructor(
host: string,
port: number,
logger: Logger = createRootLogger(),
namespaces: ApiNamespace[],
) {
this.host = host
this.port = port
this.logger = logger
this.namespaces = namespaces
this.requests = new Map()
}

attach(server: RpcServer): void | Promise<void> {
this.router = server.getRouter(this.namespaces)
}

start(): Promise<void> {
this.logger.debug(`Serving RPC on HTTP ${this.host}:${this.port}`)

const server = http.createServer()
this.server = server

return new Promise((resolve, reject) => {
const onError = (err: unknown) => {
server.off('error', onError)
server.off('listening', onListening)
reject(err)
}

const onListening = () => {
server.off('error', onError)
server.off('listening', onListening)

server.on('request', (req, res) => {
const requestId = uuid()

const waitForClose = new Promise<void>((resolve) => {
req.on('close', () => {
this.cleanUpRequest(requestId)
resolve()
})
})

this.requests.set(requestId, { req, waitForClose })

void this.handleRequest(req, res, requestId).catch((e) => {
const error = ErrorUtils.renderError(e)
this.logger.debug(`Error in HTTP adapter: ${error}`)
let errorResponse: HttpRpcError = {
code: ERROR_CODES.ERROR,
status: 500,
message: error,
}

if (e instanceof ResponseError) {
errorResponse = {
code: e.code,
status: e.status,
message: e.message,
stack: e.stack,
}
}

res.writeHead(errorResponse.status)
res.end(JSON.stringify(errorResponse))

this.cleanUpRequest(requestId)
})
})

resolve()
}

server.on('error', onError)
server.on('listening', onListening)
server.listen(this.port, this.host)
})
}

async stop(): Promise<void> {
NullSoldier marked this conversation as resolved.
Show resolved Hide resolved
for (const { req, rpcRequest } of this.requests.values()) {
req.destroy()
NullSoldier marked this conversation as resolved.
Show resolved Hide resolved
rpcRequest?.close()
}

await new Promise<void>((resolve) => {
this.server?.close(() => resolve()) || resolve()
})

await Promise.all(
Array.from(this.requests.values()).map(({ waitForClose }) => waitForClose),
)
}

cleanUpRequest(requestId: string): void {
const request = this.requests.get(requestId)

// TODO: request.req was is already closed at this point
// but do we need to clean that up here at all
request?.rpcRequest?.close()
this.requests.delete(requestId)
}

async handleRequest(
request: http.IncomingMessage,
response: http.ServerResponse,
requestId: string,
): Promise<void> {
if (this.router === null || this.router.server === null) {
throw new ResponseError('Tried to connect to unmounted adapter')
}

const router = this.router

if (request.url === undefined) {
throw new ResponseError('No request url provided')
}

this.logger.debug(
`Call HTTP RPC: ${request.method || 'undefined'} ${request.url || 'undefined'}`,
)

// TODO(daniel): better way to parse method from request here
const url = new URL(request.url, `http://${request.headers.host || 'localhost'}`)
const route = url.pathname.substring(1)

if (request.method !== 'POST') {
throw new ResponseError(
`Route does not exist, Did you mean to use POST?`,
ERROR_CODES.ROUTE_NOT_FOUND,
404,
)
}

// TODO(daniel): clean up reading body code here a bit of possible
let size = 0
const data: Buffer[] = []

for await (const chunk of request) {
Assert.isInstanceOf(chunk, Buffer)
size += chunk.byteLength
data.push(chunk)

if (size >= MAX_REQUEST_SIZE) {
throw new ResponseError('Max request size exceeded')
}
}

const combined = Buffer.concat(data)
// TODO(daniel): some routes assume that no data will be passed as undefined
// so keeping that convention here. Could think of a better way to handle?
const body = combined.length ? combined.toString('utf8') : undefined

const rpcRequest = new RpcRequest(
body === undefined ? undefined : JSON.parse(body),
route,
(status: number, data?: unknown) => {
response.writeHead(status, {
'Content-Type': 'application/json',
})
response.end(JSON.stringify({ status, data }))
this.cleanUpRequest(requestId)
},
(data: unknown) => {
// TODO: see if this is correct way to implement HTTP streaming.
// do more headers need to be set, etc.??
const bufferData = Buffer.from(JSON.stringify(data))
response.write(bufferData)
},
)

const currRequest = this.requests.get(requestId)
currRequest && this.requests.set(requestId, { ...currRequest, rpcRequest })

await router.route(route, rpcRequest)
}
}
1 change: 1 addition & 0 deletions ironfish/src/rpc/adapters/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

export * from './adapter'
export * from './errors'
export * from './httpAdapter'
export * from './ipcAdapter'
export * from './socketAdapter'
export * from './tcpAdapter'
Expand Down
15 changes: 12 additions & 3 deletions ironfish/src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { WebSocketClient } from './network/webSocketClient'
import { IronfishNode } from './node'
import { IronfishPKG, Package } from './package'
import { Platform } from './platform'
import { RpcSocketClient, RpcTlsAdapter } from './rpc'
import { RpcHttpAdapter, RpcSocketClient, RpcTlsAdapter } from './rpc'
import { RpcIpcAdapter } from './rpc/adapters/ipcAdapter'
import { RpcTcpAdapter } from './rpc/adapters/tcpAdapter'
import { RpcClient } from './rpc/clients/client'
Expand Down Expand Up @@ -200,10 +200,19 @@ export class IronfishSdk {
})

if (this.config.get('enableRpcIpc')) {
const namespaces = ALL_API_NAMESPACES
await node.rpc.mount(
new RpcIpcAdapter(this.config.get('ipcPath'), this.logger, ALL_API_NAMESPACES),
)
}

if (this.config.get('enableRpcHttp')) {
await node.rpc.mount(
new RpcIpcAdapter(this.config.get('ipcPath'), this.logger, namespaces),
new RpcHttpAdapter(
this.config.get('rpcHttpHost'),
this.config.get('rpcHttpPort'),
this.logger,
ALL_API_NAMESPACES,
),
)
}

Expand Down