
Kotlin NiFi Processors

zeevo

Kotlin is an excellent choice for developing JVM-based applications, including extensions for Apache NiFi. Writing custom NiFi Processors in Kotlin minimizes boilerplate code and enhances null safety, reducing the likelihood of bugs. Additionally, NiFi's lambda callback conventions align seamlessly with Kotlin's trailing lambda syntax. Let's dive in.
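
To illustrate the trailing lambda point, here is a minimal sketch that runs without NiFi on the classpath. OutputCallback and writeContent are hypothetical stand-ins for a single-method callback interface and a session-style method that takes it as its last parameter; they are not NiFi APIs.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.OutputStream

// Hypothetical stand-in for a single-method callback interface.
// Declared here so the sketch compiles without NiFi.
fun interface OutputCallback {
    fun process(output: OutputStream)
}

// Hypothetical stand-in for a method that accepts the callback as its last parameter.
fun writeContent(label: String, callback: OutputCallback): String {
    val buffer = ByteArrayOutputStream()
    callback.process(buffer)
    return "$label: ${String(buffer.toByteArray(), Charsets.UTF_8)}"
}

fun main() {
    // Verbose form: an explicit anonymous object.
    val verbose = writeContent("verbose", object : OutputCallback {
        override fun process(output: OutputStream) {
            output.write("hello".toByteArray())
        }
    })

    // Concise form: the callback is passed as a trailing lambda.
    val concise = writeContent("concise") { output ->
        output.write("hello".toByteArray())
    }

    println(verbose) // verbose: hello
    println(concise) // concise: hello
}
```

Kotlin applies the same conversion to single-method Java interfaces automatically, which is why a call such as session.write(flowFile) { ... } later in this post needs no anonymous class.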

To skip right to the code, clone the NiFi Starter Kotlin project.

Below is a basic processor that extends NiFi's AbstractProcessor class.

ExampleProcessor.kt
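package org.example.starter.processor

import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.annotation.documentation.Tags
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship

@Tags("starter")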
@CapabilityDescription("Starter processor")
class ExampleProcessor : AbstractProcessor() {
  val success: Relationship =
    Relationship
      .Builder()
      .name("success")
      .description("success")
      .build()
 
  val failure: Relationship =
    Relationship
      .Builder()
      .name("failure")
      .description("failure")
      .build()
 
  override fun getRelationships(): Set<Relationship> {
    return setOf(success, failure)
  }
 
  override fun getSupportedPropertyDescriptors(): List<PropertyDescriptor> {
    return listOf()
  }
 
  override fun onTrigger(context: ProcessContext, session: ProcessSession) {
    val flowFile: FlowFile = session.get() ?: return
    session.transfer(flowFile, success)
  }
}

This processor transfers every incoming FlowFile to the success relationship unchanged. We can make this more interesting: let's implement onTrigger so that it replaces each incoming FlowFile's content with a single paragraph of its text.

First, let's add a new PropertyDescriptor so that the user can decide which paragraph they want.

// Requires: import org.apache.nifi.processor.util.StandardValidators
val index: PropertyDescriptor =
  PropertyDescriptor.Builder()
    .name("index")
    .displayName("index")
    .description("The desired paragraph selected by index")
    .required(false)
    .defaultValue("0")
    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    .build()
 
override fun getSupportedPropertyDescriptors(): List<PropertyDescriptor> {
  return listOf(index)
}

Then, let's modify onTrigger to do something interesting.

override fun onTrigger(context: ProcessContext, session: ProcessSession) {
  // Session methods return an updated FlowFile reference, so keep the latest one.
  var flowFile: FlowFile = session.get() ?: return
  try {
    val paragraphIndex = context.getProperty(index).asInteger()
    val contents = session.read(flowFile).bufferedReader().use {
      it.readText()
    }
    val paragraphs = contents.split("\n\n")
    // An out-of-range index throws here and is routed to failure below.
    val selection = paragraphs[paragraphIndex]
    flowFile = session.write(flowFile) { output ->
      output.write(selection.toByteArray())
    }
    session.transfer(flowFile, success)
  } catch (e: Exception) {
    flowFile = session.putAttribute(flowFile, "ExampleProcessor.error", e.toString())
    session.transfer(flowFile, failure)
  }
}

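This method does the following:

  1. Reads the incoming FlowFile content
  2. Splits it on blank lines
  3. Selects the paragraph at the zero-based position given by the index property
  4. Replaces the FlowFile content with the selection
  5. Routes the FlowFile to success, or to failure with an ExampleProcessor.error attribute if any step throws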
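
The split-and-select steps above can be sketched as plain Kotlin that runs without NiFi. selectParagraph is a hypothetical helper, not part of the processor. As assumptions beyond the original code, it returns null for an out-of-range index instead of throwing, and it also accepts Windows line endings.

```kotlin
// Hypothetical helper illustrating the split-and-select logic outside of NiFi.
// Returns the paragraph at the zero-based index, or null when the index is out of range.
fun selectParagraph(contents: String, index: Int): String? =
    contents
        .split(Regex("\\r?\\n\\r?\\n")) // a blank line separates paragraphs
        .getOrNull(index)

fun main() {
    val text = "first paragraph\nstill first\n\nsecond paragraph\n\nthird paragraph"
    println(selectParagraph(text, 1)) // second paragraph
    println(selectParagraph(text, 5)) // null
}
```

Keeping this logic in a pure function makes it possible to unit test the selection rules without starting a TestRunner.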

For example, if the incoming FlowFile has contents:

From my grandfather Verus I learned good morals and
the government of my temper.
 
From the reputation and remembrance of my father,
modesty and a manly character.
 
From my mother, piety and beneficence, and abstinence,
not only from evil deeds, but even from evil thoughts; and
further, simplicity in my way of living, far removed
from the habits of the rich.

With the index property set to 1 (the index is zero-based), the FlowFile's content becomes the second paragraph: "From the reputation and remembrance of my father, modesty and a manly character."

Writing Tests

We can easily test this processor using NiFi's TestRunner.

ExampleProcessorTest.kt
package org.example.starter.processor

import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.Test

class ExampleProcessorTest {
  private val testRunner = TestRunners.newTestRunner(ExampleProcessor::class.java)

  @Test
  fun testProcessor() {
    testRunner.setProperty("index", "0")
    testRunner.enqueue(
      """
      From my grandfather Verus I learned good morals and
      the government of my temper.

      From the reputation and remembrance of my father,
      modesty and a manly character.

      From my mother, piety and beneficence, and abstinence,
      not only from evil deeds, but even from evil thoughts; and
      further, simplicity in my way of living, far removed
      from the habits of the rich.
      """.trimIndent()
    )
    testRunner.run()
    testRunner.assertAllFlowFilesTransferred("success")
    testRunner.getFlowFilesForRelationship("success")[0].assertContentEquals(
      """
      From my grandfather Verus I learned good morals and
      the government of my temper.
      """.trimIndent()
    )
  }
}

Registering your Processor

In order for NiFi to recognize your Processor you must register it in a special resource file: nifi-starter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor

This file lists the fully qualified class names of your processors, one per line. In our case, it will be one line for ExampleProcessor.

org.example.starter.processor.ExampleProcessor
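
This file follows the provider-configuration format of Java's ServiceLoader: the file is named after the interface, and each line names an implementation. The sketch below demonstrates that mechanism in isolation, without NiFi. Greeter, EnglishGreeter, and discoverGreeters are hypothetical names standing in for the Processor interface, a custom processor, and the discovery step.

```kotlin
import java.net.URLClassLoader
import java.nio.file.Files
import java.util.ServiceLoader

// Hypothetical stand-in for the service interface (Processor in NiFi's case).
interface Greeter {
    fun greet(): String
}

// Hypothetical stand-in for a custom implementation (ExampleProcessor in our case).
// ServiceLoader needs a public no-argument constructor, which this class has.
class EnglishGreeter : Greeter {
    override fun greet() = "hello"
}

// Writes a META-INF/services file into a temporary classpath root,
// then asks ServiceLoader to find the implementations it lists.
fun discoverGreeters(): List<String> {
    val root = Files.createTempDirectory("services-demo")
    val services = Files.createDirectories(root.resolve("META-INF/services"))
    // File name: fully qualified interface name.
    // File content: one fully qualified implementation name per line.
    Files.write(
        services.resolve(Greeter::class.java.name),
        listOf(EnglishGreeter::class.java.name)
    )
    URLClassLoader(arrayOf(root.toUri().toURL()), Greeter::class.java.classLoader).use { loader ->
        return ServiceLoader.load(Greeter::class.java, loader).map { it.greet() }
    }
}

fun main() {
    println(discoverGreeters())
}
```

If the class name in the file is misspelled, the loader cannot instantiate the implementation, which is a common reason for a custom processor not showing up.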

Building a NAR file

We can use Maven to package our Kotlin Processors as a NAR file ("NiFi Archive"). NAR files are NiFi's preferred extension format because they provide dependency isolation between different Processors. It's a good idea to organize your project into two modules: one for source code, and one for NAR bundling. We will be creating three pom.xml files in total: the root pom, the Kotlin code pom, and the NAR bundler pom.

.
├── nifi-starter-nar
│   └── pom.xml
├── nifi-starter-processors
│   ├── pom.xml
│   └── src
│       ├── main
│       │   ├── kotlin
│       │   │   └── org
│       │   │       └── example
│       │   │           └── starter
│       │   │               └── processor
│       │   │                   └── ExampleProcessor.kt
│       │   └── resources
│       │       └── META-INF
│       │           └── services
│       │               └── org.apache.nifi.processor.Processor
│       └── test
│           └── kotlin
│               └── org
│                   └── example
│                       └── starter
│                           └── processor
│                               └── ExampleProcessorTest.kt
└── pom.xml

The NAR bundler pom uses nar packaging and depends on the processors module:

nifi-starter-nar/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.example</groupId>
    <artifactId>nifi-starter</artifactId>
    <version>0.0.1</version>
  </parent>
  <artifactId>nifi-starter-nar</artifactId>
  <version>0.0.1</version>
  <packaging>nar</packaging>
  <dependencies>
    <dependency>
      <groupId>org.example</groupId>
      <artifactId>nifi-starter-processors</artifactId>
      <version>0.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-standard-services-api-nar</artifactId>
      <version>2.0.0-M2</version>
      <type>nar</type>
    </dependency>
  </dependencies>
</project>

The Kotlin code pom compiles the sources and tests with kotlin-maven-plugin:

nifi-starter-processors/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <properties>
    <kotlin.version>1.9.22</kotlin.version>
  </properties>
  <parent>
    <groupId>org.example</groupId>
    <artifactId>nifi-starter</artifactId>
    <version>0.0.1</version>
  </parent>
  <artifactId>nifi-starter-processors</artifactId>
  <packaging>jar</packaging>
  <build>
    <sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>
    <testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-maven-plugin</artifactId>
        <version>${kotlin.version}</version>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>test-compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.jetbrains.kotlin</groupId>
      <artifactId>kotlin-stdlib</artifactId>
      <version>${kotlin.version}</version>
    </dependency>
    <dependency>
      <groupId>org.jetbrains.kotlin</groupId>
      <artifactId>kotlin-script-runtime</artifactId>
      <version>${kotlin.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-api</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-utils</artifactId>
      <version>2.0.0-M1</version>
    </dependency>
    <dependency>
      <groupId>org.jetbrains.kotlin</groupId>
      <artifactId>kotlin-test</artifactId>
      <version>${kotlin.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-mock</artifactId>
      <version>2.0.0-M1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

And finally, the root pom.xml:

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-bundles</artifactId>
    <version>2.0.0-M1</version>
  </parent>
  <groupId>org.example</groupId>
  <artifactId>nifi-starter</artifactId>
  <version>0.0.1</version>
  <packaging>pom</packaging>
  <modules>
    <module>nifi-starter-processors</module>
    <module>nifi-starter-nar</module>
  </modules>
</project>

We have created three projects:

  1. Root project
  2. Processor project
  3. NAR project

To build a NAR bundle that NiFi can read, run Maven from the root directory with mvn package.

[INFO] nifi-starter ....................................... SUCCESS [  0.637 s]
[INFO] nifi-starter-processors ............................ SUCCESS [  2.818 s]
[INFO] nifi-starter-nar ................................... SUCCESS [  0.302 s]

Your NAR file will be at nifi-starter-nar/target/nifi-starter-nar-0.0.1.nar. To use your new Processor, copy this file into NiFi's nar_extensions directory, which is /opt/nifi/nifi-current/nar_extensions in the official Docker image (adjust the path for other installations). We can now use ExampleProcessor in our flows.

[Image: Select Processor]

Apache NiFi is a great computing platform, and using Kotlin to create custom Processors unlocks many interesting engineering possibilities.