This repository was archived by the owner on Sep 29, 2025. It is now read-only.

Flink DataSource DataSink

Jörn Franke edited this page Jul 11, 2018 · 2 revisions

The Flink DataSource/DataSink uses the DataSet API of Apache Flink to read/write Office documents, such as MS Excel (note: as an alternative, you can use the Flink Table API). It supports all features of the HadoopOffice library, such as encryption, signing, linked workbooks, templates and low footprint mode.

The following DataSources/DataSinks are provided:

  • ExcelFlinkFileInputFormat/ExcelFlinkFileOutputFormat: reads/writes arrays of objects of type SpreadSheetCellDAO.
  • SimpleExcelFlinkFileInputFormat/SimpleExcelFlinkFileOutputFormat: reads/writes arrays of objects of simple types.
    • Notes on the InputFormat: you need to specify the HadoopOfficeReadConfiguration, the dateLocale (usually Locale.US, because most Excel versions store dates in US format; since a SimpleDateFormat is used, you can specify any pattern as the date format), the decimalFormat (to use the right decimal "point") and, as of 1.2.0, the dateTimeLocale to read timestamps (again a SimpleDateFormat, so any pattern can be used; it defaults to the java.sql.Timestamp pattern). Additionally, you can specify the number of rows used to infer the schema of the underlying Excel file, or alternatively specify your own schema (via setSchema on the file format).
    • Notes on the OutputFormat: you need to specify the HadoopOfficeWriteConfiguration, optionally a set of "header" columns (first row of the Excel sheet; leave it null if you do not need it), the sheet name to which you want to write the data, the dateLocale (usually Locale.US, because most Excel versions store dates in US format) and the decimalFormat (to use the right decimal "point").
  • RowSimpleExcelFlinkFileInputFormat/RowSimpleExcelFlinkFileOutputFormat: similar to SimpleExcelFlinkFileInputFormat/SimpleExcelFlinkFileOutputFormat, but it returns a Flink Row. It is used by the TableSource/TableSink provided by the HadoopOffice library.
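
The date and decimal settings mentioned in the notes above can be tried out with plain JDK classes before they are passed to an input or output format. The following is a minimal sketch that only uses java.text and java.util; the object name LocaleFormatSketch and the sample strings are made up for illustration and are not part of the HadoopOffice library.

```scala
import java.text.{DateFormat, DecimalFormat, NumberFormat, SimpleDateFormat}
import java.util.{Calendar, Locale}

// Hypothetical helper object, for illustration only
object LocaleFormatSketch {
  // Short US date format, the usual choice because Excel mostly stores dates in US format
  val usDate: SimpleDateFormat =
    DateFormat.getDateInstance(DateFormat.SHORT, Locale.US).asInstanceOf[SimpleDateFormat]

  // Decimal formats: German uses "," as decimal separator, US uses "."
  val germanDecimal: DecimalFormat =
    NumberFormat.getInstance(Locale.GERMAN).asInstanceOf[DecimalFormat]
  val usDecimal: DecimalFormat =
    NumberFormat.getInstance(Locale.US).asInstanceOf[DecimalFormat]

  def main(args: Array[String]): Unit = {
    // Parse a short US date string and print its month and day
    val cal = Calendar.getInstance()
    cal.setTime(usDate.parse("1/2/17"))
    println(s"month=${cal.get(Calendar.MONTH) + 1}, day=${cal.get(Calendar.DAY_OF_MONTH)}")

    // The same number written in German and in US notation
    println(germanDecimal.parse("1.234,56").doubleValue())
    println(usDecimal.parse("1,234.56").doubleValue())
  }
}
```

Choosing the wrong decimal format is a common source of wrong values: a US format would read the German string "1.234,56" quite differently from what the author of the spreadsheet intended.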

Build

Note: this Flink DataSource/DataSink is available on Maven Central, so you no longer need to build it and publish it to a local Maven repository in order to use it.

Execute:

git clone https://github.com/ZuInnoTe/hadoopoffice.git hadoopoffice

You can build the application by changing to the directory hadoopoffice/flinkds and using the following command:

../gradlew clean build publishToMavenLocal

This command also publishes it to the local Maven repository, so you can easily use it in your own projects.

Use

Examples (Scala)

ExcelFlinkFileInputFormat

This example reads an Excel file using a US locale, assuming that all rows contain data (no header row).

val hocr = new HadoopOfficeReadConfiguration()
hocr.setLocale(new Locale.Builder().setLanguageTag("en-US").build()) // "en-US" is the language tag for the US locale
// load Excel file
hocr.setReadHeader(false)  // set to true if you want to skip the first row
val inputFormat = new ExcelFlinkFileInputFormat(hocr)
val excelData = env.readFile(inputFormat, inputFile)

ExcelFlinkFileOutputFormat

This example writes a file called test.xlsx in the new Excel format to the folder /user/office/output. It sets the comment author and the comment size. It also specifies the defaultSheetName, which is only needed if you want to write a "header" row (= first row) in Excel. In this example, however, no header is written.

val howc = new HadoopOfficeWriteConfiguration("test.xlsx")
howc.setMimeType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
howc.setCommentAuthor("Jane Doe")
howc.setCommentHeight(1) // unit is "cells", i.e. the comment box is 1 cell high
howc.setCommentWidth(3) // unit is "cells", i.e. the comment box is 3 cells wide
val defaultSheetName = "Sheet1"
val header: Array[String] = null // null means no header row is written; pass an Array of Strings to write one as the first row of defaultSheetName
val outputFormat = new ExcelFlinkFileOutputFormat(howc, header, defaultSheetName)
excelDataWithComment.write(outputFormat, "hdfs:///user/office/output/test.xlsx")

SimpleExcelFlinkFileInputFormat

The following example illustrates how an Excel file can be read into a DataSet of arrays of simple objects (e.g. string, integer, byte, short, date etc.). For instance, the example can be used with the testsimple.xlsx file. It specifies the decimal format (to determine how to interpret the decimal point) and the dateFormat (usually US, because most of the time Excel stores dates in US format). The first row of the Excel file is skipped, because it contains a header. The schema is derived by first reading all rows of the Excel file.

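import java.text.{DateFormat, DecimalFormat, NumberFormat, SimpleDateFormat}
import java.util.Locale

val hocr = new HadoopOfficeReadConfiguration()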
hocr.setLocale(new Locale.Builder().setLanguageTag("de").build())
// load Excel file; to convert values correctly, we need to define the formats for dates and decimals
// important: even for non-US Excel files, US must be used most of the time, because this is how Excel stores dates internally
val dateFormat: SimpleDateFormat = DateFormat.getDateInstance(DateFormat.SHORT, Locale.US).asInstanceOf[SimpleDateFormat]
val decimalFormat: DecimalFormat = NumberFormat.getInstance(Locale.GERMAN).asInstanceOf[DecimalFormat]
hocr.setReadHeader(true) // the Excel file contains in the first line the header
// we have maxInferRows = -1 , which means we read the full Excel file first to infer the underlying schema
val maxInferRows = -1
val inputFormat = new SimpleExcelFlinkFileInputFormat(hocr, maxInferRows, dateFormat, decimalFormat)
val excelInData = env.readFile(inputFormat, inputFile)
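
To illustrate the trade-off behind maxInferRows, the following self-contained sketch infers column types from a limited number of sample rows. It is a deliberately simplified illustration and not the algorithm used by the HadoopOffice library: the names InferSchemaSketch, ColType, classify, widen and inferSchema are hypothetical, and only three types are distinguished.

```scala
// Hypothetical, simplified schema inference; NOT the HadoopOffice implementation
object InferSchemaSketch {
  sealed trait ColType
  case object LongType extends ColType
  case object DecimalType extends ColType
  case object StringType extends ColType

  // Classify a single cell value by its textual shape
  def classify(cell: String): ColType =
    if (cell.matches("-?\\d+")) LongType
    else if (cell.matches("-?\\d+\\.\\d+")) DecimalType
    else StringType

  // Widen two candidate types to one that can hold both
  def widen(a: ColType, b: ColType): ColType = (a, b) match {
    case (x, y) if x == y => x
    case (LongType, DecimalType) | (DecimalType, LongType) => DecimalType
    case _ => StringType
  }

  // A negative maxInferRows means "inspect all rows", mirroring the -1 used above
  def inferSchema(rows: Seq[Seq[String]], maxInferRows: Int): Seq[ColType] = {
    val sample = if (maxInferRows < 0) rows else rows.take(maxInferRows)
    sample
      .map(_.map(classify))
      .reduceOption((l, r) => l.zip(r).map { case (a, b) => widen(a, b) })
      .getOrElse(Seq.empty)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Seq("1", "a"), Seq("2", "b"), Seq("2.5", "c"))
    println(inferSchema(rows, 2))  // only the first two rows are inspected
    println(inferSchema(rows, -1)) // all rows are inspected
  }
}
```

In this sketch, sampling only the first two rows classifies the first column as an integer type, although the third row contains a decimal. Reading the full file avoids such surprises at the cost of an additional pass over the data.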

SimpleExcelFlinkFileOutputFormat

The following example writes a DataSet consisting of arrays of simple objects to an Excel file in the new Excel format. It writes them to the sheet "Sheet2". In this example, no header is written.

val howc = new HadoopOfficeWriteConfiguration(new Path(outputFile).getName())
howc.setMimeType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
val defaultSheetName = "Sheet2"
val header: Array[String] = null // an Array of Strings; if null, no header line is written
val outputFormat = new SimpleExcelFlinkFileOutputFormat(howc, header, defaultSheetName, dateFormat, decimalFormat)
excelData.write(outputFormat, outputFile)