NiFi is a fantastic tool for moving data from one system to another, and in combination with the Kylo self-service front end it makes data ingestion easy.

Processing data in NiFi is easy as well: it comes bundled with a lot of processors out of the box, and you can extend it with your own processors written in Java, or use a generic processor such as ExecuteScript to write processing logic in Groovy, for example.
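
To give an idea of the pattern, here is a minimal ExecuteScript Groovy skeleton (just an illustration, not the actual conversion script): get the incoming FlowFile, rewrite its content through a StreamCallback, and route it to success.

import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return

// rewrite the FlowFile content; here it is simply upper-cased as an example
flowFile = session.write(flowFile, { inputStream, outputStream ->
    def text = inputStream.getText('UTF-8')
    outputStream.write(text.toUpperCase().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)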

I have written a small Groovy script that reads an Excel document and converts it to CSV so that it can be ingested into a Hive table.

To run the code you also need the Apache POI library (org.apache.poi), including its XML/OOXML jars.

You can paste the Groovy code directly into the NiFi ExecuteScript processor, or put it in a file on your filesystem and point the processor at it, which is what I did on my Mac notebook.
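
The ExecuteScript processor then needs a few properties set; roughly like this (the paths and jar names below are examples from my setup, adjust them to your environment):

Script Engine: Groovy
Script File: /Users/me/nifi/scripts/excel_to_csv.groovy (or paste the script into Script Body instead)
Module Directory: /Users/me/nifi/lib/poi (a directory containing the POI jars, e.g. poi and poi-ooxml plus their dependencies)
primaryKeys: id (a user-defined dynamic property holding a comma separated list of primary key column names; it is exposed to the script as the primaryKeys variable)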

import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat

import org.apache.nifi.processor.io.StreamCallback

import org.apache.poi.ss.usermodel.Cell
import org.apache.poi.ss.usermodel.Row
import org.apache.poi.ss.usermodel.Sheet
import org.apache.poi.ss.usermodel.Workbook
import org.apache.poi.ss.usermodel.WorkbookFactory
import org.apache.poi.hssf.usermodel.*

def flowFile = session.get()
if (!flowFile) return

// 'primaryKeys' is a dynamic property on the ExecuteScript processor:
// a comma separated list of column names that should be flagged as primary keys
def primaryKeysAtt = primaryKeys.value ?: ''
def primaryKeysArray = primaryKeysAtt.toLowerCase().split(',')*.trim()

def date = new Date()
def value
def colType = []      // data type per column, derived from the cell values
def colHeader = []    // column names taken from the first row
def colDataType = ""
def headColNum = 0    // index of the last header column

// Convert the Excel content to CSV, one output line per spreadsheet row
flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        Workbook wb = WorkbookFactory.create(inputStream)
        Sheet mySheet = wb.getSheetAt(0)
        def record = ''
        def content = ''

        // processing timestamp (currently unused; could be added as an extra column)
        def tstamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(date)

        Iterator<Row> rowIter = mySheet.rowIterator()
        def rowNum = 0
        def colNum = 0
        def columns = []

        while (rowIter.hasNext()) {
            rowNum++
            colNum = 0
            columns = []
            record = ''
            Row nextRow = rowIter.next()
            // note: cellIterator() skips cells that were never created,
            // so completely empty cells can shift the remaining columns
            Iterator<Cell> cellIterator = nextRow.cellIterator()

            while (cellIterator.hasNext()) {
                Cell cell = cellIterator.next()
                if (rowNum == 1) {
                    // the first row holds the column names; strip whitespace from them
                    content = cell.toString()
                    columns.add(content.replaceAll("\\s", ""))
                    colHeader.add(content.replaceAll("\\s", ""))
                    colType.add(null)
                    headColNum = colNum
                } else {
                    switch (cell.cellType) {
                        case HSSFCell.CELL_TYPE_NUMERIC:
                            if (HSSFDateUtil.isCellDateFormatted(cell)) {
                                colDataType = "string"
                                value = cell.toString()
                            } else {
                                columns.add(cell.toString())
                                colDataType = "float"
                            }
                            break
                        case HSSFCell.CELL_TYPE_BOOLEAN:
                            colDataType = "string"
                            value = cell.toString()
                            break
                        default:
                            value = cell.stringCellValue
                            colDataType = "string"
                            break
                    }

                    // escape and clean string data
                    if (colDataType == "string") {
                        // remove record separators (line breaks)
                        value = value.replace('\n', ' ')
                        value = value.replace('\r', ' ')
                        // escape the escape character itself
                        value = value.replace('\\', '\\\\')
                        // escape the quote character
                        value = value.replace("'", "\\'")
                        // quote strings containing the separator
                        if (value.contains(',')) {
                            columns.add("'" + value + "'")
                        } else {
                            columns.add(value)
                        }
                    }

                    // derive the column type: keep it if consistent, otherwise fall back to string
                    if (colType[colNum] == null) {
                        colType[colNum] = colDataType
                    } else if (colType[colNum] != colDataType) {
                        colType[colNum] = "string"
                    }
                }
                colNum++
            }

            // pad short rows so every record has as many columns as the header
            while (colNum <= headColNum) {
                columns.add("")
                colNum++
            }

            record = columns.join(',') + '\n'
            outputStream.write(record.getBytes(StandardCharsets.UTF_8))
        }
    } catch (e) {
        // the FlowFile is still routed to success; add failure handling here if needed
        log.error("Error during processing of spreadsheet", e)
    }
} as StreamCallback)

// rename the FlowFile to <original name>_<timestamp>.csv
def filename = flowFile.getAttribute('filename').split('\\.')[0] + '_' +
        new SimpleDateFormat("yyyyMMdd-HHmmss").format(date) + '.csv'
flowFile = session.putAttribute(flowFile, 'filename', filename)

// table metadata: CSV feed table, ORC target table
def table_feedFormat = "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'separatorChar' = ',' ,'escapeChar' = '\\\\' ,'quoteChar' = '\\\'') STORED AS TEXTFILE"
def table_feedFieldStructure = []    // table.fieldStructure is set to the same value
def table_fieldPoliciesJson = []
def table_targetFormat = "STORED AS ORC"
def table_targetMergeStrategy = "DEDUPE_AND_MERGE"
def table_targetTblProperties = 'tblproperties("orc.compress"="SNAPPY")'

def numberCols = []
for (int i = 0; i < colHeader.size(); i++) {
    numberCols.add(i)
    // one pipe separated entry per column: name|type|primary key flag|0|0|name
    def colData = []
    colData.add(colHeader[i])
    colData.add(colType[i])
    colData.add(colHeader[i].toLowerCase() in primaryKeysArray ? "1" : "0")   // primary key flag
    colData.add("0")
    colData.add("0")
    colData.add(colHeader[i])
    table_feedFieldStructure.add(colData.join('|'))

    table_fieldPoliciesJson.add('{"profile":true,"index":false,"fieldName":"' + colHeader[i] + '","feedFieldName":"' + colHeader[i] + '","standardization":null,"validation":null,"domainTypeId":null,"partitionColumn":false}')
}

flowFile = session.putAttribute(flowFile, 'metadata.table.feedFormat', table_feedFormat)
flowFile = session.putAttribute(flowFile, 'metadata.table.targetFormat', table_targetFormat)
flowFile = session.putAttribute(flowFile, 'metadata.table.targetMergeStrategy', table_targetMergeStrategy)
flowFile = session.putAttribute(flowFile, 'metadata.table.targetTblProperties', table_targetTblProperties)
flowFile = session.putAttribute(flowFile, 'metadata.table.feedFieldStructure', table_feedFieldStructure.join('\n'))
flowFile = session.putAttribute(flowFile, 'metadata.table.fieldStructure', table_feedFieldStructure.join('\n'))
flowFile = session.putAttribute(flowFile, 'metadata.table.fieldPoliciesJson', '[' + table_fieldPoliciesJson.join(',') + ']')
flowFile = session.putAttribute(flowFile, 'table.columns', colHeader.join(','))
flowFile = session.putAttribute(flowFile, 'csv.column.separator', ',')
flowFile = session.putAttribute(flowFile, 'csv.column.escapechar', '\\')
flowFile = session.putAttribute(flowFile, 'csv.column.quotechar', '\'')
flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNumbers', numberCols.join(','))
flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNames', colHeader.join(','))

session.transfer(flowFile, REL_SUCCESS)
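
After the script has run, the FlowFile contains plain CSV data, has been renamed to originalname_timestamp.csv, and carries the metadata.table.* and csv.* attributes set above, which a Kylo ingest feed can pick up to create the feed and target Hive tables without having to define the schema by hand.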
