
If you want to use a lookup table in NiFi to mask or complement the data in a feed, you can build a simple processor with Groovy.
The Groovy code is also available at https://github.com/maxbback/nifi_lookuptable
In this processor I use a DB pool service for looking up addresses to mask in my text.
For each line I collect the first character of each word and use these characters in the select statement against my lookup table.
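The first-letter idea can be sketched in plain Groovy as follows. This is a minimal sketch, not the processor code itself: the helper names firstLetters and buildLookupSql, the table name lookupTable and the column name fname are assumptions made for illustration, and the table and column names are interpolated directly into the SQL, which is only reasonable when they come from trusted configuration.

```groovy
// Hypothetical helper: distinct, upper-cased first letters of the words in a line
List<String> firstLetters(String line) {
    // \b\w matches the first word character after each word boundary
    def matcher = line =~ /\b\w/
    def letters = matcher.collect { it as String }
    // Words that start with a digit are skipped
    letters = letters.findAll { !it.isNumber() }
    return letters.collect { it.toUpperCase() }.unique()
}

// Hypothetical helper: one LIKE clause per first letter, joined with OR
String buildLookupSql(String table, String column, List<String> letters) {
    def likes = letters.collect { "${column} LIKE '${it}%'" }.join(' OR ')
    return "SELECT ${column} FROM ${table} WHERE ${likes}".toString()
}

def letters = firstLetters('Max is my name 42')
println letters
println buildLookupSql('lookupTable', 'fname', letters)
```

For the sample line the letters are M, I and N, so the statement contains three LIKE clauses.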
Querying for every line causes a lot of lookups against the database, so an improvement is to cache the lookup data and only query for data we have not fetched yet.
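The caching idea can be sketched like this. It is a minimal sketch under stated assumptions: fetchFromDb is a hypothetical stand-in for the real database query and works on a small in-memory list, and the cache is simply a map from first letter to the values that start with that letter.

```groovy
// Hypothetical cache: first letter -> list of lookup values starting with that letter
def cache = [:]
def dbCalls = 0

// Stand-in for the database query (assumption: real code would run a SELECT here)
def fetchFromDb = { List<String> letters ->
    dbCalls++
    def table = ['Max', 'Mia', 'Ian', 'Nora']
    table.findAll { it[0].toUpperCase() in letters }
}

// Return the cached values for the given letters, querying only for unseen letters
def cachedLookup = { List<String> letters ->
    def missing = letters.findAll { !cache.containsKey(it) }
    if (missing) {
        // Register every requested letter, so letters without matches are not queried again
        missing.each { cache[it] = [] }
        fetchFromDb(missing).each { cache[it[0].toUpperCase()] << it }
    }
    letters.collectMany { cache[it] }
}

println cachedLookup(['M', 'I'])   // first call queries the stand-in database
println cachedLookup(['M'])        // second call is served from the cache
println "database calls: ${dbCalls}"
```

Only the first call reaches the stand-in database; the second one is answered from the map.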
In this simple flow I use a GetFile processor to pick up the file to be processed and a PutFile processor to store the result back in the filesystem.
The main component is the ExecuteGroovyScript processor.
First we define the DB pool service (DBCPConnectionPool):
URL = jdbc:mysql://localhost/mysql?serverTimezone=CET&useUnicode=true&characterEncoding=UTF-8
Driver = com.mysql.cj.jdbc.Driver
It's time to configure the ExecuteGroovyScript processor.
I added 4 attributes: dbConnectionPoolName for the DB connection pool, lookupTable and lookupColumn for the lookup table and its column, and tag for the string that replaces matched data.
The resulting data will look similar to this string:
<fName> is my name or is it <fName>or maybe <fName>
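The masking step that produces a line like this can be sketched as below. This is a simplified sketch, not the script itself: the helper name maskLine and the input sentence are assumptions, and unlike the script it quotes each lookup value with Pattern.quote so that regex metacharacters in the data are matched literally.

```groovy
import java.util.regex.Matcher
import java.util.regex.Pattern

// Hypothetical helper: replace every case-insensitive occurrence of each value with the tag
String maskLine(String line, List<String> values, String tag) {
    values.inject(line) { acc, value ->
        // (?i) makes the match case insensitive, Pattern.quote treats the value literally
        acc.replaceAll('(?i)' + Pattern.quote(value), Matcher.quoteReplacement(tag))
    }
}

println maskLine('Max is my name or is it max or maybe MAX', ['Max'], '<fName>')
// prints: <fName> is my name or is it <fName> or maybe <fName>
```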
The feed looks like this;
There are many things that need to be improved to make this production ready, to mention a few:
- Proper error handling
- Limitation of how much we cache so we are not running out of memory
- Smarter matching algorithms
The code:
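One possible way to limit the cache, sketched here as an assumption rather than as part of the processor, is a small least-recently-used cache built on an access-ordered LinkedHashMap. The class name BoundedCache and its methods fetch, store and keys are hypothetical.

```groovy
// Hypothetical bounded cache: drops the least recently used entry beyond maxEntries
class BoundedCache {
    private final int maxEntries
    // accessOrder = true keeps the least recently used key first in iteration order
    private final LinkedHashMap<String, List<String>> entries =
            new LinkedHashMap<String, List<String>>(16, 0.75f, true)

    BoundedCache(int maxEntries) {
        this.maxEntries = maxEntries
    }

    List<String> fetch(String key) {
        return entries.get(key)
    }

    void store(String key, List<String> values) {
        entries.put(key, values)
        if (entries.size() > maxEntries) {
            // The first key is the least recently used one
            entries.remove(entries.keySet().iterator().next())
        }
    }

    List<String> keys() {
        return entries.keySet().toList()
    }
}

def cache = new BoundedCache(2)
cache.store('M', ['Max'])
cache.store('I', ['Ian'])
cache.fetch('M')            // touch M so that I becomes the least recently used key
cache.store('N', ['Nora'])  // exceeds the limit and evicts I
println cache.keys()        // [M, N]
```

A limit on the number of first letters is only a rough bound, since one letter can map to many values; limiting the total number of cached values would be a stricter variant.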
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import groovy.sql.Sql

// ExecuteGroovyScript attributes
def serviceName = dbConnectionPoolName.value
def lookupTableName = lookupTable.value
def lookupColName = lookupColumn.value
def tagString = tag.value

// Some counters
def countChars = 0
def countReplaceExecutions = 0
def countFoundAllWords = 0

// Get controller service lookup from context
def lookup = context.controllerServiceLookup
// Search for serviceName in controller services
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == serviceName
}
// Get the service from the service id
def service = lookup.getControllerService(dbcpServiceId)
// Connect to service
def conn = service.getConnection()
if (!conn) {
    log.error("Failed to connect to " + serviceName)
    return
}
def sql = new Sql(conn)
if (!sql) {
    log.error("Failed to get SQL connection")
    conn?.close()
    return
}

def lookupCacheMap = [:]
def charsExpectedAfterMasking = 0

// Get the flowfile
def flowFile = session.get()
if (!flowFile) {
    // Release the connection before leaving, otherwise it stays checked out
    conn?.close()
    return
}

// Update flow file with outputStream
flowFile = session.write(flowFile, { inputStream, outputStream ->
    // Read line by line
    inputStream.eachLine { line ->
        // Get the first character of every word
        def matchedChars = line =~ /\b\w/
        // Temporary lists for the lookup characters and the SQL LIKE clauses
        def lookupChars = []
        def likeList = []
        matchedChars.each {
            // Skip numbers
            if (!it.isNumber()) {
                // Add character to the lookup list
                lookupChars.add(it.toUpperCase())
                // Check if we already cached the lookup values
                if (!lookupCacheMap.containsKey(it.toUpperCase())) {
                    // Add the new character and an empty list to the lookup cache
                    lookupCacheMap[it.toUpperCase()] = []
                    // Add the new LIKE clause to the list
                    likeList.add("${lookupColName} LIKE \"${it.toUpperCase()}%\"")
                }
            }
        }
        charsExpectedAfterMasking = matchedChars.size() - lookupChars.size()
        // Remove all duplicates
        lookupChars = lookupChars.unique()
        likeList = likeList.unique()
        // Do we have more lookup values to cache?
        if (likeList.size() > 0) {
            // Join the list with OR
            def likeSql = likeList.join(' OR ')
            // Build the SQL lookup select statement
            def sqlCmd = "SELECT ${lookupColName} FROM ${lookupTableName} WHERE ${likeSql}"
            // Convert the GString to a String for compatibility with groovy.sql
            def sqlCmdString = sqlCmd.toString()
            // Example: SELECT fname FROM lookupTable WHERE fname LIKE 'M%' OR fname LIKE 'I%' OR fname LIKE 'N%' OR fname LIKE 'O%'
            // Execute SQL and get the rows back
            def rows = sql.rows(sqlCmdString)
            // Add every returned value to the lookup cache
            rows.each { row ->
                // Only one column is selected, so read it by index instead of a fixed column name
                def value = row[0] as String
                if (!value) return
                def key = value[0].toUpperCase()
                if (!lookupCacheMap.containsKey(key)) {
                    // SQL sometimes returns rows not requested
                    lookupCacheMap[key] = []
                }
                // Add value to cache
                lookupCacheMap[key].add(value)
            }
        }
        countChars = countChars + lookupChars.size()
        // Match against the cache
        lookupChars.any {
            def replacedAllWords = false
            def charMapId = it
            lookupCacheMap[charMapId].any {
                // (?i) makes the match case insensitive
                line = line.replaceAll("(?i)${it}", "<@@@@>")
                countReplaceExecutions++
                def matchedCharLeftInLine = line =~ /\b\w/
                if (matchedCharLeftInLine.size() <= charsExpectedAfterMasking) {
                    // Everything that can be masked in this line has been masked
                    countFoundAllWords++
                    replacedAllWords = true
                    return true
                }
            }
            if (replacedAllWords) {
                return true
            }
        }
        line = line.replaceAll("<@@@@>", "${tagString}") + "\n"
        // Write the result to outputStream
        outputStream.write(line.getBytes(StandardCharsets.UTF_8))
    }
} as StreamCallback)

// Write counters
flowFile = session.putAttribute(flowFile, 'countChars', countChars.toString())
flowFile = session.putAttribute(flowFile, 'countReplaceExecutions', countReplaceExecutions.toString())
flowFile = session.putAttribute(flowFile, 'countLookupCacheMapChars', lookupCacheMap.size().toString())
flowFile = session.putAttribute(flowFile, 'countFoundAllWords', countFoundAllWords.toString())
session.transfer(flowFile, REL_SUCCESS)
// Release connection, this is important as it will otherwise block new executions
conn?.close()