When you want your users to bring their own data, you soon realize that they will bring any kind of data and you need to figure out what they want to load.
As an analyst you like to work with structured data if you can, and many times you have some kind of CSV file you would like to load into Hive. For convenience, you do not want to define all the columns yourself but rather let the ETL tool figure this out.
Here is a small Groovy script that analyzes CSV files to determine which column separator is used, as well as which quoting character.
import java.nio.charset.StandardCharsets

// NiFi ExecuteScript (Groovy).
// Reads the incoming CSV flow file, detects the column separator (',' or ';')
// from the header line and the quoting character (' or ") from the data lines,
// then writes Kylo/Hive table-metadata attributes onto the flow file.
// Routes to REL_SUCCESS on success, REL_FAILURE when no supported separator is found.

def flowFile = session.get()
if (!flowFile) return

def lineCount = 0
def fieldDelimiter = ','      // detected separator; comma is the default
def quoteChar = '\\\''        // escaped form embedded in the Hive SERDE DDL
def quoteCharAttr = '\''      // raw form written to the csv.column.quotechar attribute
def status = 1                // 1 = parse ok, 0 = unsupported field separator
def colHeader = []

// 'primaryKeys' is a processor property binding: comma-separated column names.
def primaryKeysArray = primaryKeys.value.toLowerCase().split(',')*.trim()

session.read(flowFile, { inputStream ->
    inputStream.eachLine { line ->
        lineCount++
        if (lineCount == 1) {
            // Header line: detect the field separator and capture column names.
            if (line.contains(';')) {
                fieldDelimiter = ';'
            } else if (!line.contains(',')) {
                status = 0    // neither ';' nor ',' present — unsupported separator
                return        // nothing useful to extract from this header
            }
            colHeader = line.split(fieldDelimiter)*.trim()
        } else if (line.contains('\'') || line.contains('"')) {
            // Data line containing a quote character: infer the quoting style
            // from the first field that is wrapped in matching quotes.
            def compact = line.replaceAll('\\s', '')
            // find() stops at the first match — the original used each{} with
            // 'return true', which does NOT break out of a Groovy closure loop.
            // The size() guard prevents StringIndexOutOfBoundsException on
            // empty fields (e.g. "a,,b").
            def quoted = compact.split(fieldDelimiter).find { f ->
                f.size() >= 2 &&
                    ((f[0] == '"' && f[-1] == '"') || (f[0] == '\'' && f[-1] == '\''))
            }
            if (quoted != null) {
                if (quoted[0] == '"') {
                    quoteChar = '"'
                    quoteCharAttr = '"'   // BUG FIX: original assigned 'quoteCharAtt' (typo),
                                          // leaving the attribute value stale for "-quoted files
                } else {
                    quoteChar = '\\\''
                    quoteCharAttr = '\''
                }
            }
        }
    }
} as InputStreamCallback)

if (status == 1) {
    def table_feedFieldStructure = []   // table.fieldStructure same as feedFieldStructure
    def table_fieldPoliciesJson = []
    def numberCols = []

    colHeader.eachWithIndex { col, i ->
        numberCols << i
        // "1" marks the column as a primary key when it appears in primaryKeysArray.
        def isPrimaryKey = (col.toLowerCase() in primaryKeysArray) ? '1' : '0'
        // Pipe-delimited field descriptor consumed downstream:
        // name|type|primaryKey|flag|flag|feedFieldName — all columns typed as string.
        table_feedFieldStructure << [col, 'string', isPrimaryKey, '0', '0', col].join('|')
        // NOTE(review): column names are interpolated into JSON without escaping —
        // a header containing '"' would break this; confirm upstream sanitization.
        table_fieldPoliciesJson << ('{"profile":true,"index":false,"fieldName":"' + col +
            '","feedFieldName":"' + col +
            '","standardization":null,"validation":null,"domainTypeId":null,"partitionColumn":false}')
    }

    // BUG FIX: separatorChar and quoteChar values are now wrapped in single quotes —
    // the original emitted e.g. "'separatorChar' = ," which is invalid Hive DDL
    // (escapeChar was already correctly quoted, showing the intent).
    def table_feedFormat = "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' " +
        "WITH SERDEPROPERTIES ( 'separatorChar' = '" + fieldDelimiter +
        "' ,'escapeChar' = '\\\\' ,'quoteChar' = '" + quoteChar + "') STORED AS TEXTFILE"
    def table_targetFormat = "STORED AS ORC"
    def table_targetMergeStrategy = "DEDUPE_AND_MERGE"
    def table_targetTblProperties = 'tblproperties("orc.compress"="SNAPPY")'

    flowFile = session.putAttribute(flowFile, 'metadata.table.feedFormat', table_feedFormat)
    flowFile = session.putAttribute(flowFile, 'metadata.table.targetFormat', table_targetFormat)
    flowFile = session.putAttribute(flowFile, 'metadata.table.targetMergeStrategy', table_targetMergeStrategy)
    flowFile = session.putAttribute(flowFile, 'metadata.table.targetTblProperties', table_targetTblProperties)
    flowFile = session.putAttribute(flowFile, 'metadata.table.feedFieldStructure', table_feedFieldStructure.join('\n'))
    flowFile = session.putAttribute(flowFile, 'metadata.table.fieldStructure', table_feedFieldStructure.join('\n'))
    flowFile = session.putAttribute(flowFile, 'metadata.table.fieldPoliciesJson', '[' + table_fieldPoliciesJson.join(',') + ']')
    flowFile = session.putAttribute(flowFile, 'table.columns', colHeader.join(','))
    flowFile = session.putAttribute(flowFile, 'csv.column.separator', fieldDelimiter)
    flowFile = session.putAttribute(flowFile, 'csv.column.escapechar', '\\')
    flowFile = session.putAttribute(flowFile, 'csv.column.quotechar', quoteCharAttr)
    flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNumbers', numberCols.join(','))
    flowFile = session.putAttribute(flowFile, 'commaSeparatedColumnNames', colHeader.join(','))
    session.transfer(flowFile, REL_SUCCESS)
} else {
    session.transfer(flowFile, REL_FAILURE)
}