Skip to content

Commit 9a411cf

Browse files
authored
[WX-1410] Sanitize 4 byte UTF-8 characters before inserting into METADATA_ENTRY (#7414)
1 parent f9372f9 commit 9a411cf

File tree

9 files changed

+172
-47
lines changed

9 files changed

+172
-47
lines changed

cromwell.example.backends/cromwell.examples.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,11 @@ services {
498498
# # count against this limit.
499499
# metadata-read-row-number-safety-threshold = 1000000
500500
#
501+
# # Replace any UTF-8 mb4 (4 byte) characters in the values of the metadata keys in the list.
502+
# # These characters (namely emojis) will cause metadata writing to fail in database collations
503+
# # that do not support 4 byte UTF-8 characters.
504+
# metadata-keys-to-sanitize-utf8mb4 = ["submittedFiles:workflow", "commandLine"]
505+
#
501506
# metadata-write-statistics {
502507
# # Not strictly necessary since the 'metadata-write-statistics' section itself is enough for statistics to be recorded.
503508
# # However, this can be set to 'false' to disable statistics collection without deleting the section.

services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,14 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser
110110
val metadataWriteStatisticsConfig = MetadataStatisticsRecorderSettings(
111111
serviceConfig.as[Option[Config]]("metadata-write-statistics")
112112
)
113+
val metadataKeysToClean = serviceConfig.getOrElse[List[String]]("metadata-keys-to-sanitize-utf8mb4", List())
113114
val writeActor = context.actorOf(
114115
WriteMetadataActor.props(dbBatchSize,
115116
dbFlushRate,
116117
serviceRegistryActor,
117118
LoadConfig.MetadataWriteThreshold,
118-
metadataWriteStatisticsConfig
119+
metadataWriteStatisticsConfig,
120+
metadataKeysToClean
119121
),
120122
"WriteMetadataActor"
121123
)

services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import cromwell.core.Dispatcher.ServiceDispatcher
66
import cromwell.core.Mailbox.PriorityMailbox
77
import cromwell.core.WorkflowId
88
import cromwell.core.instrumentation.InstrumentationPrefixes
9-
import cromwell.services.metadata.MetadataEvent
9+
import cromwell.services.metadata.{MetadataEvent, MetadataValue}
1010
import cromwell.services.metadata.MetadataService._
1111
import cromwell.services.metadata.impl.MetadataStatisticsRecorder.MetadataStatisticsRecorderSettings
1212
import cromwell.services.{EnhancedBatchActor, MetadataServicesStore}
13+
import wdl.util.StringUtil
1314

1415
import scala.concurrent.duration._
1516
import scala.util.{Failure, Success}
@@ -18,7 +19,8 @@ class WriteMetadataActor(override val batchSize: Int,
1819
override val flushRate: FiniteDuration,
1920
override val serviceRegistryActor: ActorRef,
2021
override val threshold: Int,
21-
metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings
22+
metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings,
23+
metadataKeysToClean: List[String]
2224
) extends EnhancedBatchActor[MetadataWriteAction](flushRate, batchSize)
2325
with ActorLogging
2426
with MetadataDatabaseAccess
@@ -27,9 +29,10 @@ class WriteMetadataActor(override val batchSize: Int,
2729
private val statsRecorder = MetadataStatisticsRecorder(metadataStatisticsRecorderSettings)
2830

2931
override def process(e: NonEmptyVector[MetadataWriteAction]) = instrumentedProcess {
32+
val cleanedMetadataWriteActions = if (metadataKeysToClean.isEmpty) e else sanitizeInputs(e)
3033
val empty = (Vector.empty[MetadataEvent], List.empty[(Iterable[MetadataEvent], ActorRef)])
3134

32-
val (putWithoutResponse, putWithResponse) = e.foldLeft(empty) {
35+
val (putWithoutResponse, putWithResponse) = cleanedMetadataWriteActions.foldLeft(empty) {
3336
case ((putEvents, putAndRespondEvents), action: PutMetadataAction) =>
3437
(putEvents ++ action.events, putAndRespondEvents)
3538
case ((putEvents, putAndRespondEvents), action: PutMetadataActionAndRespond) =>
@@ -46,7 +49,7 @@ class WriteMetadataActor(override val batchSize: Int,
4649
case Success(_) =>
4750
putWithResponse foreach { case (ev, replyTo) => replyTo ! MetadataWriteSuccess(ev) }
4851
case Failure(cause) =>
49-
val (outOfTries, stillGood) = e.toVector.partition(_.maxAttempts <= 1)
52+
val (outOfTries, stillGood) = cleanedMetadataWriteActions.toVector.partition(_.maxAttempts <= 1)
5053

5154
handleOutOfTries(outOfTries, cause)
5255
handleEventsToReconsider(stillGood)
@@ -55,6 +58,23 @@ class WriteMetadataActor(override val batchSize: Int,
5558
dbAction.map(_ => allPutEvents.size)
5659
}
5760

61+
/**
  * Replaces 4-byte UTF-8 characters in the values of metadata events whose key is listed in
  * `metadataKeysToClean`. Events with any other key, and events without a value, are passed
  * through unchanged.
  *
  * @param metadataWriteActions the batch of write actions to sanitize
  * @return the same actions with the selected event values cleaned via `StringUtil.cleanUtf8mb4`
  */
def sanitizeInputs(
  metadataWriteActions: NonEmptyVector[MetadataWriteAction]
): NonEmptyVector[MetadataWriteAction] =
  metadataWriteActions.map { metadataWriteAction =>
    val metadataEvents =
      metadataWriteAction.events.map { event =>
        event.value match {
          // Only the configured keys are sanitized. Previously the list was consulted solely for
          // emptiness, so configuring a single key rewrote the value of every event in the batch.
          // NOTE(review): assumes MetadataKey exposes its string key as `key` - confirm.
          case Some(eventVal) if metadataKeysToClean.contains(event.key.key) =>
            // copy(...) instead of MetadataValue(...) so the remaining fields of the original
            // value are carried over rather than re-derived from the cleaned string.
            // NOTE(review): assumes MetadataValue is a case class - confirm.
            event.copy(value = Option(eventVal.copy(value = StringUtil.cleanUtf8mb4(eventVal.value))))
          case _ => event
        }
      }
    metadataWriteAction match {
      case action: PutMetadataAction => action.copy(events = metadataEvents)
      case actionAndResp: PutMetadataActionAndRespond => actionAndResp.copy(events = metadataEvents)
    }
  }
77+
5878
private def countActionsByWorkflow(writeActions: Vector[MetadataWriteAction]): Map[WorkflowId, Int] =
5979
writeActions.flatMap(_.events).groupBy(_.key.workflowId).map { case (k, v) => k -> v.size }
6080

@@ -106,9 +126,18 @@ object WriteMetadataActor {
106126
flushRate: FiniteDuration,
107127
serviceRegistryActor: ActorRef,
108128
threshold: Int,
109-
statisticsRecorderSettings: MetadataStatisticsRecorderSettings
129+
statisticsRecorderSettings: MetadataStatisticsRecorderSettings,
130+
metadataKeysToClean: List[String]
110131
): Props =
111-
Props(new WriteMetadataActor(dbBatchSize, flushRate, serviceRegistryActor, threshold, statisticsRecorderSettings))
132+
Props(
133+
new WriteMetadataActor(dbBatchSize,
134+
flushRate,
135+
serviceRegistryActor,
136+
threshold,
137+
statisticsRecorderSettings,
138+
metadataKeysToClean
139+
)
140+
)
112141
.withDispatcher(ServiceDispatcher)
113142
.withMailbox(PriorityMailbox)
114143
}

services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorBenchmark.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class WriteMetadataActorBenchmark extends TestKitSuite with AnyFlatSpecLike with
4646

4747
it should "provide good throughput" taggedAs IntegrationTest in {
4848
val writeActor =
49-
TestFSMRef(new WriteMetadataActor(1000, 5.seconds, registry, Int.MaxValue, MetadataStatisticsDisabled) {
49+
TestFSMRef(new WriteMetadataActor(1000, 5.seconds, registry, Int.MaxValue, MetadataStatisticsDisabled, List()) {
5050
override val metadataDatabaseInterface: MetadataSlickDatabase = dataAccess
5151
})
5252

services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorSpec.scala

Lines changed: 99 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
3838

3939
it should "process jobs in the correct batch sizes" in {
4040
val registry = TestProbe().ref
41-
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) {
41+
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) {
4242
override val metadataDatabaseInterface = mockDatabaseInterface(0)
4343
})
4444

@@ -71,9 +71,10 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
7171
failuresBetweenSuccessValues foreach { failureRate =>
7272
it should s"succeed metadata writes and respond to all senders even with $failureRate failures between each success" in {
7373
val registry = TestProbe().ref
74-
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) {
75-
override val metadataDatabaseInterface = mockDatabaseInterface(failureRate)
76-
})
74+
val writeActor =
75+
TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) {
76+
override val metadataDatabaseInterface = mockDatabaseInterface(failureRate)
77+
})
7778

7879
def metadataEvent(index: Int, probe: ActorRef) =
7980
PutMetadataActionAndRespond(List(
@@ -111,7 +112,7 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
111112

112113
it should s"fail metadata writes and respond to all senders with failures" in {
113114
val registry = TestProbe().ref
114-
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) {
115+
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) {
115116
override val metadataDatabaseInterface = mockDatabaseInterface(100)
116117
})
117118

@@ -146,6 +147,90 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
146147
writeActor.stop()
147148
}
148149

150+
// Verifies that sanitizeInputs replaces 4-byte characters in values of a configured key.
it should s"test removing emojis from metadata works as expected" in {
  val registry = TestProbe().ref
  val writeActor =
    TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List("metadata_key")) {
      override val metadataDatabaseInterface = mockDatabaseInterface(100)
    })

  def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond(
    List(
      MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "metadata_key"), MetadataValue(s"🎉_$index"))
    ),
    probe
  )

  val metadataWriteActions = (0 until 43).map(index => metadataEvent(index, TestProbe().ref)).toVector
  val metadataWriteActionNE = NonEmptyVector(metadataWriteActions.head, metadataWriteActions.tail)

  val sanitizedWriteActions = writeActor.underlyingActor.sanitizeInputs(metadataWriteActionNE)

  // A find-style search is required here: String.matches only succeeds when the WHOLE string
  // matches the pattern, so it could never detect an emoji embedded in a value like "🎉_0".
  val fourByteChar = "[\\x{10000}-\\x{FFFFF}]".r

  sanitizedWriteActions.toVector.flatMap(_.events).foreach { event =>
    val cleanedValue = event.value.getOrElse(fail("Removed value from metadata event")).value

    if (fourByteChar.findFirstIn(cleanedValue).isDefined) {
      fail("Metadata event contains emoji")
    }

    if (!cleanedValue.contains("\uFFFD")) {
      fail("Incorrect character used to replace emoji")
    }
  }

  // Stop the actor so it does not outlive the test.
  writeActor.stop()
}
191+
192+
// Verifies that sanitizeInputs leaves values without 4-byte characters free of replacement chars.
it should s"test removing emojis from metadata which doesn't contain emojis returns the string" in {
  val registry = TestProbe().ref
  val writeActor =
    TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List("metadata_key")) {
      override val metadataDatabaseInterface = mockDatabaseInterface(100)
    })

  def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond(
    List(
      MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "metadata_key"), MetadataValue(s"hello_$index"))
    ),
    probe
  )

  val metadataWriteActions = (0 until 43).map(index => metadataEvent(index, TestProbe().ref)).toVector
  val metadataWriteActionNE = NonEmptyVector(metadataWriteActions.head, metadataWriteActions.tail)

  val sanitizedWriteActions = writeActor.underlyingActor.sanitizeInputs(metadataWriteActionNE)

  // A find-style search is required here: String.matches only succeeds when the WHOLE string
  // matches the pattern, so it could never detect a 4-byte character embedded in a longer value.
  val fourByteChar = "[\\x{10000}-\\x{FFFFF}]".r

  sanitizedWriteActions.toVector.flatMap(_.events).foreach { event =>
    val cleanedValue = event.value.getOrElse(fail("Removed value from metadata event")).value

    if (fourByteChar.findFirstIn(cleanedValue).isDefined) {
      fail("Metadata event contains emoji")
    }

    if (cleanedValue.contains("\uFFFD")) {
      fail("Incorrectly replaced character in metadata event")
    }
  }

  // Stop the actor so it does not outlive the test.
  writeActor.stop()
}
233+
149234
// Mock database interface.
150235
// A customizable number of failures occur between each success
151236
def mockDatabaseInterface(failuresBetweenEachSuccess: Int) = new MetadataSqlDatabase with SqlDatabase {
@@ -382,8 +467,15 @@ object WriteMetadataActorSpec {
382467
class BatchSizeCountingWriteMetadataActor(override val batchSize: Int,
383468
override val flushRate: FiniteDuration,
384469
override val serviceRegistryActor: ActorRef,
385-
override val threshold: Int
386-
) extends WriteMetadataActor(batchSize, flushRate, serviceRegistryActor, threshold, MetadataStatisticsDisabled) {
470+
override val threshold: Int,
471+
val metadataKeysToClean: List[String]
472+
) extends WriteMetadataActor(batchSize,
473+
flushRate,
474+
serviceRegistryActor,
475+
threshold,
476+
MetadataStatisticsDisabled,
477+
metadataKeysToClean
478+
) {
387479

388480
var batchSizes: Vector[Int] = Vector.empty
389481
var failureCount: Int = 0

supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversions.scala

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import cromwell.core.ExecutionEvent
88
import cromwell.core.logging.JobLogger
99
import mouse.all._
1010
import PipelinesUtilityConversions._
11+
import wdl.util.StringUtil
1112

1213
import scala.language.postfixOps
1314

@@ -67,7 +68,7 @@ trait PipelinesUtilityConversions {
6768
// characters (typically emoji). Some databases have trouble storing these; replace them with the standard
6869
// "unknown character" unicode symbol.
6970
val name = Option(event.getContainerStopped) match {
70-
case Some(_) => cleanUtf8mb4(event.getDescription)
71+
case Some(_) => StringUtil.cleanUtf8mb4(event.getDescription)
7172
case _ => event.getDescription
7273
}
7374

@@ -101,9 +102,4 @@ object PipelinesUtilityConversions {
101102
None
102103
}
103104
}
104-
105-
lazy val utf8mb4Regex = "[\\x{10000}-\\x{FFFFF}]"
106-
lazy val utf8mb3Replacement = "\uFFFD" // This is the standard char for replacing invalid/unknown unicode chars
107-
def cleanUtf8mb4(in: String): String =
108-
in.replaceAll(utf8mb4Regex, utf8mb3Replacement)
109105
}

supportedBackends/google/pipelines/v2beta/src/test/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversionsSpec.scala

Lines changed: 0 additions & 26 deletions
This file was deleted.

wdl/model/draft2/src/test/scala/wdl/util/StringUtilSpec.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,21 @@ class StringUtilSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers
7070
}
7171
}
7272

73+
// Plain ASCII text must come back from cleanUtf8mb4 exactly as it went in.
it should "not modify strings that contain only ascii characters" in {
  val asciiOnly = "hi there!?"
  val cleaned = StringUtil.cleanUtf8mb4(asciiOnly)
  cleaned shouldBe asciiOnly
}
77+
78+
// Characters that fit in 3 UTF-8 bytes must come back from cleanUtf8mb4 unchanged.
it should "not modify strings with 3-byte unicode characters" in {
  val threeByteText = "Here is my non-ascii character: \u1234 Do you like it?"
  val cleaned = StringUtil.cleanUtf8mb4(threeByteText)
  cleaned shouldBe threeByteText
}
82+
83+
// Each 4-byte character must be swapped for the U+FFFD replacement character.
it should "replace 4-byte unicode characters" in {
  // Builds the string for a single code point given as a hexadecimal string.
  def fromHexCodePoint(hex: String): String = new String(Character.toChars(Integer.parseInt(hex, 16)))

  val cry = fromHexCodePoint("1F62D")
  val barf = fromHexCodePoint("1F92E")
  val withEmoji = s"When I try to put an emoji in the database it $barf and then I $cry"
  val expected = "When I try to put an emoji in the database it \uFFFD and then I \uFFFD"
  StringUtil.cleanUtf8mb4(withEmoji) shouldBe expected
}
7390
}

wom/src/main/scala/wdl/util/StringUtil.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import scala.annotation.tailrec
88
* WOMmy TaskDefinition. That should get straightened out. */
99
object StringUtil {
1010
val Ws = Pattern.compile("[\\ \\t]+")
11+
// Matches every code point outside the Basic Multilingual Plane, U+10000 through U+10FFFF.
// These are the characters that take 4 bytes in UTF-8. The previous upper bound of U+FFFFF
// left the last plane (U+100000-U+10FFFF) unmatched.
val utf8mb4Regex = "[\\x{10000}-\\x{10FFFF}]"
// U+FFFD: this is the standard char for replacing invalid/unknown unicode chars
val utf8mb3Replacement = "\uFFFD"
1113

1214
/**
1315
* 1) Remove all leading newline chars
@@ -63,4 +65,12 @@ object StringUtil {
6365

6466
start(0)
6567
}
68+
69+
/**
  * Replace every utf8mb4 exclusive character (emoji) in the given string with `utf8mb3Replacement`.
  * @param in String to clean
  * @return Copy of `in` with each match of `utf8mb4Regex` substituted by `utf8mb3Replacement`
  */
def cleanUtf8mb4(in: String): String = {
  val fourByteChars = Pattern.compile(utf8mb4Regex).matcher(in)
  fourByteChars.replaceAll(utf8mb3Replacement)
}
6676
}

0 commit comments

Comments
 (0)