Commit 7950683b authored by pcgrenier's avatar pcgrenier
Browse files

Sample reporting task that utilizes influxdb and bulletin messages

parent f8692086
/.idea
/.idea/*
*.iml
/sample-bundle-nar/target/*
/sample-controller-service-api-nar/target/*
/sample-controller-service-api/target/*
/sample-controller-service/target/*
/sample-processor/target/*
*/target/**
/.project
/.vscode
/.settings
*/.project
*/.vscode
*/.settings
*/.classpath
\ No newline at end of file
......@@ -5,5 +5,6 @@ from apache/nifi:latest
ADD --chown=nifi:nifi ./sample-bundle-nar/target/*.nar lib/
ADD --chown=nifi:nifi ./sample-controller-service-api-nar/target/*.nar lib/
ADD --chown=nifi:nifi ./sample-reporting-task-nar/target/*.nar lib/
RUN sed -i -e 's/#java\.arg\.debug/java\.arg\.debug/g' conf/bootstrap.conf
# Apache Nifi Examples by [NiFi.rocks](http://www.nifi.rocks)
### Building and Deploying
```shell
mvn clean package
docker-compose up --build
```
visit [localhost](http://localhost:8080/nifi)
version: '2'
services:
influxdb:
image: influxdb
container_name: influxdb
ports:
- "8083:8083"
- "8086:8086"
- "8090:8090"
environment:
INFLUXDB_DB: "db"
INFLUXDB_USER: "user"
INFLUXDB_USER_PASSWORD: "pass"
grafana:
image: grafana/grafana
container_name: grafana
ports:
- 3000:3000
links:
- influxdb
nifi:
build: .
container_name: nifi-examples
ports:
- 8080:8080
- 8000:8000
links:
- influxdb
\ No newline at end of file
......@@ -11,6 +11,7 @@
<properties>
<nifi.version>1.5.0</nifi.version>
<influxdb.version>2.9</influxdb.version>
</properties>
<modules>
......@@ -18,6 +19,8 @@
<module>sample-controller-service-api-nar</module>
<module>sample-controller-service</module>
<module>sample-processor</module>
<module>sample-reporting-task</module>
<module>sample-reporting-task-nar</module>
<module>sample-bundle-nar</module>
</modules>
<dependencyManagement>
......@@ -38,6 +41,11 @@
<artifactId>nifi-processor-utils</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-reporting-utils</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
......@@ -60,6 +68,12 @@
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>${influxdb.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
......
......@@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.io.BufferedInputStream;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>examples</artifactId>
<groupId>nifi.rocks</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sample-reporting-task-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>nifi.rocks</groupId>
<artifactId>sample-reporting-task</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>examples</artifactId>
<groupId>nifi.rocks</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sample-reporting-task</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-reporting-utils</artifactId>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package rocks.nifi.examples.reporting;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@Tags({"nifirocks", "bulletin", "metrics"})
@CapabilityDescription("Report bulletin metrics")
public abstract class BulletinReportingTask extends AbstractReportingTask {
public static final PropertyDescriptor SOURCE_ID = new PropertyDescriptor.Builder()
.name("Source Id")
.required(false)
.build();
public static final PropertyDescriptor MESSAGE_MATCH = new PropertyDescriptor.Builder()
.name("Message Match")
.required(false)
.build();
public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name("Group Id")
.required(false)
.build();
protected List<PropertyDescriptor> properties;
private String groupId;
private String sourceId;
private String message;
@Override
protected void init(final ReportingInitializationContext config){
properties = new ArrayList<>();
properties.add(SOURCE_ID);
properties.add(MESSAGE_MATCH);
properties.add(GROUP_ID);
}
protected AtomicLong lastQuery = new AtomicLong(0);
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors(){
return properties;
}
protected List<Bulletin> getBulletins(final ReportingContext context) {
BulletinRepository repo = context.getBulletinRepository();
BulletinQuery query = getBulletinQuery(context);
List<Bulletin> bulletins = repo.findBulletins(query);
if(!bulletins.isEmpty()){
lastQuery.lazySet(bulletins.get(0).getId());
}
return bulletins;
}
protected void onScheduled(final ConfigurationContext context) {
sourceId = context.getProperty(SOURCE_ID).getValue();
groupId = context.getProperty(GROUP_ID).getValue();
message = context.getProperty(MESSAGE_MATCH).getValue();
}
protected BulletinQuery getBulletinQuery(final ReportingContext context){
BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder().after(lastQuery.get());
if(null != sourceId && !sourceId.isEmpty()){
queryBuilder.sourceIdMatches(sourceId);
}
if(null != groupId && !groupId.isEmpty()){
queryBuilder.groupIdMatches(groupId);
}
if(null != message && !message.isEmpty()){
queryBuilder.messageMatches(message);
}
return queryBuilder.build();
}
@Override
public void onTrigger(final ReportingContext context) {
final ComponentLog log = getLogger();
List<Bulletin> bulletins = getBulletins(context);
bulletins.stream().forEach((bulletin)->log.debug("Hey I found a message=" + bulletin.getMessage()));
}
}
\ No newline at end of file
package rocks.nifi.examples.reporting;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Tags({"nifirocks", "influxdb", "bulletin", "metrics"})
@CapabilityDescription("Report bulletin metrics to the InfluxDB time series database")
public class InfluxBulletinReportingTask extends BulletinReportingTask {
public static final Validator INFLUX_HOST_VALIDATOR = (String subject, String input, ValidationContext context) -> {
ValidationResult.Builder builder = new ValidationResult.Builder()
.subject(subject)
.input(input);
try {
InfluxDB influxDB = InfluxDBFactory.connect(input, "user", "pass");
Pong p = influxDB.ping();
if (p.getResponseTime() > 0) {
builder.valid(true).explanation("connected to " + input + " in " + p.getResponseTime());
} else {
builder.valid(false).explanation("Failed to connect to " + input);
}
} catch (final IllegalArgumentException e) {
builder.valid(false).explanation(e.getMessage());
}
return builder.build();
};
public static final PropertyDescriptor INFLUXDB_PATH = new PropertyDescriptor.Builder()
.name("Influxdb path")
.required(true)
.addValidator(INFLUX_HOST_VALIDATOR)
.defaultValue("http://localhost:8086")
.build();
public static final PropertyDescriptor INFLUXDB_NAME = new PropertyDescriptor.Builder()
.name("Influxdb database name")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor INFLUXDB_METRIC = new PropertyDescriptor.Builder()
.name("Influxdb metric name")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@Override
public void init(final ReportingInitializationContext context) {
super.init(context);
properties.add(INFLUXDB_PATH);
properties.add(INFLUXDB_NAME);
properties.add(INFLUXDB_METRIC);
properties = Collections.unmodifiableList(properties);
}
private String host;
private String name;
private String metric;
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.description("tag name and value")
.dynamic(true)
.expressionLanguageSupported(true)
.required(false)
.build();
}
@OnScheduled
public void onScheduled(final ConfigurationContext context) {
// setup, to initiate stuff based off of context that will not change between onTriggers
super.onScheduled(context);
host = context.getProperty(INFLUXDB_PATH).getValue();
name = context.getProperty(INFLUXDB_NAME).getValue();
metric = context.getProperty(INFLUXDB_METRIC).getValue();
}
@Override
public void onTrigger(final ReportingContext context) {
final long timestamp = System.currentTimeMillis();
Map<String, String> tags = new HashMap<>();
context.getProperties().entrySet().stream()
.filter((entry) -> entry.getKey().isDynamic())
.forEach(entry -> tags.put(entry.getKey().getName(), entry.getValue()));
InfluxDB influxDB = InfluxDBFactory.connect(host, "user", "pass");
BatchPoints.Builder batch = BatchPoints.database(name);
tags.forEach((key, value) -> batch.tag(key, value));
try {
Map<String, AtomicInteger> counts = new HashMap<>();
counts.put("INFO", new AtomicInteger(0));
counts.put("WARNING", new AtomicInteger(0));
counts.put("ERROR", new AtomicInteger(0));
List<Bulletin> bulletins = getBulletins(context);
for (Bulletin bulletin : bulletins) {
if (bulletin.getLevel().equalsIgnoreCase("INFO")) {
counts.get("INFO").incrementAndGet();
} else if (bulletin.getLevel().equalsIgnoreCase("WARNING")) {
counts.get("WARNING").incrementAndGet();
} else if (bulletin.getLevel().equalsIgnoreCase("ERROR")) {
counts.get("ERROR").incrementAndGet();
}
}
counts.forEach((key, value) -> {
Point.Builder point = Point.measurement(metric)
.time(timestamp, TimeUnit.MILLISECONDS)
.tag(metric, key)
.addField("value", value.intValue());
batch.point(point.build());
});
influxDB.write(batch.build());
} finally {
influxDB.close();
}
}
}
rocks.nifi.examples.reporting.InfluxBulletinReportingTask
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment