I am fond of quick solutions, and Groovy is a convenient way to write small scripts that extend NiFi when you need to.
NiFi has good processors for extracting data from a database, but sometimes you need your own. In another post I will use this code to make lookups and replace all matched entries with a token, as part of my anonymization of data.
First we need to create a database connection pool service in NiFi.
![](http://34.204.195.226/wp-content/uploads/2018/06/Skärmavbild-2018-06-06-kl.-17.32.51.png)
URL is the JDBC URL of your database, in the form
URL = jdbc:mysql://localhost/smstid?serverTimezone=CET
In my case I had to set the timezone in the URL so that my NiFi timezone matches the database timezone.
DriverName = com.mysql.jdbc.Driver
DriverJar = /Users/max/Documents/xml/jdbc/mysql-connector-java-8.0.11.jar
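To make the shape of the URL above a bit more concrete, here is a minimal sketch in plain Groovy that assembles it from its parts. The helper name `buildJdbcUrl` is made up for this example; it is not part of NiFi or the MySQL driver.

```groovy
// Hypothetical helper (not a NiFi or MySQL API): assembles a MySQL JDBC URL
// from a host, a database name and optional connection parameters.
String buildJdbcUrl(String host, String database, Map<String, String> params = [:]) {
    def base = "jdbc:mysql://${host}/${database}"
    if (!params) {
        return base
    }
    // Join the parameters as key=value pairs separated by '&'
    def query = params.collect { k, v -> "${k}=${v}" }.join('&')
    return "${base}?${query}"
}

// Same host, database and timezone parameter as in the settings above
println buildJdbcUrl('localhost', 'smstid', [serverTimezone: 'CET'])
```

Any further connection parameters would be appended the same way, separated by `&`.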
![](http://34.204.195.226/wp-content/uploads/2018/06/Skärmavbild-2018-06-06-kl.-17.32.17.png)
Now it's time to create the flow in NiFi.
I use the ExecuteGroovyScript processor in my NiFi 1.6,
and I also use the PutFile processor to store the result in a file.
The whole flow looks like this:
![](http://34.204.195.226/wp-content/uploads/2018/06/Skärmavbild-2018-06-06-kl.-17.48.58.png)
In the ExecuteGroovyScript processor I add the full path to my Groovy script.
I also create two properties on the processor that I use in the script:
DBCPConnectionPoolName, which points to the newly created DB pool service,
and sqlCmd, which contains my select statement.
![](http://34.204.195.226/wp-content/uploads/2018/06/Skärmavbild-2018-06-06-kl.-17.34.40.png)
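The script resolves the pool by comparing the value of DBCPConnectionPoolName against the names of the available controller services. Here is a minimal sketch of that name-to-id search in plain Groovy, runnable outside NiFi. `StubLookup`, `findServiceId` and the ids and names in it are invented stand-ins; inside NiFi the lookup object comes from `context.controllerServiceLookup`, and its `getControllerServiceIdentifiers` takes a service class as argument.

```groovy
// Stand-in for NiFi's controller service lookup, for illustration only.
class StubLookup {
    // Hypothetical service ids mapped to hypothetical service names
    Map<String, String> namesById = ['id-1': 'OtherService', 'id-2': 'MyDBCPPool']

    Set<String> getControllerServiceIdentifiers() { namesById.keySet() }
    String getControllerServiceName(String id) { namesById[id] }
}

// Returns the id of the service with the given name, or null if none matches
def findServiceId(lookup, String serviceName) {
    lookup.getControllerServiceIdentifiers().find { id ->
        lookup.getControllerServiceName(id) == serviceName
    }
}

def stub = new StubLookup()
println findServiceId(stub, 'MyDBCPPool')
println findServiceId(stub, 'Missing')
```

Because `find` returns null when nothing matches, a misspelled pool name gives a null id, so it is worth checking for that before asking for the service.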
Below is my Groovy script, which stores the result in a file in comma-separated format.
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql
// Properties set on the ExecuteGroovyScript processor
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value
// Get the controller service lookup from the context
def lookup = context.controllerServiceLookup
// Search the controller services for the one named serviceName
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == serviceName
}
if (!dbcpServiceId) {
    log.error("No controller service named " + serviceName)
    return
}
// Get the service from its id
def service = lookup.getControllerService(dbcpServiceId)
// Get a connection from the service
def conn = service.getConnection()
if (!conn) {
    log.error("Failed to connect to " + serviceName)
    return
}
def flowFile = null
try {
    flowFile = session.create()
    flowFile = session.write(flowFile, { out ->
        def sql = new Sql(conn)
        // Example: def sqlCmdString = 'select fname from t_user WHERE fname LIKE "M%" OR fname LIKE "A%"'
        def rows = sql.rows(sqlCmdString)
        rows.eachWithIndex { row, idx ->
            if (idx == 0) {
                // Header line
                out.write(((row.keySet() as List).join(',') + "\n").getBytes("UTF-8"))
            }
            // Value line
            out.write((row.values().join(',') + "\n").getBytes("UTF-8"))
        }
    } as OutputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    // Log the SQL text itself rather than the property object
    log.error('Scripting error while running: ' + sqlCmdString, e)
    if (flowFile) {
        session.transfer(flowFile, REL_FAILURE)
    }
} finally {
    // Release connection, this is important as it will otherwise block new executions
    conn?.close()
}
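One thing to watch: the script joins the values with plain commas, so a value that itself contains a comma, a double quote or a line break will shift the columns. Here is a minimal sketch, in plain Groovy outside NiFi, of quoting fields in the usual CSV style. The helper names `csvField` and `toCsv` and the sample rows are invented for this example; the maps stand in for the rows returned by `sql.rows()`.

```groovy
// Hypothetical helpers, not part of NiFi or groovy.sql.

// Quote a single value if it contains a comma, a double quote or a line break;
// embedded double quotes are doubled. Null becomes an empty field.
String csvField(Object value) {
    if (value == null) {
        return ''
    }
    def s = value.toString()
    if (s.contains(',') || s.contains('"') || s.contains('\n') || s.contains('\r')) {
        return '"' + s.replace('"', '""') + '"'
    }
    return s
}

// Turn a list of row maps into CSV text: one header line, then one line per row
String toCsv(List<Map> rows) {
    if (!rows) {
        return ''
    }
    def lines = [rows[0].keySet().collect { csvField(it) }.join(',')]
    rows.each { row ->
        lines << row.values().collect { csvField(it) }.join(',')
    }
    return lines.join('\n') + '\n'
}

// Made-up sample data with a quote and a comma inside the values
def sampleRows = [
    [fname: 'Max', city: 'Stockholm'],
    [fname: 'Anna "A"', city: 'Malmö, Sweden']
]
print toCsv(sampleRows)
```

In the NiFi script, the two `join(',')` calls could be swapped for something along these lines if the data may contain such characters.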