Skip to content

Commit 732a034

Browse files
committed
Merge pull request #14 from rampart81/master
#3 Chunked Encoding 지원 추가
2 parents b2e814d + 7ef2b41 commit 732a034

File tree

9 files changed

+376
-28
lines changed

9 files changed

+376
-28
lines changed

build.sbt

Lines changed: 0 additions & 12 deletions
This file was deleted.

project/Build.scala

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import sbt._
2+
import Keys._
3+
4+
object ScalaHttpServerBuild extends Build {
5+
import Dependencies._
6+
7+
lazy val root = Project(
8+
id = "scala-http-server",
9+
base = file("."),
10+
settings = BuildSetting.settings ++ Seq(libraryDependencies ++= coreDeps)
11+
)
12+
}
13+
14+
object Resolvers {
15+
val typesafe = "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
16+
val all = Seq(typesafe)
17+
}
18+
19+
object Dependencies {
20+
val akka = "com.typesafe.akka" % "akka-actor_2.10" % "2.1.0"
21+
val akkaTest = "com.typesafe.akka" % "akka-testkit_2.10" % "2.1.0" % "test"
22+
val tika = "org.apache.tika" % "tika-core" % "1.2"
23+
val scalaTest = "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"
24+
val coreDeps = Seq(
25+
akka,
26+
akkaTest,
27+
tika,
28+
scalaTest
29+
)
30+
}
31+
32+
object BuildSetting {
33+
val org = "laScala Korea"
34+
val ver = "1.0.0-SNAPSHOT"
35+
val scalaVer = "2.10.0"
36+
val settings = Defaults.defaultSettings ++ Seq(
37+
organization := org,
38+
version := ver,
39+
scalaVersion := scalaVer,
40+
shellPrompt := SBTPrompt.setting,
41+
resolvers := Resolvers.all)
42+
}
43+
44+
// SBT Prompt 를 튜닝하기 위한 세팅.
45+
object SBTPrompt {
46+
47+
object devnull extends ProcessLogger {
48+
def info(s: => String) {}
49+
def error(s: => String) {}
50+
def buffer[T](f: => T): T = f
51+
}
52+
53+
def gitBranches = ("git branch --no-color" lines_! devnull mkString)
54+
55+
val current = """\*\s+([\w-/]+)""".r
56+
val setting = {
57+
(state: State) => {
58+
val currBranch =
59+
current findFirstMatchIn gitBranches map (_ group (1)) getOrElse "-"
60+
val currProject = Project.extract(state).currentProject.id
61+
"%s:%s:%s> ".format(
62+
currProject, currBranch, BuildSetting.ver
63+
)
64+
}
65+
}
66+
}
67+

src/main/resource/stream_data1.txt

47.7 MB
Binary file not shown.

src/main/resource/stream_data2.txt

47.7 MB
Binary file not shown.

src/main/scala/com/lascala/http/HttpResponse.scala

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@ package com.lascala.http
1919

2020
import com.lascala.http.HttpConstants._
2121

22-
import akka.util.{ ByteString, ByteStringBuilder }
22+
import akka.util.{ByteString, ByteStringBuilder}
2323
import java.io.File
2424
import org.apache.tika.Tika
2525
import java.io.FileInputStream
2626
import java.util.Date
2727
import java.util.Locale
2828
import java.text.SimpleDateFormat
2929
import java.util.TimeZone
30+
import akka.actor.IO
31+
import util.Failure
32+
import com.lascala.libs.Enumerator
3033

3134
trait HttpResponse {
3235
def lastModified: Date = null
@@ -42,12 +45,12 @@ trait HttpResponse {
4245
}
4346

4447
object HttpResponse {
45-
val version = ByteString("HTTP/1.1")
46-
val server = ByteString("Server: lascala-http")
47-
val connection = ByteString("Connection: ")
48-
val keepAlive = ByteString("Keep-Alive")
49-
val close = ByteString("Close")
50-
val date = ByteString("Date: ")
48+
val version = ByteString("HTTP/1.1")
49+
val server = ByteString("Server: lascala-http")
50+
val connection = ByteString("Connection: ")
51+
val keepAlive = ByteString("Keep-Alive")
52+
val close = ByteString("Close")
53+
val date = ByteString("Date: ")
5154
val lastModified = ByteString("Last-Modified: ")
5255

5356
def httpDateFormat = {
@@ -69,26 +72,71 @@ object HttpResponse {
6972
rsp.contentLength ++= CRLF ++=
7073
connection ++= (if (rsp.shouldKeepAlive) keepAlive else close) ++= CRLF ++= CRLF ++= rsp.body).result
7174
}
75+
76+
def stream(rsp: HttpResponse with ChunkedEncodable, socket: IO.SocketHandle) = {
77+
val headers = (new ByteStringBuilder ++=
78+
version ++= SP ++= rsp.status ++= SP ++= rsp.reason ++= CRLF ++=
79+
rsp.contentType ++ CRLF ++=
80+
rsp.cacheControl ++= CRLF ++=
81+
date ++= httpDate(new Date) ++= CRLF ++=
82+
Option(rsp.lastModified).map(lastModified ++ httpDate(_) ++ CRLF).getOrElse(ByteString("")) ++=
83+
server ++= CRLF ++=
84+
connection ++= (if (rsp.shouldKeepAlive) keepAlive else close) ++= CRLF ++=
85+
ByteString("Transfer-Encoding: chunked") ++= CRLF ++= CRLF).result
86+
87+
socket write headers.compact
88+
89+
rsp.chunkedData.foreach { chunk =>
90+
val chunkedMessageBody = (
91+
new ByteStringBuilder ++= ByteString(chunk.size.toHexString) ++= CRLF
92+
++= chunk ++= CRLF).result
93+
94+
socket write chunkedMessageBody.compact
95+
}
96+
97+
// According to the HTTP spec, need to write 0 at the end of the chunk data
98+
socket write ByteString("0") ++ CRLF ++ CRLF
99+
}
100+
}
101+
102+
trait ChunkedEncodable extends HttpResponse {
103+
def chunkedData: Enumerator[ByteString]
72104
}
73105

74106
case class OKFileResponse(file: File, shouldKeepAlive: Boolean = true) extends HttpResponse {
107+
val body = readFile(file)
108+
val mimeType = new Tika().detect(file)
109+
val status = ByteString("200")
110+
val reason = ByteString("OK")
111+
112+
override def lastModified = new Date(file.lastModified)
75113
def readFile(file: File) = {
76114
val resource = new Array[Byte](file.length.toInt)
77115
val in = new FileInputStream(file)
78116
in.read(resource)
79117
in.close()
80118
ByteString(resource)
81119
}
82-
val body = readFile(file)
83-
val mimeType = new Tika().detect(file)
84-
val status = ByteString("200")
85-
val reason = ByteString("OK")
86-
override def lastModified = new Date(file.lastModified)
87120
}
88121

89122
case class OKResponse(body: ByteString, shouldKeepAlive: Boolean = true, mimeType: String = "text/html") extends HttpResponse {
90123
val status = ByteString("200")
91124
val reason = ByteString("OK")
125+
126+
def withMimeType(mimeType: String) = this match {
127+
// In case of ChunkedEncodable, need to manually instantiate a new OKResponse with ChunkedEncodable
128+
// Instead of just using copy method in order to preserve the ChunkedEncodable type.
129+
case t: ChunkedEncodable => new OKResponse(this.body, this.shouldKeepAlive, mimeType) with ChunkedEncodable {
130+
def chunkedData: Enumerator[ByteString] = t.chunkedData
131+
}
132+
case _ => this.copy(mimeType = mimeType)
133+
}
134+
}
135+
136+
object OKResponse {
137+
def stream(chunk: Enumerator[ByteString]) = new OKResponse(body = ByteString.empty, mimeType = "text/html") with ChunkedEncodable {
138+
def chunkedData: Enumerator[ByteString] = chunk
139+
}
92140
}
93141

94142
case class NotModifiedResponse(body: ByteString = ByteString.empty, shouldKeepAlive: Boolean = false, mimeType: String = "") extends HttpResponse {

src/main/scala/com/lascala/http/HttpServer.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@ object HttpServer {
2929

3030
def processRequest(socket: IO.SocketHandle, handler: ActorRef): IO.Iteratee[Unit] = {
3131
def processResponse(response: HttpResponse) {
32-
socket write HttpResponse.bytes(response).compact
32+
response match {
33+
case chunkEncoded: HttpResponse with ChunkedEncodable =>
34+
HttpResponse.stream(chunkEncoded, socket)
35+
case _ => socket write HttpResponse.bytes(response).compact
36+
}
37+
3338
if (!response.shouldKeepAlive) socket.close()
3439
}
3540

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* This software is licensed under the Apache 2 license, quoted below.
3+
*
4+
* Copyright 2009-2012 Typesafe Inc. <http://www.typesafe.com>
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
7+
* use this file except in compliance with the License. You may obtain a copy of
8+
* the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations under
16+
* the License.
17+
*/
18+
package com.lascala.libs
19+
20+
import akka.actor.IO
21+
import akka.util.ByteString
22+
import concurrent.{Await, Future}
23+
import concurrent.ExecutionContext.Implicits.global
24+
import java.io.InputStream
25+
import util.{Success, Failure}
26+
import concurrent.duration.Duration
27+
import java.util.concurrent.TimeUnit
28+
29+
30+
/**
31+
* Chunked encoding 을 서포트 하기위해 기본적인 Enumerator(or Producer) 패턴 을 구현한것이다.
32+
* future 가 각각 처리되어야 할 데이타 를 담당한다.
33+
* Foreach 메소드 가 이 Enumerator 와 연결되어있는 다른 enumerator 를 연동해
34+
* 각 Enumerator 의 future 를 호출해서 각각 chunk 데이타를 처리한다.
35+
*
36+
*/
37+
trait Enumerator[T] {
38+
parent =>
39+
40+
/**
41+
* 스칼라 Future 를 사용해서 인풋을 처리함.
42+
*
43+
* @note Some 은 계속해서 처리할 인풋이 있다는 뜻이고 None 은 그 반대임.
44+
*/
45+
def future: Future[Option[T]]
46+
47+
/**
48+
* 각각 Enumerator 의 퓨처를 호출한후 리턴된 값을 주어진 콜백 function 에
49+
* 적용한다. 퓨처는 None 을 리턴하기까지 귀재 방식으로 계속해서 호출됨.
50+
*
51+
* @param f callbakc function 을 사용하요 퓨처가 리턴한 값을 처리함.
52+
*/
53+
@scala.annotation.tailrec
54+
final def foreach[E](f: T => E) {
55+
Await.result(future, Duration.apply(60, TimeUnit.SECONDS)) match {
56+
case Some(v) => {
57+
f(v)
58+
foreach(f)
59+
}
60+
case None => if (next.isDefined) next.get.foreach(f)
61+
}
62+
}
63+
64+
def next: Option[Enumerator[T]] = None
65+
66+
/**
67+
* 여러 Enumerator 들을 연결시켜 주는 메소드
68+
*
69+
* @param other
70+
* @return
71+
*/
72+
def andThen(other: Enumerator[T]) = {
73+
def _chainOtherEnumerators(current: Option[Enumerator[T]], other: Enumerator[T]): Option[Enumerator[T]] = current match {
74+
case Some(curr) => Some(new Enumerator[T] {
75+
def future = curr.future
76+
override def next = _chainOtherEnumerators(curr.next, other)
77+
})
78+
case None => Some(other)
79+
}
80+
81+
new Enumerator[T] {
82+
def future: Future[Option[T]] = parent.future
83+
override def next = _chainOtherEnumerators(parent.next, other)
84+
}
85+
}
86+
}
87+
88+
/**
89+
* Enumerator companion object.
90+
*/
91+
object Enumerator {
92+
93+
def fromCallback(callback: () => Option[ByteString]) = new Enumerator[ByteString] {
94+
def future = Future(callback())
95+
}
96+
97+
/**
98+
* Takes InputStream as input source and returns a Enumerator that produces data.
99+
* InputStream 을 받아서 Enumerator 를 리턴해주는 유틸펑션.
100+
*
101+
* @param in
102+
* @param chunkSize
103+
*/
104+
def fromStream(in: InputStream, chunkSize: Int = 1024 * 8) = {
105+
def callback = { () =>
106+
val buffer = new Array[Byte](chunkSize)
107+
val chunk = in.read(buffer) match {
108+
case -1 =>
109+
in.close()
110+
None
111+
case read =>
112+
val input = new Array[Byte](read)
113+
System.arraycopy(buffer, 0, input, 0, read)
114+
Some(ByteString(input))
115+
}
116+
117+
chunk
118+
}
119+
120+
fromCallback(callback)
121+
}
122+
123+
/**
124+
* Convinient function that takes a file and returns a Enumerator
125+
*
126+
* 파일 오브젝트를 받아서 Enumerator 를 리턴해주는 유틸펑션.
127+
*
128+
* @param file
129+
* @param chunkSize
130+
*/
131+
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 8) = fromStream(new java.io.FileInputStream(file), chunkSize)
132+
}

src/main/scala/common/main.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import com.lascala.http.HttpResponse._
2929
import com.lascala.http.HttpConstants._
3030
import akka.util.ByteString
3131
import java.io.File
32+
import com.lascala.libs.Enumerator
33+
import org.apache.tika.Tika
3234

3335
/**
3436
* Sample Demo Application
@@ -43,16 +45,30 @@ class RequestHandler extends Actor {
4345
val docroot = "."
4446

4547
def receive = {
46-
case HttpRequest("GET", pathSegments, _, _, headers, _) =>
48+
case HttpRequest("GET", List("chunked"), _, _, headers, _) =>
49+
sender ! OKResponse.stream(getEnumerator)
50+
case HttpRequest("GET", List("download"), _, _, headers, _) =>
51+
sender ! OKResponse.stream(getEnumerator).withMimeType("text/plain")
52+
case HttpRequest("GET", pathSegments, _, _, headers, _) =>
4753
new File(docroot, "/" + pathSegments.mkString(File.separator)) match {
4854
case file if file.isFile() =>
4955
headers.find(_.name.toLowerCase == "if-modified-since") match {
5056
case Some(d) if HttpResponse.httpDateFormat.parse(d.value).compareTo(new java.util.Date(file.lastModified)) != -1 => sender ! NotModifiedResponse()
5157
case _ => sender ! OKFileResponse(file, true)
5258
}
53-
case _ =>
54-
sender ! NotFoundError()
59+
case _ => sender ! NotFoundError()
5560
}
5661
case _ => sender ! MethodNotAllowedError()
5762
}
63+
64+
private def getEnumerator = {
65+
// chunked encoding 을 사용하여 사이즈가 큰 파일을 스트리밍으로 처리하기 받기.
66+
val file1 = new File(docroot + "/src/main/resource", "stream_data1.txt")
67+
val file2 = new File(docroot + "/src/main/resource", "stream_data2.txt")
68+
val enum1 = Enumerator.fromFile(file1)
69+
val enum2 = Enumerator.fromFile(file2)
70+
71+
// enume 을 연결시켜서 여러가지 input source 를 한 스트림으로 순서대로 처리하기.
72+
enum1 andThen enum2
73+
}
5874
}

0 commit comments

Comments
 (0)