Skip to content

Commit

Permalink
http (fix): Properly report exception stack traces to server-side logs (
Browse files Browse the repository at this point in the history
#3358)

The server-side stack trace was not recorded in airframe-http-netty.
This PR fixes this issue.
  • Loading branch information
xerial authored Jan 30, 2024
1 parent 76dd486 commit 5defb02
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ import io.netty.handler.stream.ChunkedWriteHandler
import wvlet.airframe.codec.{MessageCodec, MessageCodecFactory}
import wvlet.airframe.control.ThreadUtil
import wvlet.airframe.http.HttpMessage.Response
import wvlet.airframe.http.{HttpMessage, *}
import wvlet.airframe.http.client.{AsyncClient, SyncClient}
import wvlet.airframe.http.internal.{LogRotationHttpLogger, RPCLoggingFilter, RPCResponseFilter}
import wvlet.airframe.http.internal.{LogRotationHttpLogger, RPCResponseFilter}
import wvlet.airframe.http.router.{ControllerProvider, HttpRequestDispatcher}
import wvlet.airframe.http.{HttpMessage, *}
import wvlet.airframe.rx.Rx
import wvlet.airframe.surface.Surface
import wvlet.airframe.{Design, Session}
import wvlet.log.LogSupport
import wvlet.log.io.IOUtil

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{Executors, TimeUnit}
import javax.annotation.PostConstruct
import scala.collection.immutable.ListMap
import scala.concurrent.ExecutionContext
Expand All @@ -52,7 +52,6 @@ case class NettyServerConfig(
httpLoggerProvider: HttpLoggerConfig => HttpLogger = { (config: HttpLoggerConfig) =>
new LogRotationHttpLogger(config)
},
loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) },
customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty,
// Thread manager for handling Future[_] responses
executionContext: ExecutionContext = {
Expand Down Expand Up @@ -86,7 +85,6 @@ case class NettyServerConfig(
}
def noLogging: NettyServerConfig = {
this.copy(
loggingFilter = { _ => RxHttpFilter.identity },
httpLoggerProvider = HttpLogger.emptyLogger(_)
)
}
Expand Down Expand Up @@ -138,8 +136,8 @@ case class NettyServerConfig(

class NettyServer(config: NettyServerConfig, session: Session) extends HttpServer with LogSupport {

private val httpLogger: HttpLogger = config.newHttpLogger
private val loggingFilter: RxHttpFilter = config.loggingFilter(httpLogger)
private val httpLogger: HttpLogger = config.newHttpLogger
private val rpcFilter: RxHttpFilter = new RPCResponseFilter(httpLogger)

private val bossGroup = {
val tf = ThreadUtil.newDaemonThreadFactory("airframe-netty-boss")
Expand Down Expand Up @@ -235,8 +233,7 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe
NettyBackend
.rxFilterAdapter(
attachContextFilter
.andThen(loggingFilter)
.andThen(RPCResponseFilter)
.andThen(rpcFilter)
)
.andThen(
HttpRequestDispatcher.newDispatcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import wvlet.log.LogSupport
* Interface for writing HTTP request/response logs
*/
trait HttpLogger extends AutoCloseable {
// Headers to exclude from the logs
val excludeHeaders: HttpMultiMap = HttpMultiMap.fromHeaderNames(config.excludeHeaders)

def config: HttpLoggerConfig

final def write(log: Map[String, Any]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ sealed abstract class RPCStatus(

def shouldReportStackTrace: Boolean = {
this match {
// Do not report stack traces for non-authorized user requests by default
case UNAUTHENTICATED_U13 | PERMISSION_DENIED_U14 => false
case _ => true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,27 @@ import wvlet.airframe.http.{HttpLogger, HttpLoggerConfig, HttpMultiMap, RxHttpEn
import wvlet.airframe.rx.Rx
import wvlet.log.LogSupport

import scala.util.{Failure, Success}

/**
* A client-side filter for logging HTTP requests and responses
*/
class HttpClientLoggingFilter(httpLogger: HttpLogger) extends HttpClientFilter with AutoCloseable with LogSupport {

private val excludeHeaders = HttpMultiMap.fromHeaderNames(httpLogger.config.excludeHeaders)

override def close(): Unit = {
httpLogger.close()
}

def apply(context: HttpClientContext): RxHttpFilter = new RxHttpFilter {
override def apply(request: Request, next: RxHttpEndpoint): Rx[Response] = {
HttpLogs.reportLog(httpLogger, excludeHeaders, request, next, Some(context))
val logContext = new HttpLogs.LogContext(request, httpLogger, Some(context), None)
next(request)
.tap { resp =>
// TODO Record exceptions returned from the server
logContext.logResponse(resp, None)
}
.tapOnFailure { ex =>
logContext.logError(ex)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,68 @@ import wvlet.airframe.http.client.HttpClientContext
import wvlet.airframe.rx.Rx
import wvlet.airframe.surface.{Parameter, Surface, TypeName}
import wvlet.airframe.ulid.ULID
import wvlet.log.LogTimestampFormatter
import wvlet.log.{LogSupport, LogTimestampFormatter}

import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.annotation.tailrec
import scala.collection.immutable.ListMap
import scala.concurrent.ExecutionException
import scala.util.Try
import scala.util.{Failure, Success, Try}

/**
* Internal utilities for HTTP request/response logging
*/
object HttpLogs {
object HttpLogs extends LogSupport {

def reportLog(
httpLogger: HttpLogger,
excludeHeaders: HttpMultiMap,
class LogContext(
request: Request,
next: RxHttpEndpoint,
clientContext: Option[HttpClientContext] = None,
rpcContext: Option[RPCContext] = None
): Rx[Response] = {
val baseTime = System.currentTimeMillis()
val start = System.nanoTime()
val m = ListMap.newBuilder[String, Any]
m ++= unixTimeLogs(baseTime)
m ++= commonRequestLogs(request)
m ++= requestHeaderLogs(request, excludeHeaders)

def reportLogs: Unit = {
// Finally, write the log
httpLogger.write(httpLogger.config.logFilter(m.result()))
}
httpLogger: HttpLogger,
clientContext: Option[HttpClientContext],
rpcContext: Option[RPCContext]
) {
private val baseTime = System.currentTimeMillis()
private val start = System.nanoTime()
private val m = ListMap.newBuilder[String, Any]

def rpcCallLogs(): Unit = {
// RPC call context will be set to a TLS after dispatching an event
// TODO Pass RPC call context without using TLS
init()

private def init(): Unit = {
m ++= unixTimeLogs(baseTime)
m ++= commonRequestLogs(request)
m ++= requestHeaderLogs(request, httpLogger.excludeHeaders)

// Log RPC context in the client side
clientContext.foreach {
_.rpcMethod.map { rpc => m ++= rpcMethodLogs(rpc) }
}
// Log RPC context in the server side
rpcContext.flatMap(_.rpcCallContext).foreach { rcc =>
m ++= rpcLogs(rcc)
}
}

clientContext.foreach {
_.rpcMethod.map { rpc => m ++= rpcMethodLogs(rpc) }
private def logResponse(): Unit = {
m ++= durationLogs(baseTime, start)
}

next
.apply(request)
.toRx
.map { resp =>
m ++= durationLogs(baseTime, start)
rpcCallLogs()
m ++= commonResponseLogs(resp)
m ++= responseHeaderLogs(resp, excludeHeaders)
reportLogs
resp
}
.recoverWith { case e: Throwable =>
m ++= durationLogs(baseTime, start)
rpcCallLogs()
m ++= errorLogs(e)
reportLogs
Rx.exception(e)
}
def logResponse(response: Response, exception: Option[Throwable]): Response = {
logResponse()
m ++= commonResponseLogs(response)
m ++= responseHeaderLogs(response, httpLogger.excludeHeaders)
exception.foreach(ex => m ++= errorLogs(ex))

// Write the log
httpLogger.write(httpLogger.config.logFilter(m.result()))
response
}

def logError(e: Throwable): Unit = {
logResponse()
m ++= errorLogs(e)

// Write the log
httpLogger.write(httpLogger.config.logFilter(m.result()))
}
}

def durationLogs(baseTime: Long, sinceNano: Long): ListMap[String, Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ case class RPCCallContext(
// The full class name of the RPC implementation class
def rpcClassName: String = TypeName.sanitizeTypeName(rpcMethodSurface.owner.fullName)
def rpcInterfaceName: String = rpcMethod.rpcInterfaceName
def rpcMethodName: String = rpcMethodSurface.name
def rpcMethodName: String = rpcMethod.methodName
def withRPCArgs(rpcArgs: Seq[Any]): RPCCallContext = this.copy(rpcArgs = rpcArgs)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package wvlet.airframe.http.internal
import wvlet.airframe.http.{
Http,
HttpHeader,
HttpLogger,
HttpMessage,
HttpMultiMap,
HttpServerException,
HttpStatus,
RPCContext,
RPCException,
RPCStatus,
RxHttpEndpoint,
Expand All @@ -30,23 +33,28 @@ import wvlet.log.LogSupport
import scala.util.{Failure, Success}

/**
* Add RPCStatus to the response header and embed the error message to the request body
* A filter for managing RPC status header, logs, and errors. Exception messages will be embedded to the response body.
*/
object RPCResponseFilter extends RxHttpFilter with LogSupport {
class RPCResponseFilter(httpLogger: HttpLogger) extends RxHttpFilter with LogSupport {
override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[HttpMessage.Response] = {

val logContext = new HttpLogs.LogContext(request, httpLogger, None, Some(RPCContext.current))

next(request)
.transform {
case Success(resp) =>
setRPCStatus(resp)
logContext.logResponse(setRPCStatus(resp), None)
case Failure(e) =>
e match {
case ex: HttpServerException =>
val re = RPCStatus.fromHttpStatus(ex.status).newException(ex.getMessage, ex.getCause)
re.toResponse
logContext.logResponse(re.toResponse, Some(re))
case ex: RPCException =>
ex.toResponse
logContext.logResponse(ex.toResponse, Some(ex))
case other =>
RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other).toResponse
val ex = RPCStatus.INTERNAL_ERROR_I0.newException(other.getMessage, other)
// Report the original error to the log
logContext.logResponse(ex.toResponse, Some(other))
}
}
}
Expand Down

0 comments on commit 5defb02

Please sign in to comment.