Commit ae09f30e authored by Chris Coughlin's avatar Chris Coughlin

Initial commit after refactoring out of Myriad project

ROIPipeline
===========
Demonstrates a concurrent Region Of Interest (ROI) detection pipeline written with the [Myriad Data Reduction Framework](https://emphysic.com/myriad/), analogous to the [single-process PipelineDemo example](https://emphysic.com/myriad/sample-code/roi-detection-pipeline/).
What It Does
============
A bundled machine learning model based on the [Passive Aggressive algorithm](http://www.jmlr.org/papers/v7/crammer06a.html) was trained to recognize possible indications of structural damage in [ultrasonic C-scan](http://www.olympus-ims.com/en/ndt-tutorials/instrumententation/cscan/) data. More details on the training process are available from the [Myriad Trainer documentation](http://myrdocs.azurewebsites.net/trainer/#inital-preparation).
A concurrent processing pipeline is constructed in which each input file is read and subsets of the data are passed to the defect-detection model. The model examines each subset and reports whether or not it found an indication of damage.
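The Passive Aggressive family of classifiers updates its weight vector only when an example is misclassified or falls inside the margin. As a loose illustration of the update rule from the linked paper (a toy, self-contained sketch with made-up data, not the bundled Myriad model):

```java
// Toy sketch of the Passive-Aggressive update (Crammer et al., 2006).
// NOT the bundled Myriad model -- the features and labels here are made up.
class PassiveAggressiveSketch {
    private final double[] w;

    PassiveAggressiveSketch(int numFeatures) {
        w = new double[numFeatures];
    }

    double score(double[] x) {
        double s = 0;
        for (int i = 0; i < w.length; i++) s += w[i] * x[i];
        return s;
    }

    // One update: "aggressive" enough to fix the current mistake, "passive"
    // when the example is already classified with margin >= 1.
    void update(double[] x, int y) {
        double loss = Math.max(0, 1 - y * score(x));
        if (loss == 0) return;
        double normSq = 0;
        for (double v : x) normSq += v * v;
        double tau = loss / normSq;   // plain PA; PA-I would cap tau at C
        for (int i = 0; i < w.length; i++) w[i] += tau * y * x[i];
    }

    public static void main(String[] args) {
        PassiveAggressiveSketch clf = new PassiveAggressiveSketch(2);
        // Two linearly separable "flaw" (+1) / "no flaw" (-1) examples.
        clf.update(new double[]{1.0, 0.0}, 1);
        clf.update(new double[]{0.0, 1.0}, -1);
        System.out.println(clf.score(new double[]{1.0, 0.0}) > 0); // true
        System.out.println(clf.score(new double[]{0.0, 1.0}) < 0); // true
    }
}
```

The real model is trained offline with Myriad Trainer and loaded from the bundled *models/sobel_cscan_model.myr* file at startup; this sketch only shows the flavor of the underlying algorithm.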
How To Use It
=============
Follow the [Myriad installation instructions](http://myrdocs.azurewebsites.net/install/) to get Myriad installed on your machine. You'll need [Java 8](http://java.sun.com/) and [Apache Maven](http://maven.apache.org/) installed. Myriad has been tested on Windows 7 and 10; various 32- and 64-bit flavors of Linux; FreeBSD; TrueOS; and OpenBSD.
Once Myriad has successfully been installed, open a command prompt in the Myriad Demo root folder (the folder with the *pom.xml* file). Run Maven to build:
<pre>mvn package</pre>
After a few minutes the file *MyriadDemo-1.0-SNAPSHOT-allinone.jar* should appear in the *target/* folder. Run this JAR, specifying any files you'd like to analyze on the command line, e.g.
<pre>java -jar MyriadDemo-1.0-SNAPSHOT-allinone.jar /home/username/MyriadDemo/samples/140.csv</pre>
We've provided some sample input files under the *samples/* subfolder you can use to get started. Press CTRL-C to end the demonstration.
How It Works
============
This application makes use of several varieties of Myriad's [LinkedWorkerPool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/LinkedWorkerPool.html). A LinkedWorkerPool is a pool of workers managed by a single router. When the router receives a work order, it relays the work to the next available worker. When a worker completes its task, the result is sent back to the router, which passes it on to the next LinkedWorkerPool. If a worker fails to complete the work, the work is resubmitted to the router, which relays it to the next worker (restarting workers if necessary).
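The relay pattern can be sketched in plain Java (this is NOT the Myriad/Akka API, just an analogy using `java.util.concurrent`; the class and method names here are invented for illustration):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Plain-Java analogy of a LinkedWorkerPool: a pool of workers fronted by a
// single entry point, plus a reference to the "next" stage downstream.
class StagePool<I, O> {
    private final ExecutorService workers;      // the pool of workers
    private final Function<I, O> task;          // the work each worker performs
    private volatile StagePool<O, ?> next;      // where finished work is relayed
    private final BlockingQueue<Object> results = new LinkedBlockingQueue<>();

    StagePool(int numWorkers, Function<I, O> task) {
        this.workers = Executors.newFixedThreadPool(numWorkers);
        this.task = task;
    }

    void setNext(StagePool<O, ?> next) { this.next = next; }

    // The "router": hand work to the next free worker; on completion the
    // result is relayed to the next pool, or kept if this is the last stage.
    void tell(I work) {
        workers.submit(() -> {
            O result = task.apply(work);
            if (next != null) next.tell(result);
            else results.add(result);
        });
    }

    Object takeResult() throws InterruptedException { return results.take(); }
    void shutdown() { workers.shutdown(); }

    public static void main(String[] args) throws Exception {
        StagePool<Integer, Integer> doubler = new StagePool<>(2, x -> x * 2);
        StagePool<Integer, String> reporter = new StagePool<>(1, x -> "result=" + x);
        doubler.setNext(reporter);   // analogous to sending the next-stage reference
        doubler.tell(21);
        System.out.println(reporter.takeResult()); // result=42
        doubler.shutdown();
        reporter.shutdown();
    }
}
```

In the real pipeline each pool is an Akka actor, so "posting to the next stage" is an asynchronous message send rather than a direct method call, and the fault-tolerant resubmission behavior comes from actor supervision.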
An ROI detection pipeline is constructed in which input data files are read and scanned for ROI in a six-step process:
1. The input file(s) are read by an [Ingestion pool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/DataIngestorPool.html). Currently-supported file formats include:
* [Delimited text](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/core/data/io/TextDataset.html);
* [Various image formats](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/core/data/io/ImageDataset.html) including GIF, PNG, JPEG, and TIFF; and
* [DICOM/DICONDE](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/core/data/io/DicomDataset.html) files.
2. The Ingestion pool sends input file contents to a [Pyramid pool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/PyramidActorPool.html) which blurs and subsamples the input to allow the ROI detector to consider the input at several different scales.
3. The Pyramid pool sends each resized version of each input file to a [Preprocessing pool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/DatasetOperationPool.html), which performs any preprocessing operations required for the ROI detection model. In this case, a [Sobel filter](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/core/data/ops/SobelOperation.html) is applied to incoming data.
4. The Sobel pool sends the filtered data to a [Sliding Window pool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/SlidingWindowPool.html), which scans over the inputs and extracts subsets for the ROI detector to examine.
5. For each window of data extracted by the Sliding Window pool, an [ROI Detection pool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/ROIFinderPool.html) scans for ROI. In this case, a [machine learning model](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/core/data/roi/MLROIFinder.html) was trained to detect indications of structural damage in ultrasonic sensor data.
6. Flaw detection workers in the ROI Detection pool report their findings to the [Reporting pool](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/ReporterActorPool.html). In this case, the Reporting pool simply logs a message indicating whether or not a flaw signal was found. If a flaw signal was found, it includes sufficient metadata such that the relative position of the flaw can be found in the original input data from Step 1.
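The window extraction in Step 4 can be illustrated with a small self-contained sketch (not the actual SlidingWindowPool code): enumerate every window origin, stepping a fixed number of points at a time. The defaults in the bundled demo source are a 15 x 15 window with a step of 5.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of what the Sliding Window stage does: enumerate the (row, col)
// origins of every winH x winW patch, stepping `step` points each time.
class SlidingWindowSketch {
    static List<int[]> windowOrigins(int rows, int cols,
                                     int winH, int winW, int step) {
        List<int[]> origins = new ArrayList<>();
        for (int r = 0; r + winH <= rows; r += step) {
            for (int c = 0; c + winW <= cols; c += step) {
                origins.add(new int[]{r, c});
            }
        }
        return origins;
    }

    public static void main(String[] args) {
        // A 25 x 25 dataset with 15 x 15 windows every 5 points:
        // valid origins are 0, 5, 10 in each dimension -> 3 x 3 = 9 windows.
        List<int[]> origins = windowOrigins(25, 25, 15, 15, 5);
        System.out.println(origins.size()); // 9
    }
}
```

Each of these windows becomes a separate message to the ROI Detection pool, which is why the detection stage benefits from the largest worker count (8 by default).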
After each stage in the pipeline is constructed, the stages are connected by sending each stage a message with a [reference](http://doc.akka.io/docs/akka/2.4/general/actors.html#Actor_Reference) to the next stage. This sets the stage's [next](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/LinkedWorkerPool.html#next) field - when the stage receives a completed task from one of its workers the results are automatically sent to the Akka actor referenced by *next*. Each of the six stages runs concurrently and independently - if a stage is busy the previous stage simply posts its messages to the stage's mailbox and continues with its own work.
Files specified on the command line are read and processed directly. The application then continues to run, waiting for new data to inspect, until the user terminates it, e.g. with CTRL-C. If the application is running as a server, sending a [FileMessage](http://myrdocs.azurewebsites.net/api/com/emphysic/myriad/network/messages/FileMessage.html) to the Actor reference of Stage 1's data ingestion pool (e.g. *akka://ROIPipeline/user/IngestorPool#141020403*) will analyze the new data file.
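The scale pyramid from Step 2 can also be illustrated loosely: keep subsampling the input by the scale factor until a dimension hits the cutoff. This is a toy one-dimensional sketch, not the actual GaussianPyramidOperation, which blurs the 2-D data before each subsample; the demo defaults are a blur radius of 5, a scale factor of 2, and a scale limit of 1.

```java
import java.util.ArrayList;
import java.util.List;

// Toy 1-D sketch of the scale pyramid: repeatedly shrink the input size by
// scaleFactor until it reaches scaleLimit. (Blurring is omitted here.)
class PyramidSketch {
    static List<Integer> pyramidSizes(int size, int scaleFactor, int scaleLimit) {
        List<Integer> sizes = new ArrayList<>();
        while (size > scaleLimit) {
            sizes.add(size);
            size /= scaleFactor;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A 100-point input with the demo defaults (factor 2, limit 1):
        System.out.println(pyramidSizes(100, 2, 1)); // [100, 50, 25, 12, 6, 3]
    }
}
```

Running the detector at every one of these scales is what gives the pipeline a degree of scale invariance: a flaw signal larger than the 15 x 15 window at full resolution will still fit the window at a coarser pyramid step.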
Where To Go From Here
=====================
1. Try adjusting the number of workers in each pool and examine the effect the new configuration has on throughput.
2. If you have access to [Trainer](http://myrdocs.azurewebsites.net/trainer/), try training the bundled model on your data to see if you get better results.
3. If you have access to [Desktop](http://myrdocs.azurewebsites.net/desktop/), try starting this server app on one machine and using Myriad's [remoting](http://doc.akka.io/docs/akka/snapshot/scala/remoting.html) feature to have Desktop call it for one or more stages of your data processing.
4. Have a look at [other code samples](https://emphysic.com/myriad/sample-code/) and the current [Myriad API](http://myrdocs.azurewebsites.net/api/) and get coding!
License
=======
Myriad, Myriad Trainer, Myriad Desktop, and this demonstration of Myriad are all licensed under the [Apache 2.0 license](https://www.apache.org/licenses/LICENSE-2.0).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.emphysic.myriad</groupId>
<artifactId>MyriadDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>allinone</shadedClassifierName>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>com.emphysic.myriad.demo.ROIPipeline</Main-Class>
</manifestEntries>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.emphysic.myriad.demo.ROIPipeline</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.emphysic.myriad</groupId>
<artifactId>core</artifactId>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.emphysic.myriad</groupId>
<artifactId>network</artifactId>
<version>1.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
</dependencies>
</project>
/*
* com.emphysic.myriad.core.demo.ROIPipeline
*
* Copyright (c) 2016 Emphysic LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.emphysic.myriad.demo;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.emphysic.myriad.core.data.ops.GaussianBlur;
import com.emphysic.myriad.core.data.ops.GaussianPyramidOperation;
import com.emphysic.myriad.core.data.ops.SobelOperation;
import com.emphysic.myriad.core.data.roi.PassiveAggressiveROIFinder;
import com.emphysic.myriad.core.data.roi.ROIFinder;
import com.emphysic.myriad.network.*;
import com.emphysic.myriad.network.messages.FileMessage;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import java.net.URL;
/**
* Demonstrates a distributed ROI processing pipeline. Analogous to the single-process PipelineDemo example.
*
* 1. Ingest data
* 2. Run a Gaussian Pyramid operation on the input for scale invariance
* 2a. (Optional) perform any preprocessing desired on each step in the pyramid in 2
* 3. For each step in the pyramid in 2(a), run a sliding window operation
* 4. For each sliding window in 3, ask an ROI detector whether the window contains ROI or not
* 5. Compile the results
*
* Created by ccoughlin on 10/25/16.
*/
@Slf4j
public class ROIPipeline {
/**
* Main Akka system
*/
private ActorSystem system;
/**
* Akka configuration
*/
private final Config config;
/**
* Reference to the start of the pipeline, a data ingestion pool
*/
private ActorRef ingestor;
/**
* Number of workers in the data ingestion stage (defaults to 1).
*/
private int numIngestors = 1;
/**
* Number of Gaussian pyramid workers in the scaling stage (defaults to 2).
*/
private int numScalers = 2;
/**
* Number of preprocessor workers in the preprocessing stage (defaults to 2).
*/
private int numPreprocessors = 2;
/**
* Number of sliding window workers in the sliding window stage (defaults to 4).
*/
private int numSliders = 4;
/**
* Number of ROI detector workers in the ROI detection stage (defaults to 8).
*/
private int numDetectors = 8;
/**
* Number of ROI reporting workers in the reporting stage (defaults to 8).
*/
private int numReporters = 8;
/**
* Radius of blur operation in scaling stage (defaults to 5).
*/
private int blurRadius = 5;
/**
* Ratio of scales between subsequent pyramid steps in the scaling stage (defaults to 2 i.e. each step is 1/2
* the size of the previous step).
*/
private int scaleFactor = 2;
/**
* Cutoff size in scaling stage (defaults to 1 i.e. when any dimension of the current step is only 1 element pyramid
* is complete).
*/
private int scaleLimit = 1;
/**
* Width in points of the sliding window (defaults to 15).
*/
private int windowWidth = 15;
/**
* Height in points of the sliding window (defaults to 15).
*/
private int windowHeight = 15;
/**
* Step size in points of the sliding window (defaults to 5 i.e. a window is taken every 5 points).
*/
private int windowStep = 5;
/**
* The ROI finder to use
*/
private ROIFinder roiFinder;
public ROIPipeline(Config config) {
this.config = config;
system = ActorSystem.create("ROIPipeline", config);
}
/**
* Constructs the processing pipeline.
* @return true if configuration of each stage was successful, false otherwise.
*/
public boolean startup() {
try {
log.info("Creating processing pipeline");
ingestor = system.actorOf(Props.create(DataIngestorPool.class, numIngestors), "IngestorPool");
log.info("Ingestor pool of " + numIngestors + " workers created");
ActorRef scaler = system.actorOf(
Props.create(PyramidActorPool.class,
numScalers,
new GaussianPyramidOperation(
new GaussianBlur(blurRadius),
scaleFactor,
scaleLimit)
),
"ScalerPool"
);
log.info("Scale space pool of " + numScalers + " workers created");
ActorRef preprocessor = system.actorOf(
Props.create(DatasetOperationPool.class, numPreprocessors, new SobelOperation()),
"PreprocessorPool"
);
log.info("Preprocessor pool of " + numPreprocessors + " workers created");
ActorRef slider = system.actorOf(
Props.create(SlidingWindowPool.class,
numSliders,
windowStep,
windowWidth,
windowHeight
),
"SliderPool"
);
log.info("Slider pool of " + numSliders + " workers created");
ActorRef roidetection = system.actorOf(
Props.create(ROIFinderPool.class,
numDetectors,
loadROIFinder()
),
"ROIFinderPool"
);
log.info("ROI detector pool of " + numDetectors + " workers created");
ActorRef reporting = system.actorOf(
Props.create(ReporterActorPool.class,
numReporters
),
"ReportingPool"
);
log.info("Reporting pool of " + numReporters + " workers created");
// Connect the processing stages
log.info("Connecting pipeline stages");
ActorRef guardian = system.guardian();
ingestor.tell(scaler, guardian);
scaler.tell(preprocessor, guardian);
preprocessor.tell(slider, guardian);
slider.tell(roidetection, guardian);
roidetection.tell(reporting, guardian);
log.info("Pipeline constructed, ready to receive inputs");
return true;
} catch (IllegalAccessException | IOException | InstantiationException ioe) {
log.error("Unable to read model file, error was: " + ioe.getMessage());
} catch (Exception e) {
log.error("An error occurred constructing the pipeline: " + e.getMessage());
}
return false;
}
/**
* Adds a file for processing
* @param f name of file to ingest
*/
public void ingest(File f) {
if (ingestor != null) {
log.info("Sending " + f + " through pipeline");
ingestor.tell(new FileMessage(f), system.guardian());
} else {
log.error("No ingestor configured - are you sure you called startup() ?");
}
}
/**
* Adds a file for processing
* @param f pathname of file to ingest
*/
public void ingest(String f) {
ingest(new File(f));
}
/**
* Convenience method for loading the C-scan damage detection model.
* @return model instance
* @throws InstantiationException error instantiating the ROIFinder (abstract, interface, etc.)
* @throws IllegalAccessException constructor isn't accessible
* @throws IOException if an I/O error occurs reading the input file
*/
private static ROIFinder loadROIFinder() throws IllegalAccessException, IOException, InstantiationException {
URL earl = Thread.currentThread().getContextClassLoader().getResource("models/sobel_cscan_model.myr");
if (earl == null) {
    throw new IOException("Bundled model models/sobel_cscan_model.myr was not found on the classpath");
}
File modelFile = new File(earl.getPath());
if (!modelFile.exists()) {
    throw new IOException("Model file " + modelFile + " does not exist");
}
return ROIFinder.fromFile(modelFile, PassiveAggressiveROIFinder.class);
}
/**
* Demonstration of running - ingests input files from the command line to search for ROI, then waits for more input.
* @param args list of files to search
* @throws Exception if an error occurs
*/
public static void main(String[] args) throws Exception {
Config config = ConfigFactory.load();
ROIPipeline demo = new ROIPipeline(config);
boolean ready = demo.startup();
if (ready) {
for (String input : args) {
demo.ingest(input);
}
} else {
System.out.println("Unable to construct pipeline, please check log files for further details.");
}
}
}