Skip to content

Commit b6209ac

Browse files
committed
36 of 39 passed
1 parent d2148aa commit b6209ac

File tree

8 files changed

+303
-62
lines changed

8 files changed

+303
-62
lines changed

src/main/scala/jsenv/playwright/CEEnv.scala

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package jsenv.playwright
22

33
import org.scalajs.jsenv._
4-
import scribe.format.{FormatterInterpolator, date, level, mdc, messages, methodName, threadName}
4+
import scribe.format.{FormatterInterpolator, dateFull, level, mdc, messages, methodName, threadName}
55

66
import scala.util.control.NonFatal
77

@@ -11,6 +11,8 @@ class CEEnv(
1111
showLogs: Boolean = false,
1212
pwConfig: PWEnv.Config
1313
) extends JSEnv {
14+
// private lazy val validator = ExternalJSRun.supports(RunConfig.Validator())
15+
1416
private lazy val validator = {
1517
RunConfig
1618
.Validator()
@@ -22,26 +24,44 @@ class CEEnv(
2224

2325
override def start(input: Seq[Input], runConfig: RunConfig): JSRun =
2426
try {
27+
scribe.info(
28+
s"Calling validator CEEnv.start with input $input and runConfig $runConfig"
29+
)
2530
validator.validate(runConfig)
2631
new CERun(pwConfig, runConfig, input)
2732
} catch {
28-
case NonFatal(t) =>
29-
JSRun.failed(t)
33+
case e: Exception =>
34+
scribe.error(s"IllegalArgumentException $e")
35+
throw e
36+
case r : RuntimeException =>
37+
scribe.error(s"RuntimeException $r")
38+
throw r
39+
40+
// case NonFatal(t) =>
41+
// scribe.error(s"CEEnv.start failed with $t")
42+
// JSRun.failed(t)
3043
}
3144

3245
override def startWithCom(
3346
input: Seq[Input],
3447
runConfig: RunConfig,
3548
onMessage: String => Unit
36-
): JSComRun = new CEComRun(pwConfig, runConfig, input, onMessage)
49+
): JSComRun = try {
50+
validator.validate(runConfig)
51+
new CEComRun(pwConfig, runConfig, input, onMessage)
52+
} catch {
53+
case NonFatal(t) =>
54+
JSComRun.failed(t)
55+
}
3756

3857
private def setupLogger(showLogs: Boolean): Unit = {
3958
val formatter =
40-
formatter"$date [$threadName] $level $methodName - $messages$mdc"
59+
formatter"$dateFull [$threadName] $level $methodName - $messages$mdc"
60+
// val formatter"${date("yyyy-MM-dd HH:mm:ss.SSS")} [$threadName] $level $methodName - $messages$mdc"
4161
scribe.Logger.root
4262
.clearHandlers()
4363
.withHandler(
44-
formatter = formatter,
64+
formatter = formatter
4565
)
4666
.replace()
4767
if (showLogs) {

src/main/scala/jsenv/playwright/CERun.scala

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ class CERun(
2020

2121
implicit val ec: scala.concurrent.ExecutionContext =
2222
scala.concurrent.ExecutionContext.global
23-
23+
private lazy val validator = {
24+
RunConfig
25+
.Validator()
26+
.supportsInheritIO()
27+
.supportsOnOutputStream()
28+
}
29+
validator.validate(runConfig)
2430
// private[this] implicit val ec =
2531
// ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
2632

@@ -30,6 +36,7 @@ class CERun(
3036
val stopSignal: Deferred[IO, Boolean] = Deferred.unsafe[IO, Boolean]
3137
stopSignal.complete(false).unsafeRunSync()
3238
protected val sendQueue = new ConcurrentLinkedQueue[String]
39+
protected def receivedMessage(msg: String): Unit = ()
3340
System.setProperty("playwright.driver.impl", "jsenv.DriverJar")
3441

3542
/** A [[scala.concurrent.Future Future]] that completes if the run completes.
@@ -59,31 +66,35 @@ class CERun(
5966
// If want to close then close driver, streams, materializer
6067
// After future is completed close driver, streams, materializer
6168

62-
def jsRunPrg(isComEnabled:Boolean): Resource[IO, Unit] = for {
63-
_ <- Resource.pure(scribe.info(s"Begin Main JSRun!"))
69+
def jsRunPrg(isComEnabled: Boolean): Resource[IO, Unit] = for {
70+
_ <- Resource.pure(
71+
scribe.info(s"Begin Main JSRun! with isComEnabled $isComEnabled")
72+
)
6473
pageInstance <- createPage(headlessConfig)
6574
_ <- preparePageForJsRun(
6675
pageInstance,
6776
materializer(pwConfig),
6877
input,
6978
isComEnabled
7079
)
71-
initialStatus <- isConnectionUp(pageInstance, intf)
80+
connectionReady <- isConnectionUp(pageInstance, intf)
7281
_ <-
73-
if (!initialStatus) Resource.pure[IO, Unit](IO.sleep(100.milliseconds))
82+
if (!connectionReady) Resource.pure[IO, Unit](IO.sleep(100.milliseconds))
7483
else Resource.pure[IO, Unit](IO.unit)
7584
_ <- isConnectionUp(pageInstance, intf)
76-
jsResponse <- fetchMessages(pageInstance, intf)
77-
_ <- Resource.pure(scribe.info(s"jsResponse is $jsResponse"))
78-
_ <- streamWriter(jsResponse, runConfig)
79-
_ <- Resource.pure(
80-
scribe.info(
81-
s"StopSignal in jsRunPrg is ${stopSignal.get.unsafeRunSync()}"
82-
)
85+
jsResponse <- fetchMessagesResource(pageInstance, intf)
86+
_ <- Resource.pure(scribe.debug(s"jsResponse is $jsResponse"))
87+
out <- ResourcesFactory.outputStream(runConfig)
88+
_ <- Resource.pure[IO, Unit](
89+
streamWriter(jsResponse, out, Some(receivedMessage))
8390
)
84-
_ <- sendAllResource(sendQueue, pageInstance, intf)
85-
_ <- fetchAndProcess(stopSignal, pageInstance, intf, runConfig)
8691

92+
// while (!wantClose) {
93+
// sendAll()
94+
// fetchAndProcess()
95+
// Thread.sleep(100)
96+
// }
97+
_ <- ProcessUntilStop(wantToClose, pageInstance, intf, sendQueue, out, receivedMessage, isComEnabled)
8798
} yield {
8899
handleErrors(jsResponse)
89100
}
@@ -102,7 +113,8 @@ class CERun(
102113
}
103114
}
104115

105-
val future: Future[Unit] = jsRunPrg(enableCom).use(_ => IO.unit).unsafeToFuture()
116+
lazy val future: Future[Unit] =
117+
jsRunPrg(enableCom).use(_ => IO.unit).unsafeToFuture()
106118

107119
/** Stops the run and releases all the resources.
108120
*
@@ -120,7 +132,7 @@ class CERun(
120132

121133
override def close(): Unit = {
122134
wantToClose.set(true)
123-
stopSignal.complete(true).unsafeRunSync()
135+
stopSignal.complete(true)
124136

125137
scribe.info(s"WantToClose in close is ${wantToClose.get()}")
126138
}
@@ -140,7 +152,9 @@ class CEComRun(
140152
scribe.info("Send a message to sendQueue")
141153
sendQueue.offer(msg)
142154
}
143-
override val future: Future[Unit] = jsRunPrg(enableCom).use(_ => IO.unit).unsafeToFuture()
155+
override protected def receivedMessage(msg: String): Unit = onMessage(msg)
156+
// override lazy val future: Future[Unit] =
157+
// jsRunPrg(enableCom).use(_ => IO.unit).unsafeToFuture()
144158
}
145159

146160
private class WindowOnErrorException(errs: List[String])

src/main/scala/jsenv/playwright/PwRun.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,15 @@ private final class PwComRun(
8585
with JSComRun {
8686
private[this] val sendQueue = new ConcurrentLinkedQueue[String]
8787

88-
def send(msg: String): Unit = sendQueue.offer(msg)
88+
def send(msg: String): Unit = {
89+
scribe.info(s"Calling send with message $msg")
90+
sendQueue.offer(msg)}
8991

9092
override protected def receivedMessage(msg: String): Unit = onMessage(msg)
9193

9294
@tailrec
9395
override protected def sendAll(): Unit = {
96+
scribe.info(s"Calling sendAll")
9497
val msg = sendQueue.poll()
9598
if (msg != null) {
9699
scribe.info(s"Sending message $msg")

src/main/scala/jsenv/playwright/ResourcesFactory.scala

Lines changed: 84 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import org.scalajs.jsenv.{Input, RunConfig}
77

88
import java.util
99
import java.util.concurrent.ConcurrentLinkedQueue
10-
import java.util.concurrent.atomic.AtomicInteger
10+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
1111
import java.util.function.Consumer
1212
import scala.annotation.tailrec
1313
import scala.concurrent.duration.DurationInt
@@ -102,21 +102,26 @@ object ResourcesFactory {
102102
} yield ()
103103

104104
val fetchCounter = new AtomicInteger(0)
105+
105106
def fetchMessages(
106107
pageInstance: Page,
107108
intf: String
109+
): java.util.Map[String, java.util.List[String]] = {
110+
val data =
111+
pageInstance
112+
.evaluate(s"$intf.fetch();")
113+
.asInstanceOf[java.util.Map[String, java.util.List[String]]]
114+
data
115+
}
116+
def fetchMessagesResource(
117+
pageInstance: Page,
118+
intf: String
108119
): Resource[IO, util.Map[String, util.List[String]]] = {
109-
Resource.make {
110-
IO {
111-
scribe.info(
112-
s"Page instance is ${pageInstance.hashCode()} fetchCounter is ${fetchCounter.incrementAndGet()}"
113-
)
114-
val data = pageInstance
115-
.evaluate(s"$intf.fetch();")
116-
.asInstanceOf[java.util.Map[String, java.util.List[String]]]
117-
data
118-
}
119-
}(_ => IO.unit)
120+
Resource.pure[IO, util.Map[String, util.List[String]]] {
121+
scribe.info(
122+
s"Page instance is ${pageInstance.hashCode()} fetchCounter is ${fetchCounter.incrementAndGet()}")
123+
fetchMessages(pageInstance, intf)
124+
}
120125
}
121126

122127
def sendAllResource(
@@ -125,31 +130,60 @@ object ResourcesFactory {
125130
intf: String
126131
): Resource[IO, Unit] = {
127132
Resource.make {
128-
scribe.info(s"Sending all messages sendQueue size is ${sendQueue.size()} ")
133+
scribe.info(
134+
s"Sending all messages sendQueue size is ${sendQueue.size()} "
135+
)
129136
sendAll(sendQueue, pageInstance, intf)
130137
IO.unit
131138
}(_ => IO.unit)
132139
}
133140

141+
def ProcessUntilStop(
142+
stopSignal: AtomicBoolean,
143+
pageInstance: Page,
144+
intf: String,
145+
sendQueue: ConcurrentLinkedQueue[String],
146+
outStream: OutputStreams.Streams,
147+
receivedMessage: String => Unit,
148+
isComEnabled: Boolean
149+
): Resource[IO, Unit] = {
150+
Resource.make {
151+
IO.unit
152+
} { _ =>
153+
while (!stopSignal.get() && isComEnabled) {
154+
scribe.info(
155+
s"Calling repeatSendUntilStopSignal with stopSignal = ${stopSignal.get()}"
156+
)
157+
IO.sleep(100.milliseconds)
158+
sendAll(sendQueue, pageInstance, intf)
159+
val jsResponse = fetchMessages(pageInstance, intf)
160+
streamWriter(jsResponse, outStream, Some(receivedMessage))
161+
}
162+
IO.unit
163+
}
164+
}
165+
166+
134167
def fetchAndProcess(
135168
stopSignal: Deferred[IO, Boolean],
136169
pageInstance: Page,
137170
intf: String,
138-
runConfig: RunConfig
171+
// runConfig: RunConfig,
172+
outStream: OutputStreams.Streams
139173
): Resource[IO, Unit] = {
140174
Resource.make {
141-
stopSignal.get.attempt.flatMap {
142-
case Left(_) =>
175+
stopSignal.get.flatMap {
176+
case true =>
143177
scribe.info("Stopping the program")
144178
IO.unit
145-
case Right(_) => {
179+
case false => {
146180
scribe.info("Program is running")
147181
for {
148182
_ <- Resource.pure(
149183
scribe.info(s"Page instance is ${pageInstance.hashCode()}")
150184
)
151-
jsResponse <- fetchMessages(pageInstance, intf)
152-
_ <- streamWriter(jsResponse, runConfig)
185+
jsResponse <- fetchMessagesResource(pageInstance, intf)
186+
_ <- streamWriterResource(jsResponse, outStream)
153187
} yield ()
154188
IO.sleep(100.milliseconds)
155189
}
@@ -161,14 +195,15 @@ object ResourcesFactory {
161195
pageInstance: Page,
162196
intf: String
163197
): Resource[IO, Boolean] = {
164-
Resource.make {
165-
IO {
166-
scribe.info(s"Page instance is ${pageInstance.hashCode()}")
167-
pageInstance.evaluate(s"!!$intf;").asInstanceOf[Boolean]
168-
}
169-
}(_ => IO.unit)
198+
Resource.pure[IO, Boolean] {
199+
scribe.info(s"Page instance is ${pageInstance.hashCode()}")
200+
pageInstance.evaluate(s"!!$intf;").asInstanceOf[Boolean]
201+
}
202+
170203
}
171204

205+
206+
172207
def materializer(pwConfig: Config): Resource[IO, FileMaterializer] =
173208
Resource.make {
174209
IO.blocking(FileMaterializer(pwConfig.materialization)) // build
@@ -185,7 +220,7 @@ object ResourcesFactory {
185220
/*
186221
* Creates resource for outputStream
187222
*/
188-
private def outputStream(
223+
def outputStream(
189224
runConfig: RunConfig
190225
): Resource[IO, OutputStreams.Streams] =
191226
Resource.make {
@@ -202,24 +237,37 @@ object ResourcesFactory {
202237

203238
def streamWriter(
204239
jsResponse: util.Map[String, util.List[String]],
205-
runConfig: RunConfig,
240+
outStream: OutputStreams.Streams,
206241
onMessage: Option[String => Unit] = None
207-
): Resource[IO, Unit] = {
242+
): Unit = {
208243
val data = jsResponse.get("consoleLog")
209244
val consoleError = jsResponse.get("consoleError")
210245
val error = jsResponse.get("errors")
211246
onMessage match {
212247
case Some(f) =>
213248
val msgs = jsResponse.get("msgs")
249+
// data.get("msgs").forEach(consumer(receivedMessage _))
250+
// msgs.forEach(receivedMessage(f))
214251
msgs.forEach(consumer(f))
215-
case None => ()
252+
case None => scribe.info("No onMessage function")
216253
}
217-
for {
218-
out <- ResourcesFactory.outputStream(runConfig)
219-
_ <- Resource.pure(data.forEach(out.out.println _))
220-
_ <- Resource.pure(error.forEach(out.out.println _))
221-
_ <- Resource.pure(consoleError.forEach(out.out.println _))
222-
} yield ()
254+
data.forEach(outStream.out.println _)
255+
error.forEach(outStream.out.println _)
256+
consoleError.forEach(outStream.out.println _)
257+
// val errs = data.get("errors")
258+
if (!error.isEmpty) {
259+
// Convoluted way of writing errs.toList without JavaConverters.
260+
val errList = error.toArray(Array[String]()).toList
261+
throw new WindowOnErrorException(errList)
262+
}
263+
}
264+
def streamWriterResource(
265+
jsResponse: util.Map[String, util.List[String]],
266+
outStream: OutputStreams.Streams,
267+
onMessage: Option[String => Unit] = None
268+
): Resource[IO, Unit] = {
269+
streamWriter(jsResponse, outStream, onMessage)
270+
Resource.pure[IO, Unit](IO.unit)
223271
}
224272

225273
@tailrec
@@ -229,8 +277,8 @@ object ResourcesFactory {
229277
intf: String
230278
): Unit = {
231279
val msg = sendQueue.poll()
232-
scribe.info(s"Sending message $msg")
233280
if (msg != null) {
281+
scribe.debug(s"Sending message $msg")
234282
val script = s"$intf.send(arguments[0]);"
235283
val wrapper = s"function(arg) { $script }"
236284
pageInstance.evaluate(s"$wrapper", msg)

0 commit comments

Comments
 (0)