I am fond of quick solutions, and Groovy is a convenient way to write small scripts that extend NiFi when you need to.

NiFi has good processors for extracting data from a database, but sometimes you need your own. In another post I will use this code to look up values and replace every matched entry with a token, as part of anonymizing data.

First, create a DBCPConnectionPool controller service in NiFi and set these properties:

Database Connection URL: the JDBC URL of your database, for example

jdbc:mysql://localhost/smstid?serverTimezone=CET

In my case I had to add serverTimezone to the URL so that the timezone NiFi uses matches the database timezone.

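Database Driver Class Name: com.mysql.jdbc.Driver (Connector/J 8.0 still accepts this name but marks it as deprecated; com.mysql.cj.jdbc.Driver is the current class name)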

Database Driver Location(s): /Users/max/Documents/xml/jdbc/mysql-connector-java-8.0.11.jar

If your database requires a login, also set Database User and Password.
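If you want to keep serverTimezone in line with the JVM that NiFi runs in, the Groovy sketch below prints the JVM's default timezone and appends it to a JDBC URL. withServerTimezone is a hypothetical helper, not part of NiFi or MySQL Connector/J, and the sketch assumes the timezone ID needs no URL encoding (as with CET).

```groovy
// Hypothetical helper (not a NiFi or Connector/J API): append a serverTimezone
// parameter to a JDBC URL, using '?' for the first parameter and '&' otherwise.
// Assumes tzId needs no URL encoding (e.g. "CET").
String withServerTimezone(String jdbcUrl, String tzId) {
    String separator = jdbcUrl.contains('?') ? '&' : '?'
    return jdbcUrl + separator + 'serverTimezone=' + tzId
}

// Default timezone of the JVM running this script
String jvmZone = TimeZone.getDefault().getID()
println "JVM timezone: ${jvmZone}"
println withServerTimezone('jdbc:mysql://localhost/smstid', jvmZone)
```

The printed URL can then be pasted into Database Connection URL.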

Now it's time to create the flow in NiFi.

I use the ExecuteGroovyScript processor in NiFi 1.6 to run the query.

I also use a PutFile processor to write the result to a file.

The whole flow consists of these two processors, with the success relationship of ExecuteGroovyScript connected to PutFile.

In the ExecuteGroovyScript processor I set Script File to the full path of my Groovy script.

I also add two dynamic properties that the script reads:

DBCPConnectionPoolName, which holds the name of the DBCPConnectionPool service created above (the script looks the service up by this name)

sqlCmd, which contains the SELECT statement to run, for example select fname from t_user WHERE fname LIKE 'M%' OR fname LIKE 'A%'

Below is my Groovy script. It runs the query in sqlCmd and writes the result to a new flow file in comma-separated format, starting with a header line of column names.

import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql

// ExecuteGroovyScript exposes dynamic properties as PropertyValue variables
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value

// Get the controller service lookup from the processor context
def lookup = context.controllerServiceLookup

// Find the identifier of the controller service whose name is serviceName
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == serviceName
}
if (!dbcpServiceId) {
    log.error("No controller service named " + serviceName)
    return
}

// Get the service from its identifier and borrow a connection from the pool
def service = lookup.getControllerService(dbcpServiceId)
def conn = service.getConnection()
if (!conn) {
    log.error("Failed to connect to " + serviceName)
    return
}

def flowFile = session.create()
try {
    flowFile = session.write(flowFile, { out ->
        // Run the query; each row is a GroovyRowResult (a Map of column name -> value)
        def rows = new Sql(conn).rows(sqlCmdString)
        rows.eachWithIndex { row, idx ->
            if (idx == 0) {
                // Header line built from the column names of the first row
                out.write(((row.keySet() as List).join(',') + "\n").getBytes("UTF-8"))
            }
            // Value line
            out.write((row.values().join(',') + "\n").getBytes("UTF-8"))
        }
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error("Scripting error for query: " + sqlCmdString, e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Release the connection back to the pool; otherwise it stays checked out
    // and new executions block waiting for a free connection
    conn.close()
}
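The script joins values with a plain comma, so a value that itself contains a comma, a double quote or a line break breaks the CSV, and NULL columns come out as the text null. Below is a minimal sketch of RFC 4180-style quoting in plain Groovy; csvField and toCsv are hypothetical helper names. It takes a list of maps, which is what sql.rows returns (GroovyRowResult implements Map), so it can be tried outside NiFi.

```groovy
// Quote a single field in RFC 4180 style: wrap it in double quotes when it
// contains a comma, a double quote, CR or LF, and double any embedded quotes.
// null becomes an empty field instead of the text "null".
String csvField(Object value) {
    if (value == null) return ''
    String s = value.toString()
    if (s.contains(',') || s.contains('"') || s.contains('\n') || s.contains('\r')) {
        return '"' + s.replace('"', '""') + '"'
    }
    return s
}

// Turn a list of row maps (such as the result of sql.rows) into CSV text:
// one header line from the first row's keys, then one line per row.
String toCsv(List<Map> rows) {
    if (rows.isEmpty()) return ''
    def sb = new StringBuilder()
    sb << rows[0].keySet().collect { csvField(it) }.join(',') << '\n'
    rows.each { row ->
        sb << row.values().collect { csvField(it) }.join(',') << '\n'
    }
    return sb.toString()
}

def sample = [
    [fname: 'Max', city: 'Stockholm, SE'],
    [fname: 'Anna "A"', city: null]
]
print toCsv(sample)
// Prints:
// fname,city
// Max,"Stockholm, SE"
// "Anna ""A""",
```

Inside the OutputStreamCallback you could then write the whole result at once with out.write(toCsv(rows).getBytes("UTF-8")). Like sql.rows itself, this keeps the full result in memory, so it suits small result sets.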