Commit b7ebe525 authored by Chris Coughlin's avatar Chris Coughlin

Added support for normalization in Data Ingestors

parent 050854ed
......@@ -19,8 +19,9 @@
package com.emphysic.myriad.network;
import akka.actor.UntypedActor;
import com.emphysic.myriad.core.data.io.IODataset;
import com.emphysic.myriad.core.data.io.Dataset;
import com.emphysic.myriad.core.data.io.ImageDataset;
import com.emphysic.myriad.core.data.ops.NormalizeSignalOperation;
import com.emphysic.myriad.core.data.util.FileSniffer;
import com.emphysic.myriad.network.messages.DatasetMessage;
import com.emphysic.myriad.network.messages.FileMessage;
......@@ -33,6 +34,42 @@ import java.io.File;
*/
@Slf4j
public class DataIngestorActor extends UntypedActor {
private boolean normalize;
private NormalizeSignalOperation norm;
/**
* Constructor
* @param normalizeData true if data should be normalized between 0 and 1 after reading
*/
public DataIngestorActor(boolean normalizeData) {
normalize = normalizeData;
norm = new NormalizeSignalOperation();
}
/**
* Default constructor. Data are not normalized after reading.
*/
public DataIngestorActor() {
this(false);
}
/**
* Sets the data normalization policy.
* @param norm true if data should be normalized between 0 and 1 after reading, false if not.
*/
public void Normalize(boolean norm) {
normalize = norm;
}
/**
* Returns the current data normalization policy.
* @return true if data are normalized, false otherwise.
*/
public boolean Normalize() {
return normalize;
}
/**
* Handled Messages:
* FileMessage - attempts to read a Dataset from the message's payload, sends a DatasetMessage response back to
......@@ -42,8 +79,11 @@ public class DataIngestorActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof FileMessage) {
File f = ((FileMessage) message).getFile();
IODataset dataset = FileSniffer.read(f, true);
Dataset dataset = FileSniffer.read(f, true);
if (dataset != null) {
if (normalize) {
dataset = norm.run(dataset);
}
log.info(getSelf() + ": successfully read " + f + " sending to " + getSender());
DatasetMessage response = new DatasetMessage(dataset, ((FileMessage) message).getMetadata());
getSender().tell(response, getSelf());
......
......@@ -45,24 +45,49 @@ public class DataIngestorPool extends LinkedWorkerPool {
/**
* Creates a new pool.
* @param numWorkers number of workers to create
* @param normalize true if data should be normalized between 0 and 1
*/
public DataIngestorPool(int numWorkers) {
start(numWorkers, DataIngestorActor.class);
public DataIngestorPool(int numWorkers, boolean normalize) {
start(numWorkers, DataIngestorActor.class, normalize);
}
/**
* Creates a new pool from an existing pool.
* @param pool pool
* @param normalize true if data should be normalized between 0 and 1
*/
public DataIngestorPool(Pool pool) {
start(pool, DataIngestorActor.class);
public DataIngestorPool(Pool pool, boolean normalize) {
start(pool, DataIngestorActor.class, normalize);
}
/**
* Starts a new pool with the default number of workers.
*/
public DataIngestorPool(boolean normalize) {
start(DataIngestorPool.class, normalize);
}
/**
* Creates a new pool. Data are not normalized.
* @param numWorkers number of workers to create
*/
public DataIngestorPool(int numWorkers) {
this(numWorkers, false)
}
/**
* Creates a new pool from an existing pool. Data are not normalized.
* @param pool pool
*/
public DataIngestorPool(Pool pool) {
this(pool, false);
}
/**
* Starts a new pool with the default number of workers. Data are not normalized.
*/
public DataIngestorPool() {
start(DataIngestorPool.class);
this(false);
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment