CSV analyser with NiFi and Groovy

When you want your users to bring their own data, you soon realize that they will bring any kind of data and you need to figure out what they want to load.

As an analyst you prefer to work with structured data when you can, and many times you have some kind of CSV file that you would like to load into Hive. For convenience, you do not want to define all the columns yourself but would rather let the ETL tool figure them out.

Here is a small Groovy script that analyzes CSV files to determine which column separator and which quoting character are used.

import java.nio.charset.StandardCharsets



// Pull the next flow file from the session; stop right away when there is none.
def flowFile = session.get()
if (!flowFile) return

// Detection state. The defaults below stay in effect unless the content scan
// further down overrides them.
int status = 1                  // 1 = supported layout, 0 = unsupported separator
int lineCount = 0               // number of lines read so far
def colHeader = []              // trimmed column names taken from the first line
String fieldDelimiter = ','     // column separator, comma by default
String quoteCharAttr = '\''     // quote character as a plain value
String quoteChar = '\\\''       // same quote character, backslash-escaped

// Comma separated list of primary key column names, lower-cased and trimmed so
// that header names can be matched against it case-insensitively.
// NOTE(review): `primaryKeys` is not defined in this script; presumably it is a
// processor property exposed through the script binding — confirm.
def primaryKeysArray = primaryKeys.value.toLowerCase().split(',').collect { it.trim() }



// Scan the flow file content once:
//   * line 1 decides the column separator and provides the column names,
//   * every later line is probed for the quoting character.
// Results are written to fieldDelimiter, colHeader, status, quoteChar and
// quoteCharAttr, which are declared before this call.
// NOTE(review): eachLine is called without a charset, so the content is
// presumably decoded with the platform default — confirm this is intended.
session.read(flowFile, { inputStream ->
    inputStream.eachLine { line ->
        lineCount++

        if (lineCount == 1) {
            // Header line: a semicolon anywhere wins over the default comma.
            if (line.contains(';')) {
                fieldDelimiter = ';'
            } else if (!line.contains(',')) {
                // Neither ';' nor ',' present: unsupported field separator.
                status = 0
                return
            }
            colHeader = line.split(fieldDelimiter)*.trim()
        } else if (line.contains('\'') || line.contains('"')) {
            // The line contains a candidate quoting character. Strip all
            // whitespace so padding around a field cannot hide its quotes.
            def fields = line.replaceAll("\\s", "").split(fieldDelimiter)

            fields.each { field ->
                // A quoted field needs an opening and a closing quote. Skipping
                // shorter values also keeps empty fields (e.g. "a,,b") from
                // raising an index error.
                if (field.size() < 2) {
                    return
                }
                if (field.startsWith('"') && field.endsWith('"')) {
                    quoteChar = '"'
                    // Must be quoteCharAttr: assigning to a misspelled name
                    // would leave the published quote character at its default.
                    quoteCharAttr = '"'
                } else if (field.startsWith('\'') && field.endsWith('\'')) {
                    quoteChar = '\\\''
                    quoteCharAttr = '\''
                }
            }
        }
    }
} as InputStreamCallback)

// Publish the detected layout as flow file attributes and route the flow file.
// Success requires a supported separator AND at least one header column; an
// empty header (for example an empty file) is routed to failure instead of
// producing a table definition without columns.
if (status == 1 && colHeader) {
    // One "name|type|primaryKey|0|0|name" entry per column.
    // The same list is used for table.fieldStructure and feedFieldStructure.
    def table_feedFieldStructure = []
    def table_fieldPoliciesJson = []
    def numberCols = []

    colHeader.eachWithIndex { columnName, idx ->
        numberCols.add(idx)

        // "1" marks a primary key column, "0" a regular column.
        def primaryKeyFlag = (columnName.toLowerCase() in primaryKeysArray) ? '1' : '0'
        table_feedFieldStructure.add([columnName, 'string', primaryKeyFlag, '0', '0', columnName].join('|'))

        table_fieldPoliciesJson.add('{"profile":true,"index":false,"fieldName":"' + columnName + '","feedFieldName":"' + columnName + '","standardization":null,"validation":null,"domainTypeId":null,"partitionColumn":false}')
    }

    // SERDEPROPERTIES values are string literals, so the separator and the
    // quote character are wrapped in single quotes. quoteChar carries its own
    // backslash when it is a single quote, which yields '\'' here.
    def table_feedFormat = "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'separatorChar' = '" + fieldDelimiter + "' ,'escapeChar' = '\\\\' ,'quoteChar' = '" + quoteChar + "') STORED AS TEXTFILE"
    def table_targetFormat = "STORED AS ORC"
    def table_targetMergeStrategy = "DEDUPE_AND_MERGE"
    def table_targetTblProperties = 'tblproperties("orc.compress"="SNAPPY")'

    def fieldStructure = table_feedFieldStructure.join('\n')
    def columnNames = colHeader.join(',')

    flowFile = session.putAttribute(flowFile, 'metadata.table.feedFormat', table_feedFormat)
    flowFile = session.putAttribute(flowFile, 'metadata.table.targetFormat', table_targetFormat)
    flowFile = session.putAttribute(flowFile, 'metadata.table.targetMergeStrategy', table_targetMergeStrategy)
    flowFile = session.putAttribute(flowFile, 'metadata.table.targetTblProperties', table_targetTblProperties)
    flowFile = session.putAttribute(flowFile, 'metadata.table.feedFieldStructure', fieldStructure)
    flowFile = session.putAttribute(flowFile, 'metadata.table.fieldStructure', fieldStructure)
    flowFile = session.putAttribute(flowFile, 'metadata.table.fieldPoliciesJson', '[' + table_fieldPoliciesJson.join(',') + ']')
    flowFile = session.putAttribute(flowFile, 'table.columns', columnNames)
    flowFile = session.putAttribute(flowFile, 'csv.column.separator', fieldDelimiter)
    flowFile = session.putAttribute(flowFile, 'csv.column.escapechar', '\\')
    flowFile = session.putAttribute(flowFile, 'csv.column.quotechar', quoteCharAttr)
    flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNumbers', numberCols.join(','))
    flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNames', columnNames)

    session.transfer(flowFile, REL_SUCCESS)
} else {
    // Unsupported separator or no usable header line.
    session.transfer(flowFile, REL_FAILURE)
}

Leave a Reply

Your email address will not be published. Required fields are marked *