Skip to content

Commit c0c3a33

Browse files
committed
Using akka extensions mechanism
1 parent 7969547 commit c0c3a33

File tree

11 files changed

+84
-80
lines changed

11 files changed

+84
-80
lines changed

src/main/resources/reference.conf

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
# This file has all the default settings, so all these could be removed with no visible effect.
66
# Modify as needed.
77
akka {
8+
extensions = ["com.thenewmotion.akka.http.HttpExtension$"]
89
http {
910
system-name = http # name of the system used in servlet
1011
endpoints-path = endpoints # path of the actor to use for receiving and adding endpoints, call 'actorFor(/user/endpoints)'

src/main/scala/com/thenewmotion/akka/http/ActorHttpSystem.scala

-32
This file was deleted.

src/main/scala/com/thenewmotion/akka/http/AkkaHttpServlet.scala

+6-10
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@ package com.thenewmotion.akka.http
22

33
import javax.servlet.http.{HttpServletResponse, HttpServletRequest, HttpServlet}
44
import akka.actor.{ActorSystem, Props}
5-
import Http._
65

76

87
class AkkaHttpServlet extends HttpServlet {
98

10-
private[http] var _actorSystem: Option[ActorHttpSystem] = None
9+
private[http] var _actorSystem: Option[ActorSystem] = None
1110

1211
override def init() {
1312
super.init()
14-
15-
val system = ActorHttpSystem()
16-
system.actorOf(Props[EndpointsActor], system.endpointsPath)
17-
_actorSystem = Some(system)
18-
onSystemInit(system)
13+
_actorSystem = Some(HttpSystem())
14+
onSystemInit(_actorSystem.get)
1915
}
2016

2117
override def destroy() {
@@ -29,8 +25,8 @@ class AkkaHttpServlet extends HttpServlet {
2925
_actorSystem = None
3026
}
3127

32-
def onSystemInit(system: ActorHttpSystem) {}
33-
def onSystemDestroy(system: ActorHttpSystem) {}
28+
def onSystemInit(system: ActorSystem) {}
29+
def onSystemDestroy(system: ActorSystem) {}
3430

3531
override def doPost(req: HttpServletRequest, res: HttpServletResponse) {doActor(req, res)}
3632
override def doPut(req: HttpServletRequest, res: HttpServletResponse) {doActor(req, res)}
@@ -45,7 +41,7 @@ class AkkaHttpServlet extends HttpServlet {
4541
val actor = system.actorOf(props)
4642

4743
val asyncContext = req.startAsync()
48-
asyncContext.setTimeout(system.asyncTimeout)
44+
asyncContext.setTimeout(HttpExtension(system).asyncTimeout)
4945
asyncContext.addListener(new Listener(actor, system))
5046

5147
actor ! asyncContext

src/main/scala/com/thenewmotion/akka/http/AsyncActor.scala

+11-13
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import akka.actor._
55
import akka.util.duration._
66
import javax.servlet.{ServletRequest, ServletResponse, AsyncContext}
77
import Endpoints._
8-
import Http._
98
import Async._
109

1110
/**
@@ -15,18 +14,18 @@ class AsyncActor extends Actor with LoggingFSM[State, Data] {
1514

1615
implicit def res2HttpRes(res: ServletResponse) = res.asInstanceOf[HttpServletResponse]
1716
implicit def req2HttpReq(req: ServletRequest) = req.asInstanceOf[HttpServletRequest]
18-
val endpointTimeout = context.system.http.endpointRetrievalTimeout
17+
val endpointTimeout = HttpExtension(context.system).endpointRetrievalTimeout
1918

2019
startWith(Idle, Empty)
2120

2221
when(Idle) {
2322
case Event(async: AsyncContext, Empty) =>
2423
val url = async.getRequest.getPathInfo
25-
log.debug("Started async for '{}'", url)
26-
context.system.http.endpoints ! Find(url)
27-
goto(Started) using Context(async, url)
24+
log.debug("AboutToProcess$ async for '{}'", url)
25+
HttpExtension(context.system).endpoints ! Find(url)
26+
goto(AboutToProcess) using Context(async, url)
2827
}
29-
when(Started, endpointTimeout millis) {
28+
when(AboutToProcess, endpointTimeout millis) {
3029
case Event(Found(EndpointFunc(func)), ctx@Context(_, url)) =>
3130
log.debug("Processing async for '{}'", url)
3231
safeProcess(func, ctx)
@@ -37,9 +36,9 @@ class AsyncActor extends Actor with LoggingFSM[State, Data] {
3736
log.debug("No endpoint received within {} millis for '{}'", endpointTimeout, url)
3837
safeProcess(NotFound, ctx)
3938
}
40-
when(Completing) {
39+
when(AboutToComplete) {
4140
case Event(Complete(completing), ctx@Context(async, url)) =>
42-
log.debug("Completing async for '{}'", url)
41+
log.debug("AboutToComplete$ async for '{}'", url)
4342

4443
def doComplete(callback: Callback) {
4544
val success = try {
@@ -93,25 +92,24 @@ class AsyncActor extends Actor with LoggingFSM[State, Data] {
9392
catch InternalErrorOnException(async.url)
9493

9594
//we want receive different messages before responding, for example 'Timeout'
96-
goto(Completing) using async
95+
goto(AboutToComplete) using async
9796
}
9897

9998
def safeProcess(actor: ActorRef, async: Context): State = {
10099
try actor ! async.context.getRequest
101100
catch InternalErrorOnException(async.url)
102101

103102
//actor should respond with Complete(..) message
104-
goto(Completing) using async
103+
goto(AboutToComplete) using async
105104
}
106105
}
107106

108107

109108
object Async {
110109
sealed trait State
111110
case object Idle extends State
112-
case object Started extends State
113-
case object ProcessingRequest extends State
114-
case object Completing extends State
111+
case object AboutToProcess extends State
112+
case object AboutToComplete extends State
115113

116114
sealed trait Data
117115
case class Context(context: AsyncContext, url: String) extends Data

src/main/scala/com/thenewmotion/akka/http/Http.scala

-17
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.thenewmotion.akka.http
2+
3+
import com.typesafe.config.ConfigFactory
4+
import akka.actor._
5+
6+
7+
/**
8+
* @author Yaroslav Klymko
9+
*/
10+
11+
object HttpSystem {
12+
def apply(): ActorSystem = {
13+
val name = ConfigFactory.load().getString("akka.http.system-name")
14+
val system = ActorSystem(name)
15+
system.actorOf(Props[EndpointsActor], HttpExtension(system).endpointsName)
16+
system
17+
}
18+
}
19+
20+
class HttpExtension(val system: ActorSystem) extends Extension {
21+
private val config = system.settings.config
22+
import config._
23+
24+
val asyncTimeout: Long = system.settings.config.getLong("akka.http.timeout")
25+
val endpointsName: String = getString("akka.http.endpoints-path")
26+
def endpoints: ActorRef = system.actorFor("/user/" + endpointsName)
27+
val endpointRetrievalTimeout: Long = getLong("akka.http.endpoint-retrieval-timeout")
28+
val expiredHeader: (String, String) =
29+
getString("akka.http.expired-header-name") -> getString("akka.http.expired-header-value")
30+
}
31+
32+
object HttpExtension extends ExtensionId[HttpExtension] with ExtensionIdProvider {
33+
def lookup() = HttpExtension
34+
def createExtension(system: ExtendedActorSystem) = new HttpExtension(system)
35+
}

src/main/scala/com/thenewmotion/akka/http/Listener.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package com.thenewmotion.akka.http
22

3-
import akka.actor.ActorRef
43
import javax.servlet.{AsyncEvent, AsyncListener}
54
import javax.servlet.http.HttpServletResponse
5+
import akka.actor.{ActorSystem, ActorRef}
66

77
/**
88
* @author Yaroslav Klymko
99
*/
10-
class Listener(actor: ActorRef, system: ActorHttpSystem) extends AsyncListener {
10+
class Listener(actor: ActorRef, system: ActorSystem) extends AsyncListener {
1111

1212
import Listener._
1313

@@ -28,7 +28,7 @@ class Listener(actor: ActorRef, system: ActorHttpSystem) extends AsyncListener {
2828
val asyncContext = event.getAsyncContext
2929

3030
val res = asyncContext.getResponse.asInstanceOf[HttpServletResponse]
31-
val (name, value) = system.expiredHeader()
31+
val (name, value) = HttpExtension(system).expiredHeader
3232
res.addHeader(name, value)
3333
asyncContext.complete()
3434
}

src/main/scala/com/thenewmotion/akka/http/StaticAkkaHttpServlet.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package com.thenewmotion.akka.http
22

33
import Endpoints._
4+
import akka.actor.ActorSystem
45

56
/**
67
* @author Yaroslav Klymko
78
*/
89
abstract class StaticAkkaHttpServlet extends AkkaHttpServlet {
910

10-
override def onSystemInit(system: ActorHttpSystem) {
11-
system.endpoints ! Attach("static", providers)
11+
override def onSystemInit(system: ActorSystem) {
12+
HttpExtension(system).endpoints ! Attach("static", providers)
1213
}
1314

1415
def providers: Provider

src/test/scala/com/thenewmotion/akka/http/AkkaHttpServletSpec.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ class AkkaHttpServletSpec extends SpecificationWithJUnit {
3232

3333
"shutdown ActorSystem on destroy" >> {
3434
servlet._actorSystem must beSome
35-
val httpSystem = servlet._actorSystem.get
36-
val system = httpSystem.system
35+
val system = servlet._actorSystem.get
3736
servlet.destroy()
3837
system.awaitTermination()
3938
system.isTerminated must beTrue

src/test/scala/com/thenewmotion/akka/http/AsyncActorSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class AsyncActorSpec extends SpecificationWithJUnit with Mockito {
3939
actorRef.stateName mustEqual Idle
4040
actorRef.stateData mustEqual Empty
4141
actorRef ! asyncContext
42-
actorRef.stateName mustEqual Started
42+
actorRef.stateName mustEqual AboutToProcess
4343
actorRef.stateData mustEqual Context(asyncContext, "/test")
4444
}
4545

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.thenewmotion.akka.http
2+
3+
import org.specs2.mutable.SpecificationWithJUnit
4+
import akka.actor.ActorSystem
5+
6+
/**
7+
* @author Yaroslav Klymko
8+
*/
9+
class HttpExtensionSpec extends SpecificationWithJUnit {
10+
11+
"HttpExtension" should {
12+
"load on startup" >> {
13+
println(">>")
14+
println("system >> ")
15+
val system = ActorSystem()
16+
println("system << ")
17+
println("Extension >> ")
18+
val extension = HttpExtension(system)
19+
println("<<")
20+
todo
21+
}
22+
}
23+
}

0 commit comments

Comments
 (0)