Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Switch to GitLab Next
Sign in / Register
Toggle navigation
Menu
Open sidebar
zobierlabs
nifi-examples
Commits
1346944d
Commit
1346944d
authored
Feb 06, 2015
by
Phillip Grenier
Browse files
More information in test case
parent
7bdf265f
Changes
2
Hide whitespace changes
Inline
Side-by-side
src/main/java/rocks/nifi/examples/processors/JsonProcessor.java
View file @
1346944d
...
...
@@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicReference;
import
org.apache.commons.io.IOUtils
;
import
org.apache.nifi.components.PropertyDescriptor
;
import
org.apache.nifi.flowfile.FlowFile
;
import
org.apache.nifi.logging.ProcessorLog
;
import
org.apache.nifi.processor.AbstractProcessor
;
import
org.apache.nifi.processor.ProcessContext
;
import
org.apache.nifi.processor.ProcessSession
;
...
...
@@ -39,10 +40,12 @@ import org.apache.nifi.processor.util.StandardValidators;
@Tags
({
"JSON"
,
"NIFI ROCKS"
})
@CapabilityDescription
(
"Fetch value from json path."
)
public
class
JsonProcessor
extends
AbstractProcessor
{
private
List
<
PropertyDescriptor
>
properties
;
private
Set
<
Relationship
>
relationships
;
public
static
final
String
MATCH_ATTR
=
"match"
;
public
static
final
PropertyDescriptor
JSON_PATH
=
new
PropertyDescriptor
.
Builder
()
.
name
(
"Json Path"
)
.
required
(
true
)
...
...
@@ -67,18 +70,32 @@ public class JsonProcessor extends AbstractProcessor {
@Override
public
void
onTrigger
(
ProcessContext
context
,
ProcessSession
session
)
throws
ProcessException
{
FlowFile
flowfile
=
session
.
ge
t
();
final
ProcessorLog
log
=
this
.
getLog
ge
r
();
final
AtomicReference
<
String
>
value
=
new
AtomicReference
<>();
FlowFile
flowfile
=
session
.
get
();
session
.
read
(
flowfile
,
new
InputStreamCallback
()
{
@Override
public
void
process
(
InputStream
in
)
throws
IOException
{
String
json
=
IOUtils
.
toString
(
in
);
String
result
=
JsonPath
.
read
(
json
,
"$.hello"
);
value
.
set
(
result
);
try
{
String
json
=
IOUtils
.
toString
(
in
);
String
result
=
JsonPath
.
read
(
json
,
"$.hello"
);
value
.
set
(
result
);
}
catch
(
Exception
ex
){
ex
.
printStackTrace
();
log
.
error
(
"Failed to read json string."
);
}
}
});
// Write the results to an attribute
String
results
=
value
.
get
();
if
(
results
!=
null
&&
!
results
.
isEmpty
()){
flowfile
=
session
.
putAttribute
(
flowfile
,
"match"
,
results
);
}
// To write the results back out ot flow file
flowfile
=
session
.
write
(
flowfile
,
new
OutputStreamCallback
()
{
@Override
...
...
@@ -87,8 +104,7 @@ public class JsonProcessor extends AbstractProcessor {
}
});
session
.
transfer
(
flowfile
,
SUCCESS
);
session
.
transfer
(
flowfile
,
SUCCESS
);
}
@Override
...
...
src/test/java/rocks/nifi/examples/processors/JsonProcessorTest.java
View file @
1346944d
...
...
@@ -6,13 +6,15 @@
package
rocks.nifi.examples.processors
;
import
java.io.ByteArrayInputStream
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.List
;
import
java.util.Set
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.nifi.processor.ProcessContext
;
import
org.apache.nifi.processor.ProcessSession
;
import
org.apache.nifi.processor.ProcessorInitializationContext
;
import
org.apache.nifi.
processor.Relationship
;
import
org.apache.nifi.
util.MockFlowFile
;
import
org.apache.nifi.util.TestRunner
;
import
org.apache.nifi.util.TestRunners
;
import
org.junit.Test
;
...
...
@@ -28,18 +30,36 @@ public class JsonProcessorTest {
* Test of onTrigger method, of class JsonProcessor.
*/
@org
.
junit
.
Test
public
void
testOnTrigger
()
{
public
void
testOnTrigger
()
throws
IOException
{
// Content to be mock a json file
InputStream
content
=
new
ByteArrayInputStream
(
"{\"hello\":\"nifi rocks\"}"
.
getBytes
());
// Generate a test runner to mock a processor in a flow
TestRunner
runner
=
TestRunners
.
newTestRunner
(
new
JsonProcessor
());
// Add properites
runner
.
setProperty
(
JsonProcessor
.
JSON_PATH
,
"$.hello"
);
// Add the content to the runner
runner
.
enqueue
(
content
);
runner
.
run
();
// Run the enqueued content, it also takes an int = number of contents queued
runner
.
run
(
1
);
// All results were processed with out failure
runner
.
assertQueueEmpty
();
// TODO review the generated test code and remove the default call to fail.
// fail("The test case is a prototype.");
// If you need to read or do aditional tests on results you can access the content
List
<
MockFlowFile
>
results
=
runner
.
getFlowFilesForRelationship
(
JsonProcessor
.
SUCCESS
);
assertTrue
(
"1 match"
,
results
.
size
()
==
1
);
MockFlowFile
result
=
results
.
get
(
0
);
String
resultValue
=
new
String
(
runner
.
getContentAsByteArray
(
result
));
System
.
out
.
println
(
"Match: "
+
IOUtils
.
toString
(
runner
.
getContentAsByteArray
(
result
)));
// Test attributes and content
result
.
assertAttributeEquals
(
JsonProcessor
.
MATCH_ATTR
,
"nifi rocks"
);
result
.
assertContentEquals
(
"nifi rocks"
);
}
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment