
NiFi is a fantastic tool for moving data from one system to another, and in combination with the Kylo self-service front end it makes data ingestion easy.
Processing data in NiFi is easy as well: it comes bundled with a large number of processors, and you can extend it with your own processors written in Java, or use a generic scripting processor such as ExecuteScript to write processors in, for example, Groovy (a minimal skeleton is shown below).
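If you have not used ExecuteScript before, a minimal Groovy script for it looks roughly like this sketch. The session, log and REL_SUCCESS/REL_FAILURE variables are bound by the processor itself; the StreamCallback body here just copies the content unchanged as a placeholder for your own transformation:

import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    // replace this with your own transformation of the flow file content
    IOUtils.copy(inputStream, outputStream)
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)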
I have written a small Groovy script that reads an Excel document and converts it to CSV so that it can be ingested into a Hive table.
To run the code you also need the Apache POI libraries (org.apache.poi), including the OOXML jars, available to the processor.
You can insert the Groovy code directly into the NiFi ExecuteScript processor, or put the content into a file on your filesystem, like I did on my Mac notebook, and point the processor at it.
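As a rough sketch of the processor setup (the property names are those of the standard ExecuteScript processor; the paths, script name and primary key list are placeholders for my own environment):

Script Engine:    Groovy
Script File:      /Users/xxx/nifi/scripts/excel_to_csv.groovy   (leave empty if you paste the code into Script Body instead)
Module Directory: /Users/xxx/nifi/lib/poi                       (directory containing the POI and poi-ooxml jars and their dependencies)
primaryKeys:      id                                            (dynamic property; the script reads it as primaryKeys.value)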
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import java.text.SimpleDateFormat

import org.apache.nifi.processor.io.StreamCallback

import org.apache.poi.ss.usermodel.Cell
import org.apache.poi.ss.usermodel.Row
import org.apache.poi.ss.usermodel.Sheet
import org.apache.poi.ss.usermodel.Workbook
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.apache.poi.hssf.usermodel.*

def flowFile = session.get()
if (!flowFile) return

// comma-separated list of primary key columns, passed in as a dynamic processor property
def primaryKeysAtt = primaryKeys.value
def primaryKeysArray = primaryKeysAtt.toLowerCase().split(',')
primaryKeysArray = primaryKeysArray*.trim()
//def toCategoryVaultAtt = toHiveVault.value

def date = new Date()
def value
def colType = []
def colHeader = []
def colDataType = ""
def headColNum = 0

flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        Workbook wb = WorkbookFactory.create(inputStream)
        Sheet mySheet = wb.getSheetAt(0)
        def record = ''
        def content = ''
        // processing time (intended as the first column; currently unused)
        def tstamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date)
        Iterator<Row> rowIter = mySheet.rowIterator()
        def rowNum = 0
        def colNum = 0
        def columns = []
        while (rowIter.hasNext()) {
            rowNum++
            colNum = 0
            columns = []
            record = ''
            Row nextRow = rowIter.next()
            Iterator<Cell> cellIterator = nextRow.cellIterator()
            while (cellIterator.hasNext()) {
                Cell cell = cellIterator.next()
                if (rowNum == 1) {
                    // header row: strip whitespace from column names and remember them
                    content = cell.toString()
                    columns.add(content.replaceAll("\\s", ""))
                    colHeader.add(content.replaceAll("\\s", ""))
                    colType.add(null)
                    headColNum = colNum
                } else {
                    switch (cell.cellType) {
                        case HSSFCell.CELL_TYPE_NUMERIC:
                            if (HSSFDateUtil.isCellDateFormatted(cell)) {
                                //value = cell.dateCellValue
                                //columns.add(cell.toString())
                                colDataType = "string"
                                value = cell.toString()
                            } else {
                                //value = cell.numericCellValue
                                columns.add(cell.toString())
                                colDataType = "float"
                            }
                            break
                        case HSSFCell.CELL_TYPE_BOOLEAN:
                            //value = cell.booleanCellValue
                            colDataType = "string"
                            value = cell.toString()
                            break
                        default:
                            value = cell.stringCellValue
                            colDataType = "string"
                            break
                    }
                    // escape and clean string data
                    if (colDataType == "string") {
                        // remove all record separators
                        value = value.replaceAll('\n', ' ')
                        value = value.replaceAll('\r', ' ')
                        // escape all escape characters (String.replace is literal, unlike the regex-based replaceAll)
                        if (value.contains('\\')) {
                            value = value.replace('\\', '\\\\')
                        }
                        // escape the quote character
                        if (value.contains('\'')) {
                            value = value.replace('\'', '\\\'')
                        }
                        // quote strings containing the separator
                        if (value.contains(',')) {
                            columns.add("'" + value + "'")
                        } else {
                            columns.add(value)
                        }
                    }
                    // track the detected column type; mixed types degrade to string
                    if (colType[colNum] == null) {
                        colType[colNum] = colDataType
                    } else {
                        if (colType[colNum] != colDataType) {
                            colType[colNum] = "string"
                        }
                    }
                }
                colNum++
            }
            // pad short rows so every record has as many columns as the header row
            while (colNum <= headColNum) {
                columns.add("")
                colNum++
            }
            record = columns.join(',')
            //if (rowIter.hasNext()) {
                record = record + '\n'
            //}
            outputStream.write(record.getBytes(StandardCharsets.UTF_8))
        }
    } catch (e) {
        log.error("Error during processing of spreadsheet name = xx, sheet = xx", e)
        //session.transfer(inputStream, REL_FAILURE)
    }
} as StreamCallback)

// rename the flow file: original name plus a timestamp, with a .csv extension
def filename = flowFile.getAttribute('filename').split('\\.')[0] + '_' + new SimpleDateFormat("yyyyMMdd-HHmmss").format(date) + '.csv'
flowFile = session.putAttribute(flowFile, 'filename', filename)

// Kylo/Hive metadata attributes describing the feed and target tables
def table_feedFormat = "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'separatorChar' = ',' ,'escapeChar' = '\\\\' ,'quoteChar' = '\\\'') STORED AS TEXTFILE"
def table_feedFieldStructure = []
//table.fieldStructure same as feedFieldStructure
def table_fieldPoliciesJson = []
def table_targetFormat = "STORED AS ORC"
def table_targetMergeStrategy = "DEDUPE_AND_MERGE"
def table_targetTblProperties = 'tblproperties("orc.compress"="SNAPPY")'

def numberCols = []
def colData = []
for (int i = 0; i < colHeader.size(); i++) {
    numberCols.add(i)
    colData = []
    colData.add(colHeader[i])
    colData.add(colType[i])
    if (colHeader[i].toLowerCase() in primaryKeysArray) {
        colData.add("1") // primary key
    } else {
        colData.add("0") // not a primary key
    }
    colData.add("0")
    colData.add("0")
    colData.add(colHeader[i])
    table_feedFieldStructure.add(colData.join('|'))
    table_fieldPoliciesJson.add('{"profile":true,"index":false,"fieldName":"' + colHeader[i] + '","feedFieldName":"' + colHeader[i] + '","standardization":null,"validation":null,"domainTypeId":null,"partitionColumn":false}')
}

flowFile = session.putAttribute(flowFile, 'metadata.table.feedFormat', table_feedFormat)
flowFile = session.putAttribute(flowFile, 'metadata.table.targetFormat', table_targetFormat)
flowFile = session.putAttribute(flowFile, 'metadata.table.targetMergeStrategy', table_targetMergeStrategy)
flowFile = session.putAttribute(flowFile, 'metadata.table.targetTblProperties', table_targetTblProperties)
flowFile = session.putAttribute(flowFile, 'metadata.table.feedFieldStructure', table_feedFieldStructure.join('\n'))
flowFile = session.putAttribute(flowFile, 'metadata.table.fieldStructure', table_feedFieldStructure.join('\n'))
flowFile = session.putAttribute(flowFile, 'metadata.table.fieldPoliciesJson', '[' + table_fieldPoliciesJson.join(',') + ']')
flowFile = session.putAttribute(flowFile, 'table.columns', colHeader.join(','))
flowFile = session.putAttribute(flowFile, 'csv.column.separator', ',')
flowFile = session.putAttribute(flowFile, 'csv.column.escapechar', '\\')
flowFile = session.putAttribute(flowFile, 'csv.column.quotechar', '\'')
flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNumbers', numberCols.join(','))
flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNames', colHeader.join(','))

session.transfer(flowFile, REL_SUCCESS)