Kotlin NiFi Processors
Kotlin is an excellent choice for developing JVM-based applications, including extensions for Apache NiFi. Writing custom NiFi Processors in Kotlin minimizes boilerplate code and enhances null safety, reducing the likelihood of bugs. Additionally, NiFi's lambda callback conventions align seamlessly with Kotlin's trailing lambda syntax. Let's dive in.
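Here is a small stand-alone sketch of the trailing-lambda point. `ContentWriter` and `writeContent` are hypothetical stand-ins, not NiFi APIs. They copy the shape of a Java single-method callback such as NiFi's `OutputStreamCallback` and of a session method that accepts one.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream

// Hypothetical stand-in for a Java-style single-method callback interface
// (shaped like NiFi's OutputStreamCallback, but not the real type).
fun interface ContentWriter {
    fun process(stream: OutputStream)
}

// Hypothetical stand-in for a session method that hands the caller an OutputStream.
fun writeContent(callback: ContentWriter): ByteArray {
    val buffer = ByteArrayOutputStream()
    callback.process(buffer)
    return buffer.toByteArray()
}

fun main() {
    // The callback is the last parameter, so the lambda can sit outside the
    // parentheses (trailing lambda syntax). Kotlin converts it to
    // ContentWriter automatically (SAM conversion).
    val bytes = writeContent { stream ->
        stream.write("hello".toByteArray())
    }
    println(String(bytes, Charsets.UTF_8)) // prints "hello"
}
```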
To skip right to the code, clone the NiFi Starter Kotlin project.
Below is a basic subclass of NiFi's AbstractProcessor class.
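package org.example.starter.processor

import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.annotation.documentation.Tags
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship

@Tags("starter")
@CapabilityDescription("Starter processor")
class ExampleProcessor : AbstractProcessor() {
    val success: Relationship = Relationship.Builder()
        .name("success")
        .description("success")
        .build()

    val failure: Relationship = Relationship.Builder()
        .name("failure")
        .description("failure")
        .build()

    // Every relationship a FlowFile can be routed to must be declared here
    override fun getRelationships(): Set<Relationship> {
        return setOf(success, failure)
    }

    // No configurable properties yet
    override fun getSupportedPropertyDescriptors(): List<PropertyDescriptor> {
        return listOf()
    }

    override fun onTrigger(context: ProcessContext, session: ProcessSession) {
        // session.get() returns null when nothing is queued, so exit early in that case
        val flowFile: FlowFile = session.get() ?: return
        session.transfer(flowFile, success)
    }
}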
This processor transfers each incoming FlowFile, unchanged, to the success relationship.
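The `session.get() ?: return` line shows Kotlin's null safety at work. `session.get()` can return null when nothing is queued, and the Elvis operator exits early so the processor never carries a null FlowFile forward. The sketch below reproduces the idiom with a plain `ArrayDeque` standing in for the session's input queue. `triggerOnce` is a hypothetical name used only for illustration.

```kotlin
// A plain ArrayDeque stands in for the processor's incoming queue
// (illustration only; this is not a NiFi type).
fun triggerOnce(queue: ArrayDeque<String>, success: MutableList<String>) {
    // removeFirstOrNull() returns null on an empty queue, like session.get();
    // the Elvis operator returns from the function in that case.
    val flowFile: String = queue.removeFirstOrNull() ?: return
    success.add(flowFile) // "transfer" the item to success unchanged
}

fun main() {
    val queue = ArrayDeque(listOf("a", "b"))
    val success = mutableListOf<String>()
    // The third call sees an empty queue and returns early
    repeat(3) { triggerOnce(queue, success) }
    println(success) // prints [a, b]
}
```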
Obviously, we can make this more interesting. Let's implement onTrigger to replace each incoming FlowFile's content with a single paragraph of its text.
First, let's add a new PropertyDescriptor so that the user can choose which paragraph they want.
// Requires: import org.apache.nifi.processor.util.StandardValidators
val index: PropertyDescriptor = PropertyDescriptor.Builder()
    .name("index")
    .displayName("index")
    .description("The zero-based index of the paragraph to select")
    .required(false)
    .defaultValue("0")
    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    .build()

// Expose the new property to NiFi (replaces the empty list from before)
override fun getSupportedPropertyDescriptors(): List<PropertyDescriptor> {
    return listOf(index)
}
Then, let's rewrite onTrigger to select the requested paragraph.
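override fun onTrigger(context: ProcessContext, session: ProcessSession) {
    // var: session methods that modify a FlowFile return a new reference
    var flowFile: FlowFile = session.get() ?: return
    try {
        val selectedIndex = context.getProperty(index).asInteger()
        // Read the whole content as text; use {} closes the stream
        val contents = session.read(flowFile).bufferedReader().use {
            it.readText()
        }
        // Paragraphs are separated by a blank line
        val paragraphs = contents.split("\n\n")
        val selection = paragraphs[selectedIndex]
        // write() returns the updated FlowFile; transfer that one, not the stale original
        flowFile = session.write(flowFile) { output ->
            output.write(selection.toByteArray())
        }
        session.transfer(flowFile, success)
    } catch (e: Exception) {
        // e.g. IndexOutOfBoundsException when the index exceeds the paragraph count
        flowFile = session.putAttribute(flowFile, "ExampleProcessor.error", e.toString())
        session.transfer(flowFile, failure)
    }
}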
This method does the following:
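- Read the incoming FlowFile content
- Split it on blank lines
- Select the paragraph at the position given by the index property
- Replace the FlowFile content with the selection and route it to success
- On any error (for example, an out-of-range index), record the exception in the ExampleProcessor.error attribute and route the FlowFile to failure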
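None of the steps above depend on NiFi, so one option is to move them into a plain function that can be unit tested without a TestRunner. The `selectParagraph` function below is a hypothetical refactoring and is not part of the starter project. It uses `getOrNull`, which makes an out-of-range or negative index return null instead of throwing. A caller could then send that case to the failure relationship.

```kotlin
// Hypothetical helper that mirrors the onTrigger steps on raw bytes.
// Returns the selected paragraph as bytes, or null if the index is out of range.
fun selectParagraph(content: ByteArray, index: Int): ByteArray? {
    val text = String(content, Charsets.UTF_8)                  // read the content as text
    val paragraphs = text.split("\n\n")                          // split on blank lines
    val selection = paragraphs.getOrNull(index) ?: return null   // select by index
    return selection.toByteArray(Charsets.UTF_8)                 // the new content
}

fun main() {
    val input = "first paragraph\n\nsecond paragraph".toByteArray()
    println(selectParagraph(input, 1)?.let { String(it, Charsets.UTF_8) }) // second paragraph
    println(selectParagraph(input, 5))                                    // null
}
```

Returning null makes the error path explicit in the function's type. The try/catch in onTrigger gets the same routing by catching the exception instead.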
For example, if the incoming FlowFile has contents:
From my grandfather Verus I learned good morals and
the government of my temper.

From the reputation and remembrance of my father,
modesty and a manly character.

From my mother, piety and beneficence, and abstinence,
not only from evil deeds, but even from evil thoughts; and
further, simplicity in my way of living, far removed
from the habits of the rich.
With the index property set to 1, the FlowFile's content will change to:
From the reputation and remembrance of my father,
modesty and a manly character.
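Splitting on the literal string "\n\n" only works when paragraphs are separated by an empty line with Unix line endings. Text with Windows (\r\n) line endings, or a separator line that contains spaces, comes back as one paragraph. If your data can look like that, a regular-expression split is one option. The pattern and the `paragraphs` function below are our own choice and are not something the original processor does.

```kotlin
// Splits on one or more blank lines. Tolerates \r\n endings and
// whitespace-only separator lines. An alternative to split("\n\n").
val blankLine = Regex("""\r?\n[ \t]*\r?\n\s*""")

fun paragraphs(text: String): List<String> = text.trim().split(blankLine)

fun main() {
    val unix = "one\ntwo\n\nthree"
    val windows = "one\r\ntwo\r\n\r\nthree"
    println(unix.split("\n\n").size)     // 2
    println(windows.split("\n\n").size)  // 1: the literal split misses \r\n\r\n
    println(paragraphs(windows).size)    // 2
    println(paragraphs(windows).last())  // three
}
```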
Writing Tests
We can easily test this processor using the TestRunner from NiFi's nifi-mock library.
package org.example.starter.processor

import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.Test

class ExampleProcessorTest {
    private val testRunner = TestRunners.newTestRunner(ExampleProcessor::class.java)

    @Test
    fun testProcessor() {
        testRunner.setProperty("index", "0")
        // Paragraphs must be separated by a blank line for split("\n\n") to find them
        testRunner.enqueue(
            """
            From my grandfather Verus I learned good morals and
            the government of my temper.

            From the reputation and remembrance of my father,
            modesty and a manly character.

            From my mother, piety and beneficence, and abstinence,
            not only from evil deeds, but even from evil thoughts; and
            further, simplicity in my way of living, far removed
            from the habits of the rich.
            """.trimIndent()
        )
        testRunner.run()

        testRunner.assertAllFlowFilesTransferred("success")
        testRunner.getFlowFilesForRelationship("success")[0].assertContentEquals(
            """
            From my grandfather Verus I learned good morals and
            the government of my temper.
            """.trimIndent()
        )
    }
}
Registering your Processor
NiFi discovers Processors through Java's ServiceLoader mechanism. For NiFi to recognize your Processor, you must register it in a special resource file: src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
This file lists the fully qualified class names of your Processors, one per line. In our case, it contains a single line for ExampleProcessor:
org.example.starter.processor.ExampleProcessor
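The services file follows the standard Java provider-configuration format. Each line names one fully qualified class, `#` starts a comment, and surrounding whitespace and blank lines are ignored. The hypothetical `providerNames` function below parses that format. It could be useful for sanity-checking a services file in a unit test. It is a sketch and is not how NiFi itself reads the file.

```kotlin
// Parses a ServiceLoader provider-configuration file:
// one fully qualified class name per line, '#' starts a comment,
// and surrounding whitespace and blank lines are ignored.
fun providerNames(fileContents: String): List<String> =
    fileContents.lineSequence()
        .map { it.substringBefore('#').trim() }
        .filter { it.isNotEmpty() }
        .toList()

fun main() {
    val services = """
        # Processors bundled in this NAR
        org.example.starter.processor.ExampleProcessor
    """.trimIndent()
    println(providerNames(services)) // [org.example.starter.processor.ExampleProcessor]
}
```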
Building a NAR file
We can use Maven to package our Kotlin Processors as a NAR file ("NiFi Archive"). NAR files are NiFi's preferred extension format because each NAR is loaded in its own class loader, which isolates its dependencies from those of other extensions. It's a good idea to organize your project into two modules: one for source code and one for NAR bundling.
We will create three pom.xml files in total: the root pom, the Kotlin code pom, and the NAR bundler pom. The project layout looks like this:
.
├── nifi-starter-nar
│ └── pom.xml
├── nifi-starter-processors
│ ├── pom.xml
│ └── src
│ ├── main
│ │ ├── kotlin
│ │ │ └── org
│ │ │ └── example
│ │ │ └── starter
│ │ │ └── processor
│ │ │ └── ExampleProcessor.kt
│ │ └── resources
│ │ └── META-INF
│ │ └── services
│ │ └── org.apache.nifi.processor.Processor
│ └── test
│ └── kotlin
│ └── org
│ └── example
│ └── starter
│ └── processor
│ └── ExampleProcessorTest.kt
└── pom.xml
The NAR bundler pom, nifi-starter-nar/pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.example</groupId>
<artifactId>nifi-starter</artifactId>
<version>0.0.1</version>
</parent>
<artifactId>nifi-starter-nar</artifactId>
<version>0.0.1</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.example</groupId>
<artifactId>nifi-starter-processors</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>2.0.0-M1</version>
<type>nar</type>
</dependency>
</dependencies>
</project>
The Kotlin code pom, nifi-starter-processors/pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<kotlin.version>1.9.22</kotlin.version>
</properties>
<parent>
<groupId>org.example</groupId>
<artifactId>nifi-starter</artifactId>
<version>0.0.1</version>
</parent>
<artifactId>nifi-starter-processors</artifactId>
<packaging>jar</packaging>
<build>
<sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>
<testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-script-runtime</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-M1</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-M1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
And finally, the root pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>2.0.0-M1</version>
</parent>
<groupId>org.example</groupId>
<artifactId>nifi-starter</artifactId>
<version>0.0.1</version>
<packaging>pom</packaging>
<modules>
<module>nifi-starter-processors</module>
<module>nifi-starter-nar</module>
</modules>
</project>
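We have created three Maven modules:
- Root project (pom packaging), which declares the other two as modules
- Processor project (jar packaging), which compiles the Kotlin code
- NAR project (nar packaging), which bundles the processor jar into a NAR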
To build a NAR bundle that NiFi can load, run mvn package from the root directory:
[INFO] nifi-starter ....................................... SUCCESS [ 0.637 s]
[INFO] nifi-starter-processors ............................ SUCCESS [ 2.818 s]
[INFO] nifi-starter-nar ................................... SUCCESS [ 0.302 s]
Your NAR file will be at nifi-starter-nar/target/nifi-starter-nar-0.0.1.nar.
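To use your new Processor, copy this file into NiFi's NAR autoload directory, here /opt/nifi/nifi-current/nar_extensions. The location is set by the nifi.nar.library.autoload.directory property in nifi.properties. We can now use ExampleProcessor in our flows.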
Apache NiFi is a great computing platform, and using Kotlin to create custom Processors unlocks many interesting engineering possibilities.