Commit f6a9ad88 authored by MartinFIT's avatar MartinFIT

ProducerDemo - handling empty directory, changed ipv6 criteria to tcpAndPort criteria

parent 9abc6765
......@@ -31,7 +31,7 @@ public class LoadPcapProducerDemo extends BaseProducerDemo {
UUID requestId = UUID.randomUUID();
DataSource dataSource = createDataSource(dataSourceStorage, requestId.toString(), Boolean.FALSE);
KafkaRequest request = buildKafkaRequestWithCriterias(dataSource, Command.LOAD_PCAP, requestId, ipv6Criteria());
KafkaRequest request = buildKafkaRequestWithCriterias(dataSource, Command.LOAD_PCAP, requestId, tcpAndPortCriteria());
byte[] payload = null;
producer.produce(inputTopic, request, payload,
......@@ -43,14 +43,6 @@ public class LoadPcapProducerDemo extends BaseProducerDemo {
}
}
private List<KafkaCriteria> onlyOneByRefId() {
List<KafkaCriteria> criteria = new ArrayList<>();
UUID refId = UUID.fromString("8659c8f5-313c-11e8-b0c6-77a556ade544");
KafkaCriteria refIdCriteria = new KafkaCriteria.Builder().field("refId").operation(MetadataOperation.EQ).value(refId).build();
criteria.add(refIdCriteria);
return criteria;
}
private List<KafkaCriteria> ipv6Criteria() {
List<KafkaCriteria> criteria = new ArrayList<>();
KafkaCriteria ipVersionName = new KafkaCriteria.Builder()
......@@ -65,8 +57,8 @@ public class LoadPcapProducerDemo extends BaseProducerDemo {
.operation(MetadataOperation.EQ)
.value(InetAddress.getByName("ff02:0:0:0:0:0:0:c").getHostAddress())
.build();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (UnknownHostException exception) {
handleError(exception);
}
criteria.add(ipVersionName);
criteria.add(dstIpAddress);
......
......@@ -9,7 +9,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.io.File;
import java.util.Arrays;
......@@ -29,7 +28,12 @@ public class StorePcapProducerDemo extends BaseProducerDemo {
public void runMultipleProducer(String directoryName) {
File directory = new File(directoryName);
Assert.notNull(directory, "Directory doesn't exist");
if (!directory.isDirectory() || directory.listFiles().length == 0) {
LOGGER.warn(String.format("%s is not a directory or is empty.", directoryName));
LOGGER.warn("Stopping StorePcap UseCase...");
return;
}
// TODO: Will be removed
initStatsForDirectory(directory);
......
......@@ -989,7 +989,7 @@ Je ale potřeba mít dostatečnou velikost paměti RAM k~načtení aktivních p
Většina konfigurací, kde je Kafka nasazena, nevyžaduje značný výkon procesoru. Výjimkou je však případ, kdy je zapnuto \texttt{SSL}, požadavky na výkon CPU potom mohou vzrůst. Je doporučeno zvolit moderní procesor s~více jádry, běžný Kafka cluster využije 24 jader. Větší počet jader má větší význam než vyšší frekvence procesoru.
Je také doporučeno nainstalovat více disků pro získání optimální propustnosti a~nesdílet tyto disky s~jinými aplikacemi nebo aktivitami operačního systému, aby byla zaručena nízká latence \cite{kafkaBestPractises}. Jisté výhody mají i~SSD disky, případně zapojení disků v~\texttt{RAID}.
\noindent Je také doporučeno nainstalovat více disků pro získání optimální propustnosti a~nesdílet tyto disky s~jinými aplikacemi nebo aktivitami operačního systému, aby byla zaručena nízká latence \cite{kafkaBestPractises}. Jisté výhody mají i~SSD disky, případně zapojení disků v~\texttt{RAID}.
Důležitou součástí distribuovaného systému je rychlá a~spolehlivá síť. Nízká latence zajišťuje, aby mohly uzly jednoduše komunikovat. Moderní ethernetová připojení s~rychlostí 1 Gb/s, či 10 Gb/s jsou dostatečná. Nedoporučuje se provozovat clustery, jejichž uzly se nachází v~geograficky vzdálených lokacích.
......
No preview for this file type
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment