Commit de0e56f9 authored by Fletcher Haynes

Merge branch '33-send-join-message' into 'master'

Resolve "Nodes should be able to send a join message that doesn't trigger a full list of the cluster nodes in the response"

Closes #33

See merge request !34
parents e1bb0926 621e369e
Pipeline #35560821 passed with stages in 18 minutes and 23 seconds
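For orientation before the diff: this change adds a third cluster message, Join, so that a node which has already learned the member list from a HelloAck can announce itself to the remaining members without each of them answering with the full node list again. A summary sketch of the resulting message types, reconstructed from the cluster message module in the diff below (NodeAlias, NodePort, and NodeInfo are the type aliases declared in the cluster module):

// Summary sketch only; the authoritative definitions are in the diff below.
pub enum IridiumMessage {
    // Sent by a joining node to the node it bootstraps from.
    Hello { alias: NodeAlias, port: NodePort },
    // The reply: the responder's own info plus every member it knows about.
    HelloAck { alias: NodeInfo, nodes: Vec<NodeInfo> },
    // Sent by the joining node to each member listed in the HelloAck;
    // the receiver records the sender but does not reply with the node list.
    Join { alias: NodeAlias, port: NodePort },
}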
@@ -2,3 +2,4 @@
/target
**/*.rs.bk
.idea
.node_id
#!/bin/bash
tmux kill-session -t iridium
\ No newline at end of file
#!/bin/bash
# Start a tmux session
tmux new-session -s iridium -d
tmux rename-window -t iridium "server"
tmux new-window -t iridium -n "client1"
tmux new-window -t iridium -n "client2"
tmux send-keys -t iridium:server "RUST_BACKTRACE=1 RUST_LOG=debug iridium --node-alias server" C-m
tmux send-keys -t iridium:client1 "RUST_LOG=debug iridium --server-bind-port 2255 --node-alias client1" C-m
tmux send-keys -t iridium:client2 "RUST_LOG=debug iridium --server-bind-port 2256 --node-alias client2" C-m
tmux send-keys -t iridium:server "!start_cluster" C-m
sleep 3
tmux send-keys -t iridium:client1 "!join_cluster localhost 2254" C-m
sleep 3
tmux send-keys -t iridium:client2 "!join_cluster localhost 2254" C-m
sleep 3
tmux attach -t iridium
@@ -53,3 +53,8 @@ args:
required: false
takes_value: true
long: node-alias
- DAEMON_MODE:
help: Does not create a REPL, only a VM running in the background
required: false
takes_value: true
long: daemon-mode
use std::fs::File;
use std::io::prelude::*;
use std::io::Read;
use std::path::Path;
use std::sync::mpsc::Receiver;
@@ -11,17 +12,20 @@ extern crate clap;
extern crate log;
extern crate bincode;
extern crate env_logger;
extern crate num_cpus;
extern crate uuid;
extern crate serde_derive;
extern crate iridium;
extern crate num_cpus;
extern crate serde;
extern crate serde_derive;
extern crate uuid;
use clap::App;
use iridium::assembler::Assembler;
use iridium::repl::REPL;
use iridium::vm::VM;
static NODE_ID_FILENAME: &'static str = ".node_id";
static DEFAULT_NODE_LISTEN_PORT: &'static str = "2254";
static DEFAULT_REMOTE_ACCESS_PORT: &'static str = "2244";
fn main() {
env_logger::init();
let mut _repl_receiver: Receiver<String>;
@@ -29,6 +33,8 @@ fn main() {
let yaml = clap::load_yaml!("cli.yml");
let matches = App::from_yaml(yaml).get_matches();
let daemon_mode = matches.value_of("DAEMON_MODE").unwrap_or("false");
let data_root_dir = matches
.value_of("DATA_ROOT_DIR")
.unwrap_or("/var/lib/iridium/");
@@ -39,16 +45,36 @@ fn main() {
};
if matches.is_present("ENABLE_REMOTE_ACCESS") {
let port = matches.value_of("LISTEN_PORT").unwrap_or("2244");
let port = matches.value_of("LISTEN_PORT").unwrap_or(DEFAULT_REMOTE_ACCESS_PORT);
let host = matches.value_of("LISTEN_HOST").unwrap_or("127.0.0.1");
start_remote_server(host.to_string(), port.to_string());
}
let alias = matches.value_of("NODE_ALIAS").unwrap_or("");
// Find or generate a unique node ID
let alias: String;
if matches.is_present("NODE_ALIAS") {
let cli_alias = matches.value_of("NODE_ALIAS").expect("NODE_ALIAS CLI flag present, but unable to get value");
alias = cli_alias.into();
} else {
alias = match iridium::cluster::alias::read_node_id(NODE_ID_FILENAME) {
Ok(read_alias) => {
read_alias
},
Err(_) => {
let new_alias = uuid::Uuid::new_v4().to_hyphenated().to_string();
if let Err(_) = iridium::cluster::alias::write_node_id(NODE_ID_FILENAME, &new_alias) {
std::process::exit(1);
}
new_alias
}
};
}
info!("Node ID is: {}", alias);
let server_addr = matches
.value_of("SERVER_LISTEN_HOST")
.unwrap_or("127.0.0.1");
let server_port = matches.value_of("SERVER_LISTEN_PORT").unwrap_or("2254");
let server_port = matches.value_of("SERVER_LISTEN_PORT").unwrap_or(DEFAULT_NODE_LISTEN_PORT);
let num_threads = match matches.value_of("THREADS") {
Some(number) => match number.parse::<usize>() {
@@ -64,44 +90,49 @@ fn main() {
None => num_cpus::get(),
};
let target_file = matches.value_of("INPUT_FILE");
match target_file {
Some(filename) => {
let program = read_file(filename);
let mut asm = Assembler::new();
let mut vm = VM::new()
.with_alias(alias.to_string())
.with_cluster_bind(server_addr.into(), server_port.into());
vm.logical_cores = num_threads;
let program = asm.assemble(&program);
match program {
Ok(p) => {
vm.add_bytes(p);
let _events = vm.run();
println!("{:#?}", vm.registers);
std::process::exit(0);
if daemon_mode == "true" {
// TODO: Fill this in
} else {
let target_file = matches.value_of("INPUT_FILE");
match target_file {
Some(filename) => {
let program = read_file(filename);
let mut asm = Assembler::new();
let mut vm = VM::new()
.with_alias(alias.to_string())
.with_cluster_bind(server_addr.into(), server_port.into());
vm.logical_cores = num_threads;
let program = asm.assemble(&program);
match program {
Ok(p) => {
vm.add_bytes(p);
let _events = vm.run();
println!("{:#?}", vm.registers);
std::process::exit(0);
}
Err(_e) => {}
}
Err(_e) => {}
}
}
None => {
let mut vm = VM::new()
.with_alias(alias.to_string())
.with_cluster_bind(server_addr.into(), server_port.into());
let mut repl = REPL::new(vm);
let mut rx = repl.rx_pipe.take();
thread::spawn(move || {
let chan = rx.unwrap();
loop {
match chan.recv() {
Ok(msg) => {
println!("{}", msg);
None => {
debug!("Spawning REPL with alias {}", alias);
let mut vm = VM::new()
.with_alias(alias.to_string())
.with_cluster_bind(server_addr.into(), server_port.into());
let mut repl = REPL::new(vm);
let mut rx = repl.rx_pipe.take();
thread::spawn(move || {
let chan = rx.unwrap();
loop {
match chan.recv() {
Ok(msg) => {
println!("{}", msg);
}
Err(_e) => {}
}
Err(_e) => {}
}
}
});
repl.run();
});
repl.run();
}
}
}
}
use std::path::PathBuf;
use std::fs::File;
use std::io::{Read, Error, Write};
pub fn read_node_id(path: &str) -> Result<String, Error> {
let path = PathBuf::from(path);
let mut alias = String::new();
debug!("Reading node_id from file");
let mut f = File::open(path)?;
match f.read_to_string(&mut alias) {
Ok(bytes) => {
debug!("Read node_id from file. {} bytes.", bytes);
Ok(alias)
}
Err(e) => {
error!("Error reading node ID from disk: {}", e);
Err(e)
}
}
}
pub fn write_node_id(path: &str, alias: &str) -> Result<(), Error> {
let path = PathBuf::from(path);
let mut f = File::create(path)?;
match f.write_all(alias.as_bytes()) {
Ok(_) => {
info!("Node ID {} from CLI written to disk", alias);
Ok(())
}
Err(e) => {
error!("Error writing node ID to disk: {}", e);
Err(e)
}
}
}
\ No newline at end of file
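A minimal round-trip sketch of the two functions above; the path and ID value are placeholders chosen for illustration and are not part of the change:

#[test]
fn node_id_round_trip() {
    // Placeholder path and value, used only to exercise write_node_id/read_node_id.
    let path = "/tmp/.node_id_roundtrip";
    let id = "example-node-id";
    iridium::cluster::alias::write_node_id(path, id).expect("write failed");
    assert_eq!(iridium::cluster::alias::read_node_id(path).unwrap(), id);
}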
@@ -3,16 +3,21 @@ use std::io::{BufReader, BufWriter};
use std::net::TcpStream;
use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use bincode;
use cluster::message::IridiumMessage;
use cluster::{NodeInfo};
use cluster::manager::Manager;
#[derive(Debug)]
pub struct ClusterClient {
alias: Option<String>,
pub reader: BufReader<TcpStream>,
pub writer: BufWriter<TcpStream>,
pub connection_manager: Arc<RwLock<Manager>>,
pub bind_port: Option<String>,
rx: Option<Arc<Mutex<Receiver<String>>>>,
_tx: Option<Arc<Mutex<Sender<String>>>>,
pub raw_stream: TcpStream,
@@ -20,18 +25,20 @@ pub struct ClusterClient {
impl ClusterClient {
/// Creates and returns a new ClusterClient that wraps the TcpStream for communicating with the remote node
pub fn new(stream: TcpStream) -> ClusterClient {
pub fn new(stream: TcpStream, manager: Arc<RwLock<Manager>>, bind_port: String) -> ClusterClient {
// TODO: Handle this better
let reader = stream.try_clone().unwrap();
let writer = stream.try_clone().unwrap();
let (tx, rx) = channel();
ClusterClient {
connection_manager: manager,
reader: BufReader::new(reader),
writer: BufWriter::new(writer),
raw_stream: stream,
_tx: Some(Arc::new(Mutex::new(tx))),
rx: Some(Arc::new(Mutex::new(rx))),
alias: None,
bind_port: Some(bind_port),
}
}
@@ -44,9 +51,9 @@ impl ClusterClient {
pub fn send_hello(&mut self) {
match self.alias {
Some(ref alias) => {
if let Ok(mut hello) = IridiumMessage::hello(alias) {
if let Ok(mut hello) = IridiumMessage::hello(alias, &self.bind_port.clone().unwrap()) {
if self.raw_stream.write_all(&hello).is_ok() {
trace!("Hello sent: {:?}", hello);
trace!("Hello sent: {:#?}", hello);
} else {
error!("Error sending hello");
}
@@ -74,6 +81,14 @@ impl ClusterClient {
}
}
pub fn remote_ip_as_string(&self) -> Option<String> {
if let Ok(addr) = self.raw_stream.peer_addr() {
Some(addr.ip().to_string())
} else {
None
}
}
pub fn port_as_string(&self) -> Option<String> {
if let Ok(addr) = self.raw_stream.local_addr() {
Some(addr.port().to_string())
@@ -82,18 +97,26 @@ impl ClusterClient {
}
}
pub fn remote_port_as_string(&self) -> Option<String> {
if let Ok(addr) = self.raw_stream.peer_addr() {
Some(addr.port().to_string())
} else {
None
}
}
#[allow(dead_code)]
fn w(&mut self, msg: &str) -> bool {
match self.writer.write_all(msg.as_bytes()) {
Ok(_) => match self.writer.flush() {
Ok(_) => true,
Err(e) => {
println!("Error flushing to client: {}", e);
error!("Error flushing to client: {}", e);
false
}
},
Err(e) => {
println!("Error writing to client: {}", e);
error!("Error writing to client: {}", e);
false
}
}
@@ -104,11 +127,11 @@ impl ClusterClient {
Ok(_) => match self.writer.flush() {
Ok(_) => {}
Err(e) => {
println!("Error flushing to client: {}", e);
error!("Error flushing to client: {}", e);
}
},
Err(e) => {
println!("Error writing to client: {}", e);
error!("Error writing to client: {}", e);
}
}
}
@@ -159,7 +182,30 @@ impl ClusterClient {
ref nodes,
ref alias,
} => {
debug!("Received list of nodes: {:?} from {:?}", nodes, alias);
let join_message: std::result::Result<std::vec::Vec<u8>, std::boxed::Box<bincode::ErrorKind>>;
if let Some(ref alias) = self.alias_as_string() {
join_message = IridiumMessage::join(&alias, &self.port_as_string().unwrap());
} else {
error!("Unable to get my own alias to send a join message to other cluster members");
continue;
}
let join_message = join_message.unwrap();
for node in nodes {
let remote_alias = &node.0;
let remote_ip = &node.1;
let remote_port = &node.2;
let addr = remote_ip.to_owned() + ":" + remote_port;
if let Ok(stream) = TcpStream::connect(addr) {
let mut cluster_client = ClusterClient::new(stream, self.connection_manager.clone(), self.bind_port.clone().unwrap());
cluster_client.write_bytes(&join_message);
if let Ok(mut cm) = self.connection_manager.write() {
let client_tuple = (remote_alias.to_string(), cluster_client.ip_as_string().unwrap(), cluster_client.port_as_string().unwrap());
cm.add_client(client_tuple, cluster_client);
}
} else {
error!("Unable to establish connection to: {:?}", node);
}
}
}
_ => {
error!("Unknown message received");
@@ -173,4 +219,4 @@ impl ClusterClient {
}
}
}
}
}
\ No newline at end of file
use cluster::client::ClusterClient;
use cluster::NodeAlias;
use cluster::{NodeAlias, NodeInfo};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
#[derive(Default)]
#[derive(Default, Debug)]
pub struct Manager {
pub clients: HashMap<NodeAlias, Arc<RwLock<ClusterClient>>>,
pub clients: HashMap<NodeInfo, Arc<RwLock<ClusterClient>>>,
}
impl Manager {
@@ -16,7 +16,7 @@ impl Manager {
}
}
pub fn add_client(&mut self, alias: NodeAlias, client: ClusterClient) -> bool {
pub fn add_client(&mut self, alias: NodeInfo, client: ClusterClient) -> bool {
if self.clients.contains_key(&alias) {
error!("Tried to add a client that already existed");
return false;
@@ -30,19 +30,19 @@ impl Manager {
true
}
pub fn get_client(&mut self, alias: NodeAlias) -> Option<Arc<RwLock<ClusterClient>>> {
pub fn get_client(&mut self, alias: NodeInfo) -> Option<Arc<RwLock<ClusterClient>>> {
Some(self.clients.get_mut(&alias).unwrap().clone())
}
pub fn del_client(&mut self, alias: &NodeAlias) {
pub fn del_client(&mut self, alias: &NodeInfo) {
self.clients.remove(alias);
}
pub fn get_client_names(&self) -> Vec<String> {
pub fn get_client_names(&self) -> Vec<NodeInfo> {
debug!("Getting client names");
let mut results = vec![];
for alias in self.clients.keys() {
results.push(alias.to_owned());
results.push((alias.0.to_owned(), alias.1.to_string(), alias.2.to_string()));
}
results
}
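Since the manager's map is now keyed by the full NodeInfo tuple rather than the alias alone, registering a client looks roughly like the sketch below; the values are placeholders, and a real ClusterClient still needs a live TcpStream:

// Illustrative only: register a client under its (alias, ip, port) key.
fn register_example(manager: &mut Manager, client: ClusterClient) -> bool {
    let key: NodeInfo = (
        "node-a".to_string(),    // NodeAlias
        "127.0.0.1".to_string(), // NodeIP
        "2254".to_string(),      // NodePort
    );
    manager.add_client(key, client)
}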
@@ -4,39 +4,59 @@ use std::sync::{Arc, RwLock};
use bincode::*;
use cluster::client::ClusterClient;
use cluster::NodeAlias;
use cluster::{NodeAlias, NodeInfo, NodePort};
#[derive(Serialize, Deserialize, Debug)]
/// These are the message types that cluster nodes can exchange between themselves
pub enum IridiumMessage {
Hello {
alias: String,
alias: NodeAlias,
port: NodePort,
},
HelloAck {
alias: (String, String, String),
nodes: Vec<(String, String, String)>,
alias: NodeInfo,
nodes: Vec<NodeInfo>,
},
Join {
alias: NodeAlias,
port: NodePort,
}
}
impl IridiumMessage {
pub fn join(alias: &str, port: &str) -> Result<Vec<u8>> {
trace!("Generating join message!");
let new_message = IridiumMessage::Join {
alias: alias.into(),
port: port.into(),
};
serialize(&new_message)
}
/// Creates and serializes a Hello message
pub fn hello(alias: &str) -> Result<Vec<u8>> {
pub fn hello(alias: &str, port: &str) -> Result<Vec<u8>> {
trace!("Generating hello message");
let new_message = IridiumMessage::Hello {
alias: alias.into(),
port: port.into(),
};
serialize(&new_message)
}
/// Creates and serializes a HelloAck message, which sends back a list of all cluster nodes to the sender
pub fn hello_ack(clients: &HashMap<NodeAlias, Arc<RwLock<ClusterClient>>>) -> Result<Vec<u8>> {
let _results: Vec<(String, String, String)> = Vec::new();
for (_key, value) in clients.iter() {
pub fn hello_ack(myself: NodeInfo, clients: &HashMap<NodeAlias, Arc<RwLock<ClusterClient>>>) -> Result<Vec<u8>> {
trace!("Generating helloack message");
let mut results: Vec<NodeInfo> = Vec::new();
for (key, value) in clients.iter() {
if let Ok(client_data) = value.read() {
let _client_tuple = (client_data.alias_as_string(),);
results.push((key.to_string(), client_data.ip_as_string().unwrap(), client_data.port_as_string().unwrap()));
}
}
Ok(Vec::new())
let new_message = IridiumMessage::HelloAck {
alias: myself,
nodes: results,
};
serialize(&new_message)
}
pub fn process_message(message: &[u8]) -> Result<IridiumMessage> {
@@ -47,3 +67,4 @@ impl IridiumMessage {
#[cfg(test)]
mod test {}
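The test module above is left empty by this commit; a sketch of what a serialization round trip for the new Join variant could look like, using only the functions defined in this module (illustrative, not part of the change):

#[cfg(test)]
mod test {
    use super::*;

    // Illustrative only: round-trip the new Join message through bincode.
    #[test]
    fn join_round_trip() {
        let bytes = IridiumMessage::join("node-a", "2254").expect("serialize failed");
        match IridiumMessage::process_message(&bytes).expect("deserialize failed") {
            IridiumMessage::Join { alias, port } => {
                assert_eq!(alias, "node-a");
                assert_eq!(port, "2254");
            }
            other => panic!("expected Join, got {:?}", other),
        }
    }
}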
@@ -2,5 +2,9 @@ pub mod client;
pub mod manager;
pub mod message;
pub mod server;
pub mod alias;
type NodeAlias = String;
type NodeIP = String;
type NodePort = String;
type NodeInfo = (NodeAlias, NodeIP, NodePort);
@@ -15,15 +15,15 @@ pub fn listen(my_alias: String, addr: SocketAddr, connection_manager: Arc<RwLock
info!("New Node connected!");
let stream = stream.unwrap();
thread::spawn(move || {
let mut client = ClusterClient::new(stream);
debug!("Handling an incoming connection on a thread");
let mut client = ClusterClient::new(stream, cmgr.clone(), addr.port().to_string());
let result: bincode::Result<IridiumMessage> =
bincode::deserialize_from(&mut client.reader);
match result {
Ok(message) => {
match message {
IridiumMessage::Hello { alias } => {
debug!("Found a hello message with alias: {:?}", alias);
let mut cmgr_lock = cmgr.write().unwrap();
IridiumMessage::Hello { alias, port } => {
debug!("Received hello");
let mut members: Vec<(
String,
String,
String,
)> = Vec::new();
// Now we need to send back a list of cluster members as a vector of tuples, each containing a member's alias, IP, and port
for (key, value) in &cmgr_lock.clients {
if let Ok(client) = value.read() {
debug!("Generating member list");
{
let mut cmgr_lock = cmgr.read().unwrap();
debug!("Grabbed read lock on manager");
for (key, value) in &cmgr_lock.clients {
debug!("Processing kv: {:#?} {:#?}", key, value);
let tuple = (
key.to_string(),
client.ip_as_string().unwrap(),
client.port_as_string().unwrap(),
key.0.to_string(),
key.1.to_string(),
key.2.to_string(),
);
members.push(tuple);
}
}
debug!("Generating hello_ack");
let hello_ack = IridiumMessage::HelloAck {
nodes: members,
alias: (
@@ -51,10 +56,27 @@
};
client.write_bytes(&bincode::serialize(&hello_ack).unwrap());
cmgr_lock.add_client(alias.to_string(), client);
debug!("Adding {} to clients. Client info: {:?}", alias, client);
{
let mut cmgr_lock = cmgr.write().unwrap();
let client_tuple = (alias.to_string(), client.remote_ip_as_string().unwrap(), port.to_string());
cmgr_lock.add_client(client_tuple, client);
}
debug!("Client added to manager");
}
// Handles another node sending a Join message. In this case, we don't want to send back a list of all known nodes.
IridiumMessage::Join { alias, port } => {
debug!("Received join message from alias: {:?}", alias);
if let Ok(mut connection_manager) = cmgr.write() {
debug!("Added new client {} to conneciton manager", alias);
let client_tuple = (alias.to_string(), client.remote_ip_as_string().unwrap(), port);
connection_manager.add_client(client_tuple, client);
} else {
error!("Unable to add {} to connection manager", alias);
}
}
_ => {
error!("Non-hello message received from node trying to join");
error!("Unknown message received from node");
}
}
}
@@ -3,7 +3,7 @@ pub mod command_parser;
use std;
use std::fs::File;
use std::io;
use std::io::{Read, Write};
use std::io::{Read};