-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMain.scala
393 lines (360 loc) · 17 KB
/
Main.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
package docxAnonymizer
import Control.using
import java.io.File
import java.nio.file.{Files, Paths}
import java.util
import java.util.regex.Pattern
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.{AmazonS3Exception, S3Object, S3ObjectInputStream}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.commons.cli.{BasicParser, CommandLine, MissingArgumentException, Options}
import org.apache.commons.io.FileUtils
import org.apache.log4j.{BasicConfigurator, Level, Logger}
import org.docx4j.openpackaging.exceptions.Docx4JException
import org.docx4j.openpackaging.packages.WordprocessingMLPackage
import org.docx4j.openpackaging.parts.WordprocessingML.{CommentsPart, EndnotesPart, FooterPart, FootnotesPart, HeaderPart}
import org.docx4j.openpackaging.parts.relationships.Namespaces
import scala.io.Source
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConverters._
import Function.tupled
object Main {
  import scala.util.control.NonFatal

  // Flags
  var debug: Boolean = _
  var parallel: Boolean = _
  // NOTE(review): when this is `true`, main() turns the log4j root logger OFF and redirects stderr to stdout,
  // so the name reads inverted with respect to its effect. Confirm the intended meaning before flipping it.
  val log4jPrints: Boolean = false
  // CLI options
  private var inputFilePath: String = _
  private var outputFilePath: String = _
  private var s3Bucket: Option[String] = None
  private var minimizeFilePath: Option[String] = None
  private var keepUnchangedNamesFilePath: Option[String] = None
  private var keepUnchangedExprFilePath: Option[String] = None
  private var cmdArgsNames: Array[String] = Array.empty[String]
  // Lists of people to minimize and not to minimize, respectively
  private var peopleToMinimize: List[Persona] = List.empty[Persona]
  private var peopleToKeepUnchanged: List[Persona] = List.empty[Persona]
  // AWS S3 credentials and client object (client is None when no bucket is used)
  private var credentials: BasicAWSCredentials = _
  private var s3Client: Option[AmazonS3] = None
  // Spark context (None when Spark could not be started)
  var sparkContext: Option[SparkContext] = None
  // Config file: classpath resource whose first two lines are the AWS access key and secret key
  val IAM_PATH = "config/iam_credentials.txt"

  /** Message of a failure, falling back to its `toString` when the message is null. */
  private def describe(ex: Throwable): String = Option(ex.getMessage).getOrElse(ex.toString)

  /**
   * Closes the JVM's stderr stream and routes `System.err` to stdout.
   * Presumably used to silence docx4j warnings printed on stderr (per the name) — TODO confirm.
   */
  private def disableDocxWarning(): Unit = {
    System.err.close()
    System.setErr(System.out)
  }

  /** Command-line options understood by the program. */
  private def cliOptions(): Options = {
    val options = new Options()
    options.addOption("i", "input-file", true,
      "The docx input file to anonymize")
    options.addOption("o", "output-file", true,
      "The docx output file generated")
    options.addOption("s3", "s3bucket", true,
      "The S3 bucket in which files are stored")
    options.addOption("m", "minimize", true,
      "The file with names and surnames to minimize. It must contain one expression per line. Names are separated by ':' between them and by ';' from the surname")
    options.addOption("kn", "keep-names", true,
      "The file with names and surnames to keep unchanged (no minimization). It must contain one expression per line. Names are separated by ':' between them and by ';' from the surname")
    options.addOption("ke", "keep-expressions", true,
      "The file with those expressions to be kept unchanged")
    options.addOption("p", "parallel", false,
      "set parallel execution mode")
    options.addOption("d", "debug", false,
      "set debug mode")
    options
  }

  /**
   * Parses the command line and makes every input file available locally.
   *
   * Sets [[debug]], [[parallel]] and the file-path fields. Without an S3 bucket it checks that the local
   * files exist; with a bucket it connects to S3 and downloads each file to the same relative path.
   *
   * @param args raw command-line arguments
   * @return `Right(())` when all inputs are ready, otherwise `Left` with an error message
   */
  private def cmdArgsRetriever(args: Array[String]): Either[String, Unit] =
    Try(new BasicParser().parse(cliOptions(), args)) match {
      // Any parse problem (missing argument, unrecognized option, ...) is reported, not thrown as MatchError
      case Failure(ex) => Left(describe(ex))
      case Success(commandLine) =>
        debug = commandLine.hasOption('d')
        if (debug) println("Debug mode: ON")
        parallel = commandLine.hasOption('p')
        Option(commandLine.getOptionValue('i')) match {
          case None => Left("Path to input file is mandatory.")
          case Some(filePath) =>
            if (debug) println(s"Input file: $filePath")
            if (!filePath.endsWith(".docx")) Left("Input file must have '.docx' extension.")
            else {
              inputFilePath = filePath
              // Other optional files
              minimizeFilePath = Option(commandLine.getOptionValue("m"))
              keepUnchangedExprFilePath = Option(commandLine.getOptionValue("ke"))
              keepUnchangedNamesFilePath = Option(commandLine.getOptionValue("kn"))
              // AWS integration
              s3Bucket = Option(commandLine.getOptionValue("s3"))
              val inputsReady: Either[String, Unit] = s3Bucket match {
                case None =>
                  s3Client = None
                  checkLocalFiles()
                case Some(bucketName) =>
                  connectToS3() match {
                    case Left(msg) => Left(msg)
                    case Right(_) => downloadInputFiles(bucketName)
                  }
              }
              inputsReady match {
                case Left(msg) => Left(msg)
                case Right(_) =>
                  val defaultOutputFilePath: String = inputFilePath.replaceAll("\\.docx$", "-result\\.docx")
                  outputFilePath = commandLine.getOptionValue('o', defaultOutputFilePath)
                  if (debug) println(s"Output file: $outputFilePath")
                  // Names passed as command line arguments
                  cmdArgsNames = commandLine.getArgs
                  // Names may come from files or from args, never from both
                  if (cmdArgsNames.nonEmpty && (minimizeFilePath.isDefined || keepUnchangedNamesFilePath.isDefined))
                    Left("Names must be specified via file or via args, not both")
                  else Right(())
              }
            }
        }
    }

  /** Verifies that the input file and every configured optional file exist on the local file system. */
  private def checkLocalFiles(): Either[String, Unit] = {
    def missing(path: Option[String]): Boolean = path.exists(p => !Files.exists(Paths.get(p)))
    if (!Files.exists(Paths.get(inputFilePath)))
      Left(s"Input file '$inputFilePath' not found")
    else if (missing(minimizeFilePath))
      Left(s"Minimize file '${minimizeFilePath.orNull}' not found")
    else if (missing(keepUnchangedNamesFilePath))
      Left(s"keepUnchangedNames file '${keepUnchangedNamesFilePath.orNull}' not found")
    else if (missing(keepUnchangedExprFilePath))
      Left(s"keepUnchangedExpr file '${keepUnchangedExprFilePath.orNull}' not found")
    else Right(())
  }

  /** Reads the access key and secret key from the first two lines of the [[IAM_PATH]] classpath resource. */
  private def readIamCredentials(): Either[String, (String, String)] =
    Option(getClass.getClassLoader.getResourceAsStream(IAM_PATH)) match {
      case None => Left(s"AWS credentials file '$IAM_PATH' not found")
      case Some(stream) =>
        using(Source.fromInputStream(stream)) {
          source => source.getLines().toList.take(2)
        } match {
          case List(accessKey, secretKey) => Right((accessKey, secretKey))
          case _ => Left(s"AWS credentials file '$IAM_PATH' must contain the access key and the secret key on its first two lines")
        }
    }

  /** Builds the S3 client (region US_EAST_2) from the configured static credentials and stores it in `s3Client`. */
  private def connectToS3(): Either[String, Unit] =
    readIamCredentials() match {
      case Left(msg) => Left(msg)
      case Right((awsAccessKey, awsSecretKey)) =>
        credentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey)
        s3Client = Option(AmazonS3ClientBuilder
          .standard()
          .withCredentials(new AWSStaticCredentialsProvider(credentials))
          .withRegion(Regions.US_EAST_2)
          .build())
        if (debug) println("Connected to AWS.")
        Right(())
    }

  /** Downloads the input file, then each configured optional file, stopping at the first failure. */
  private def downloadInputFiles(bucketName: String): Either[String, Unit] = {
    val toDownload: List[String] =
      inputFilePath :: List(minimizeFilePath, keepUnchangedNamesFilePath, keepUnchangedExprFilePath).flatten
    toDownload.foldLeft[Either[String, Unit]](Right(())) {
      case (Left(msg), _) => Left(msg)
      case (Right(_), path) => downloadFromS3(bucketName, path)
    }
  }

  /**
   * Copies the S3 object whose key is `filePath` to the local path `filePath`, creating parent directories.
   *
   * @return `Right(())` on success, otherwise `Left` with an error message
   */
  private def downloadFromS3(bucketName: String, filePath: String): Either[String, Unit] =
    s3Client match {
      case None => Left("not connected to S3")
      case Some(client) =>
        Try(client.getObject(bucketName, filePath)) match {
          case Failure(ex: AmazonS3Exception) => Left(ex.getMessage)
          case Failure(_: IllegalArgumentException) => Left("file not found on S3 bucket")
          case Failure(ex) => Left(s"could not download '$filePath' from S3 bucket: ${describe(ex)}")
          case Success(s3Object) =>
            try {
              val file = new File(filePath)
              Option(file.getParentFile).foreach(_.mkdirs())
              Try(FileUtils.copyInputStreamToFile(s3Object.getObjectContent, file)) match {
                case Success(_) => Right(())
                case Failure(ex) => Left(s"could not save '$filePath' locally: ${describe(ex)}")
              }
            } finally {
              // Release the object's resources even if the copy failed; an error while closing is irrelevant here
              Try(s3Object.close())
            }
        }
    }

  /** Loads the lists of people to minimize and to keep unchanged, from command-line args or from files. */
  private def preprocessing(): Either[String, Unit] = {
    // NOTE(review): both lists use the same "starts with '!'" predicate on command-line names, so the same
    // args end up in both lists; complementary predicates may have been intended — confirm before changing.
    val hasBangPrefix: String => Boolean = _.startsWith("!")
    readNamesFromFile(minimizeFilePath, hasBangPrefix,
      "Number of people whose data must be minimized",
      "Automatic recognition of names", incrementalId = true) match {
      case Left(msg) => Left(msg)
      case Right(toMinimize) =>
        peopleToMinimize = toMinimize
        readNamesFromFile(keepUnchangedNamesFilePath, hasBangPrefix,
          "Number of people whose data must not be minimized",
          "All data detected will be minimized", incrementalId = false) match {
          case Left(msg) => Left(msg)
          case Right(toKeep) =>
            peopleToKeepUnchanged = toKeep
            Right(())
        }
    }
  }

  /**
   * Builds `Persona` objects from command-line names (filtered by `filterFn`) or, when none were given,
   * from the lines of `filePath`. Each entry has the form `name1:name2:...;surname`.
   *
   * @param incrementalId when true ids are 1, 2, 3, ... in input order; otherwise every id is -1
   * @return the people, or `Left` when an entry is malformed or the file cannot be read
   */
  private def readNamesFromFile(filePath: Option[String], filterFn: String => Boolean,
                                thenMsg: String, elseMsg: String,
                                incrementalId: Boolean): Either[String, List[Persona]] = {
    val maxNamesPerPerson = 10
    val entries: Either[String, List[String]] =
      if (cmdArgsNames.nonEmpty) Right(cmdArgsNames.toList.filter(filterFn))
      else filePath match {
        case None => Right(List.empty[String])
        case Some(path) =>
          Try(using(Source.fromFile(path)) { source => source.getLines().toList }) match {
            case Success(lines) => Right(lines)
            case Failure(ex) => Left(s"could not read '$path': ${describe(ex)}")
          }
      }
    entries match {
      case Left(msg) => Left(msg)
      case Right(toProcess) =>
        if (debug) {
          if (toProcess.nonEmpty) println(s"$thenMsg: ${toProcess.length}")
          else println(elseMsg)
        }
        val (wellFormedNames, notWellFormedNames) = toProcess.partition(_.matches(Persona.NOMINATIVO_USER))
        if (notWellFormedNames.nonEmpty)
          Left(s"input arguments {${notWellFormedNames.mkString(", ")}} are not well-formed.")
        else {
          val parsed: List[(Array[String], String)] = wellFormedNames.map { arg =>
            val namesAndSurname = arg.split(Pattern.quote(";"))
            (namesAndSurname(0).split(Pattern.quote(":")), namesAndSurname(1))
          }
          val (compatibleNames, tooLongNames) = parsed.partition {
            case (names, _) => names.length <= maxNamesPerPerson
          }
          if (tooLongNames.nonEmpty) {
            // Report the surnames, not the (Array, String) tuples, whose arrays would print as hash codes
            val surnames = tooLongNames.map { case (_, surname) => surname }
            println(s"WARNING: for each person, only $maxNamesPerPerson names can be saved: data of the person with surnames {${surnames.mkString(", ")}} will be ignored.")
          }
          val people: List[Persona] =
            if (incrementalId)
              compatibleNames.zipWithIndex.map { case ((names, surname), index) =>
                new Persona(surname, names.toList.asJava, index + 1)
              }
            else
              compatibleNames.map { case (names, surname) =>
                new Persona(surname, names.toList.asJava, -1)
              }
          Right(people)
        }
    }
  }

  /**
   * Collects every run node (`//w:r`) of the main document, then appends those found in the header,
   * footer, endnotes, footnotes and comments parts it references.
   */
  private def collectRunNodes(wordMLPackage: WordprocessingMLPackage): util.List[AnyRef] = {
    val runNodesXPath = "//w:r"
    val mainDocumentPart = wordMLPackage.getMainDocumentPart
    val rp = mainDocumentPart.getRelationshipsPart
    val runNodes: util.List[AnyRef] = mainDocumentPart.getJAXBNodesViaXPath(runNodesXPath, true)
    for (r <- rp.getRelationships.getRelationship.asScala) {
      val partRuns: Option[util.List[AnyRef]] = r.getType match {
        case Namespaces.HEADER =>
          Option(rp.getPart(r).asInstanceOf[HeaderPart].getJAXBNodesViaXPath(runNodesXPath, true))
        case Namespaces.FOOTER =>
          Option(rp.getPart(r).asInstanceOf[FooterPart].getJAXBNodesViaXPath(runNodesXPath, true))
        case Namespaces.ENDNOTES =>
          Option(rp.getPart(r).asInstanceOf[EndnotesPart].getJAXBNodesViaXPath(runNodesXPath, true))
        case Namespaces.FOOTNOTES =>
          Option(rp.getPart(r).asInstanceOf[FootnotesPart].getJAXBNodesViaXPath(runNodesXPath, true))
        case Namespaces.COMMENTS =>
          Option(rp.getPart(r).asInstanceOf[CommentsPart].getJAXBNodesViaXPath(runNodesXPath, true))
        case _ => None
      }
      // Unify data structures, distinct semantic blocks are then separated with a bottom-up approach
      partRuns.foreach(runs => runNodes.addAll(runs))
    }
    runNodes
  }

  /** Loads the input docx, anonymizes its runs, saves the output and (with S3) uploads the results. */
  private def docxProcessing(): Either[String, Unit] =
    Try(WordprocessingMLPackage.load(new File(inputFilePath))) match {
      case Failure(_: Docx4JException) => Left(s"could not load $inputFilePath")
      case Failure(ex) => Left(s"could not load $inputFilePath: ${describe(ex)}")
      case Success(wordMLPackage) =>
        val runNodes = collectRunNodes(wordMLPackage)
        // NOTE(review): a commented-out `sparkContext.isDefined &&` suggests parallel mode was meant to require
        // a live SparkContext; as written ParallelWorker receives `null` when Spark is unavailable.
        val worker: Worker =
          if (parallel)
            new ParallelWorker(runNodes, peopleToMinimize, peopleToKeepUnchanged,
              keepUnchangedExprFilePath, debug, sparkContext.orNull)
          else
            new SequentialWorker(runNodes, peopleToMinimize.asJava, peopleToKeepUnchanged.asJava,
              keepUnchangedExprFilePath.orNull, debug)
        // Anonymization
        worker.work()
        // Save output file
        val exportFile = new File(outputFilePath)
        Try(wordMLPackage.save(exportFile)) match {
          case Failure(_: Docx4JException) => Left(s"could not save $outputFilePath")
          case Failure(ex) => Left(s"could not save $outputFilePath: ${describe(ex)}")
          case Success(_) =>
            s3Bucket match {
              case None =>
                if (debug) println(s"Success! Output file: $outputFilePath")
                Right(())
              case Some(bucketName) => uploadResults(bucketName, exportFile)
            }
        }
    }

  /**
   * Uploads the output docx, then the local `frequencies.txt` and `associations.txt` files, next to it on S3.
   * Stops at the first failed upload.
   */
  private def uploadResults(bucketName: String, exportFile: File): Either[String, Unit] = {
    // With an output path that has no directory the side files go to the bucket root (previously an NPE)
    val outputDir = Option(exportFile.getParent)
    def keyFor(fileName: String): String = outputDir.fold(fileName)(dir => Paths.get(dir, fileName).toString)
    val uploads: List[(String, File, String)] = List(
      (outputFilePath, exportFile, s"Success! Output file: $outputFilePath uploaded to S3 bucket"),
      (keyFor("frequencies.txt"), new File("frequencies.txt"),
        "Success! Output file: 'frequencies.txt' uploaded to S3 bucket"),
      (keyFor("associations.txt"), new File("associations.txt"),
        "Success! Output file: 'associations.txt' uploaded to S3 bucket")
    )
    uploads.foldLeft[Either[String, Unit]](Right(())) {
      case (Left(msg), _) => Left(msg)
      case (Right(_), (key, file, successMsg)) => uploadToS3(bucketName, key, file, successMsg)
    }
  }

  /** Uploads one local file under `key`; prints `successMsg` in debug mode when the upload succeeds. */
  private def uploadToS3(bucketName: String, key: String, file: File, successMsg: String): Either[String, Unit] =
    s3Client match {
      case None => Left("could not upload file to S3 bucket")
      case Some(client) =>
        Try(client.putObject(bucketName, key, file)) match {
          case Failure(_: AmazonS3Exception) => Left("could not upload file to S3 bucket")
          case Failure(ex) => Left(s"could not upload '${file.getPath}' to S3 bucket: ${describe(ex)}")
          case Success(_) =>
            if (debug) println(successMsg)
            Right(())
        }
    }

  /** Starts a SparkContext, or returns None (running locally) when Spark is missing or cannot start. */
  private def createSparkContext(): Option[SparkContext] =
    try Option(new SparkContext(new SparkConf().setAppName("DocxAnon")))
    catch {
      case _: NoClassDefFoundError =>
        println("Spark platform not found, running locally")
        None
      case NonFatal(ex) =>
        // e.g. Spark on the classpath but no master URL configured
        println(s"Spark context could not be created (${describe(ex)}), running locally")
        None
    }

  /** Argument parsing, name preprocessing and docx processing, stopping at the first error. */
  private def runPipeline(args: Array[String]): Either[String, Unit] =
    cmdArgsRetriever(args) match {
      case Left(msg) => Left(msg)
      case Right(_) =>
        preprocessing() match {
          case Left(msg) => Left(msg)
          case Right(_) => docxProcessing()
        }
    }

  /** Entry point: runs the anonymization pipeline and reports any error as `ERROR: <message>`. */
  def main(args: Array[String]): Unit = {
    sparkContext = createSparkContext()
    // Logger configuration
    BasicConfigurator.configure()
    if (log4jPrints) {
      Logger.getRootLogger.setLevel(Level.OFF)
      disableDocxWarning()
    }
    try {
      // Errors from every stage, including docx saving/uploading, are now reported
      runPipeline(args) match {
        case Left(message) => println(s"ERROR: $message")
        case Right(_) =>
      }
    } finally {
      // Stop Spark even if processing throws
      sparkContext.foreach(_.stop())
      println("Terminating...")
    }
  }
}