Commit 91c1717a authored by MartinFIT's avatar MartinFIT

Update - see description

- Refactored package for Handlers
- Removed TestDatabaseSpringBoot
- PerformanceData.xlsx
parent 7f18b8c0
package cz.vutbr.fit.communication.consumer.handler;
package cz.vutbr.fit.communication.service.handler;
import cz.vutbr.fit.communication.command.Command;
import org.slf4j.Logger;
......
package cz.vutbr.fit;
import cz.vutbr.fit.communication.KafkaCriteria;
import cz.vutbr.fit.communication.MetadataOperation;
import cz.vutbr.fit.distributedrepository.service.pcap.dumper.PcapDumper;
import cz.vutbr.fit.distributedrepository.service.pcap.dumper.pcap4j.DumperImpl;
import cz.vutbr.fit.distributedrepository.util.JavaEnvironment;
import cz.vutbr.fit.persistence.cassandra.entity.CassandraPacket;
import cz.vutbr.fit.persistence.cassandra.repository.PacketRepository;
import cz.vutbr.fit.persistence.mongodb.entity.PacketMetadata;
import cz.vutbr.fit.persistence.mongodb.repository.PacketMetadataRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.Banner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.data.mongodb.core.query.Criteria;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
//@SpringBootApplication
//@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
public class TestDatabaseSpringBoot implements CommandLineRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(TestDatabaseSpringBoot.class);
static {
JavaEnvironment.SetUp();
}
@Autowired
PacketRepository packetRepository;
@Autowired
PacketMetadataRepository packetMetadataRepository;
private PcapDumper<byte[]> pcapDumper = new DumperImpl();
private long packetsToLoad;
private long packetsLoaded;
public static void main(String[] args) {
new SpringApplicationBuilder(TestDatabaseSpringBoot.class)
.web(WebApplicationType.NONE)
.bannerMode(Banner.Mode.OFF)
.build()
.run(args);
}
private void handleFailure(Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
}
private Criteria prepareCriteria(List<KafkaCriteria> kafkaCriterias) {
Criteria criteria = new Criteria();
kafkaCriterias.forEach(
kafkaCriteria ->
packetMetadataRepository.appendCriteria(
criteria,
kafkaCriteria.getField(),
kafkaCriteria.getOperation().getOperationAsString(),
kafkaCriteria.getOperation().isArrayRequired(),
kafkaCriteria.getValue(),
kafkaCriteria.getValues(),
this::handleFailure)
);
return criteria;
}
public void loadPacketsByCriteria(Criteria criteria) {
packetsToLoad = 0;
packetsLoaded = 0;
pcapDumper.initDumper("file.pcap", this::handleFailure);
packetsToLoad = packetMetadataRepository.findByDynamicCriteria(criteria)
.doOnError(this::handleFailure)
.doOnNext(this::loadPacket)
.count().block();
if (packetsToLoad == 0) {
LOGGER.debug("Zero packets loaded, closing dumper.");
pcapDumper.closeDumper();
}
}
private void loadPacket(PacketMetadata packetMetadata) {
packetRepository.selectAsync(packetMetadata.getRefId(),
cassandraPacket -> {
dumpPacket(cassandraPacket, packetMetadata.getTimestamp());
packetsLoaded++;
if (loadingFinished()) {
onFinishLoaded();
}
});
}
private void dumpPacket(CassandraPacket packet, Instant timestamp) {
pcapDumper.dumpOutput(packet.getPacket().array(), timestamp, this::handleFailure);
}
private boolean loadingFinished() {
return packetsToLoad != 0 && packetsToLoad == packetsLoaded;
}
private void onFinishLoaded() {
LOGGER.debug(String.format("Packets to load: %d, successfully loaded %d packets, " +
"closing dumper.", packetsToLoad, packetsLoaded));
pcapDumper.closeDumper();
}
private Criteria ipv4Criteria() {
List<KafkaCriteria> criteria = new ArrayList<>();
KafkaCriteria ipVersionName = new KafkaCriteria.Builder()
.field("ipVersionName")
.operation(MetadataOperation.EQ)
.value("IPv4")
.build();
KafkaCriteria dstIpAddress = null;
try {
dstIpAddress = new KafkaCriteria.Builder()
.field("dstIpAddress")
.operation(MetadataOperation.EQ)
.value(InetAddress.getByName("192.168.1.1").getHostAddress())
.build();
} catch (UnknownHostException e) {
e.printStackTrace();
}
criteria.add(ipVersionName);
criteria.add(dstIpAddress);
Criteria mongoCriteria = prepareCriteria(criteria);
return mongoCriteria;
}
private Criteria ipv6Criteria() {
List<KafkaCriteria> criteria = new ArrayList<>();
KafkaCriteria ipVersionName = new KafkaCriteria.Builder()
.field("ipVersionName")
.operation(MetadataOperation.EQ)
.value("IPv6")
.build();
KafkaCriteria dstIpAddress = null;
try {
dstIpAddress = new KafkaCriteria.Builder()
.field("dstIpAddress")
.operation(MetadataOperation.EQ)
.value(InetAddress.getByName("ff02::c").getHostAddress())
.build();
} catch (UnknownHostException e) {
e.printStackTrace();
}
criteria.add(ipVersionName);
criteria.add(dstIpAddress);
Criteria mongoCriteria = prepareCriteria(criteria);
return mongoCriteria;
}
private Criteria tcpAndPortCriteria() {
List<KafkaCriteria> criteria = new ArrayList<>();
KafkaCriteria tcp = new KafkaCriteria.Builder()
.field("ipProtocolName").operation(MetadataOperation.EQ).value("TCP").build();
KafkaCriteria ports = new KafkaCriteria.Builder()
.field("dstPort").operation(MetadataOperation.IN).values(Arrays.asList(443, 80)).build();
criteria.add(tcp);
criteria.add(ports);
Criteria mongoCriteria = prepareCriteria(criteria);
return mongoCriteria;
}
@Override
public void run(String... strings) throws Exception {
Criteria mongoCriteria = ipv6Criteria();
loadPacketsByCriteria(mongoCriteria);
LOGGER.debug(mongoCriteria.getCriteriaObject().toJson());
}
}
\ No newline at end of file
......@@ -6,7 +6,6 @@ import org.pcap4j.core.*;
import org.pcap4j.packet.namednumber.DataLinkType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import java.io.IOException;
......@@ -15,9 +14,7 @@ import java.io.IOException;
* The method loadAndSavePcapFile loops all packets inside pcap file, each packet
* is dumped inside output pcap file using dumper.dumpRaw(packet.getRawData()).
*/
//@SpringBootApplication
//@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
public class TestPcap4J implements CommandLineRunner {
public class TestPcap4J {
private static final Logger LOGGER = LoggerFactory.getLogger(TestPcap4J.class);
......@@ -39,9 +36,7 @@ public class TestPcap4J implements CommandLineRunner {
public void loadAndSavePcapFile(String input) throws IOException {
ParserImpl pcapParser = new ParserImpl();
pcapParser.parseInput(input, this::doOnPacket, () -> {
LOGGER.debug("Completed");
}, TestPcap4J::handleError);
pcapParser.parseInput(input, this::doOnPacket, () -> LOGGER.debug("Completed"), TestPcap4J::handleError);
closeResources();
}
......@@ -64,20 +59,10 @@ public class TestPcap4J implements CommandLineRunner {
}
public static void main(String[] args) {
/*new SpringApplicationBuilder(TestPcap4J.class)
.web(WebApplicationType.NONE)
.bannerMode(Banner.Mode.OFF)
.build()
.run(args);*/
TestPcap4J testPcap4J = new TestPcap4J();
testPcap4J.runApp(args);
}
@Override
public void run(String... args) throws Exception {
runApp(args);
}
public void runApp(String... args) {
if (args.length != 2) {
System.exit(1);
......
......@@ -2,7 +2,7 @@ package cz.vutbr.fit.distributedrepository;
import cz.vutbr.fit.communication.KafkaRequest;
import cz.vutbr.fit.communication.command.Command;
import cz.vutbr.fit.communication.consumer.handler.HandlerManager;
import cz.vutbr.fit.communication.service.handler.HandlerManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -2,7 +2,7 @@ package cz.vutbr.fit.distributedrepository.beans;
import cz.vutbr.fit.communication.KafkaRequest;
import cz.vutbr.fit.communication.command.Command;
import cz.vutbr.fit.communication.consumer.handler.HandlerManager;
import cz.vutbr.fit.communication.service.handler.HandlerManager;
import cz.vutbr.fit.distributedrepository.communication.consumer.handler.LoadPcapHandler;
import cz.vutbr.fit.distributedrepository.communication.consumer.handler.StorePcapHandler;
import org.springframework.beans.factory.annotation.Autowired;
......
......@@ -3,7 +3,7 @@ package cz.vutbr.fit.distributedrepository.communication.consumer.handler;
import cz.vutbr.fit.communication.KafkaRequest;
import cz.vutbr.fit.communication.KafkaResponse;
import cz.vutbr.fit.communication.ResponseCode;
import cz.vutbr.fit.communication.consumer.handler.ICommandHandler;
import cz.vutbr.fit.communication.service.handler.ICommandHandler;
import cz.vutbr.fit.distributedrepository.communication.producer.ResponseProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.fs.FsShell;
......
......@@ -32,6 +32,8 @@ public class LoadPcapHandler extends BaseHandler {
@Autowired
private PcapDumper<byte[]> pcapDumper;
private String resultingPcapFile;
private long packetsToLoad;
private long packetsLoaded;
......@@ -51,7 +53,8 @@ public class LoadPcapHandler extends BaseHandler {
packetsToLoad = 0;
packetsLoaded = 0;
pcapDumper.initDumper(request.getDataSource().getUri(), this::handleFailure);
resultingPcapFile = request.getDataSource().getUri();
pcapDumper.initDumper(resultingPcapFile, this::handleFailure);
Criteria criteria = prepareCriteria(request.getCriterias());
packetsToLoad = packetMetadataRepository.findByDynamicCriteria(criteria)
......@@ -60,8 +63,7 @@ public class LoadPcapHandler extends BaseHandler {
.count().block();
if (packetsToLoad == 0) {
LOGGER.debug("Zero packets loaded, closing dumper.");
pcapDumper.closeDumper();
onFinishLoaded();
}
}
......@@ -91,7 +93,7 @@ public class LoadPcapHandler extends BaseHandler {
pcapDumper.closeDumper();
String localFile, dstFile;
localFile = dstFile = request.getDataSource().getUri();
localFile = dstFile = resultingPcapFile;
storePayloadIntoHDFS(localFile, dstFile);
removeTmpFile(localFile);
......
......@@ -65,7 +65,6 @@ public class StorePcapHandler extends BaseHandler {
@PostConstruct
public void init() {
postConstructValidation();
LOGGER.debug("StorePcapHandler initialized");
}
private void postConstructValidation() {
......@@ -80,12 +79,9 @@ public class StorePcapHandler extends BaseHandler {
bufferRequest(request);
storePayload(value);
processPackets();
removePayload();
if (request.getAwaitsResponse()) {
String detailMessage = String.format("Successfully stored %d packets", count);
sendAcknowledgement(buildSuccessResponse(request, detailMessage), new byte[]{});
}
// Packets are processed, PCAP file can be removed
removePayload();
} catch (Exception exception) {
handleFailure(exception);
......@@ -109,7 +105,7 @@ public class StorePcapHandler extends BaseHandler {
count = 0;
packetMetadataList = new ArrayList<>(maxListSize);
pcapParser.parseInput(processedTmpFile, this::processPacket, this::storeMetadata, this::handleFailure);
pcapParser.parseInput(processedTmpFile, this::processPacket, this::packetsProcessed, this::handleFailure);
LOGGER.debug("Packets processed successfully.");
}
......@@ -157,6 +153,17 @@ public class StorePcapHandler extends BaseHandler {
sendAcknowledgement(buildFailureResponse(request, throwable.getMessage()), new byte[]{});
}
private void packetsProcessed() {
// Last bulk of metadata records
storeMetadata();
// If client awaits response, it will be sent
if (request.getAwaitsResponse()) {
String detailMessage = String.format("Successfully stored %d packets", count);
sendAcknowledgement(buildSuccessResponse(request, detailMessage), new byte[]{});
}
}
private void removePayload() {
DataSource dataSource = request.getDataSource();
boolean isHadoopDataSource = DataSourceStorage.HDFS == dataSource.getDataSourceStorage();
......
......@@ -22,7 +22,7 @@ public class FileManager {
}
public static String GenerateTmpPath(String tmpDirectory) {
return tmpDirectory + UUID.randomUUID() + FileExtension.PCAP;
return tmpDirectory + UUID.randomUUID();
}
}
<component name="libraryTable">
<library name="Maven: org.apache.commons:commons-io:1.3.2">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-io/1.3.2/commons-io-1.3.2.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-io/1.3.2/commons-io-1.3.2-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/apache/commons/commons-io/1.3.2/commons-io-1.3.2-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
......@@ -78,11 +78,6 @@
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<!-- Log4J -->
<dependency>
......
......@@ -141,7 +141,6 @@
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-mapreduce-client-shuffle:2.7.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-yarn-server-nodemanager:2.7.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-io:1.3.2" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
</component>
</module>
\ No newline at end of file
......@@ -2,7 +2,7 @@ package cz.vutbr.fit.producerdemo;
import cz.vutbr.fit.communication.KafkaResponse;
import cz.vutbr.fit.communication.command.Command;
import cz.vutbr.fit.communication.consumer.handler.HandlerManager;
import cz.vutbr.fit.communication.service.handler.HandlerManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
......
......@@ -2,7 +2,7 @@ package cz.vutbr.fit.producerdemo;
import cz.vutbr.fit.communication.KafkaResponse;
import cz.vutbr.fit.communication.command.Command;
import cz.vutbr.fit.communication.consumer.handler.HandlerManager;
import cz.vutbr.fit.communication.service.handler.HandlerManager;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
......
......@@ -2,9 +2,9 @@ package cz.vutbr.fit.producerdemo.beans;
import cz.vutbr.fit.communication.KafkaResponse;
import cz.vutbr.fit.communication.command.Command;
import cz.vutbr.fit.communication.consumer.handler.HandlerManager;
import cz.vutbr.fit.producerdemo.communication.consumer.handler.AcknowledgementConsumerHandler;
import cz.vutbr.fit.producerdemo.communication.consumer.handler.ErrorConsumerHandler;
import cz.vutbr.fit.communication.service.handler.HandlerManager;
import cz.vutbr.fit.producerdemo.service.handler.AcknowledgementConsumerHandler;
import cz.vutbr.fit.producerdemo.service.handler.ErrorConsumerHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......
package cz.vutbr.fit.producerdemo.communication.consumer.handler;
package cz.vutbr.fit.producerdemo.service.handler;
import cz.vutbr.fit.communication.KafkaResponse;
import cz.vutbr.fit.communication.consumer.handler.ICommandHandler;
import cz.vutbr.fit.communication.service.handler.ICommandHandler;
import cz.vutbr.fit.producerdemo.stats.CollectStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
package cz.vutbr.fit.producerdemo.communication.consumer.handler;
package cz.vutbr.fit.producerdemo.service.handler;
import cz.vutbr.fit.communication.KafkaResponse;
import cz.vutbr.fit.communication.consumer.handler.ICommandHandler;
import cz.vutbr.fit.communication.service.handler.ICommandHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......
package cz.vutbr.fit.producerdemo.communication.consumer.handler.beans;
package cz.vutbr.fit.producerdemo.service.handler.beans;
import cz.vutbr.fit.producerdemo.communication.consumer.handler.AcknowledgementConsumerHandler;
import cz.vutbr.fit.producerdemo.communication.consumer.handler.ErrorConsumerHandler;
import cz.vutbr.fit.producerdemo.service.handler.AcknowledgementConsumerHandler;
import cz.vutbr.fit.producerdemo.service.handler.ErrorConsumerHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......
No preview for this file type
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment