Commit 2d6260f0 authored by dns2utf8's avatar dns2utf8

Browser Pod and lots of Messages

parent 365bef1e
This diff is collapsed.
......@@ -5,19 +5,22 @@ authors = ["Stefan Schindler <dns2utf8@estada.ch>"]
edition = "2018"
[dependencies]
actix = "0.7"
actix = "=0.7.9"
actix-web = { version="0.7", features=["tls"] }
actix-net = { version="=0.2.6", features=["ssl"] }
openssl = "0.10"
tokio-openssl = "0.3"
native-tls = "0.2.2"
env_logger = "0.6.1"
futures = "0.1.26"
#actix-net = { version="=0.2.6", features=["ssl"] }
#openssl = "0.10"
#tokio-openssl = "0.3"
native-tls = "0.2"
env_logger = "0.6"
futures = "0.1"
serde_derive = "1.0"
serde_json = "1.0"
serde = "1.0"
#tokio-tls = "0.2.1"
actix-service = "0.3"
actix-server = { version="0.4", features=["ssl"] }
#actix-service = "0.3"
#actix-server = { version="0.4", features=["ssl"] }
#actix-web = { version="1.0.0-alpha.6", features=["ssl"] }
#actix-files = "0.1.0-alpha.6"
#actix-web-actors = "1.0.0-alpha.3"
actix-codec = "0.1.1"
tokio-tcp = "0.1.3"
#actix-codec = "0.1.1"
#tokio-tcp = "0.1.3"
use ::actix::*;
use ::actix_web::*;
use crate::protocols::{PodId, PodRequest, PodResponse, ClientRequest, ClientResponse};
use std::collections::HashMap;
#[derive(Default)]
pub struct Hub {
pods: HashMap<PodId, PodInfo>,
}
impl Actor for Hub {
type Context = SyncContext<Self>;
}
impl Handler<Subscribe> for Hub {
type Result = ();
fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
self.pods.insert(msg.id, PodInfo {
addr: msg.addr,
name: msg.name,
image_paths: vec![],
});
}
}
impl Handler<Unsubscribe> for Hub {
type Result = ();
fn handle(&mut self, msg: Unsubscribe, _ctx: &mut Self::Context) -> Self::Result {
if let Some(lost_pod) = self.pods.remove(&msg.0) {
let message = ClientResponse::PodGone(msg.0);
for pod in self.pods.values() {
pod.addr.do_send(message.clone())
}
}
}
}
impl Handler<ClientRequest> for Hub {
type Result = MessageResult<ClientRequest>;
fn handle(&mut self, msg: ClientRequest, _ctx: &mut Self::Context) -> Self::Result {
use ClientRequest::*;
let r = match msg {
ListAllPods => {
let pods = self.pods.iter().map(|(&id, info)| {
crate::protocols::PodDescription { id, name: info.name.clone(), }
}).collect();
ClientResponse::Pods(pods)
}
ListPodStructure(id) => {
match self.pods.get(&id) {
Some(info) => {
ClientResponse::PodStructure(info.image_paths.clone())
}
None => {
ClientResponse::UnknownPod(id)
}
}
}
};
MessageResult(r)
//_ctx.repyl(r)
}
}
pub struct PodInfo {
addr: Addr<Ws>,
name: String,
image_paths: Vec<String>,
}
/// Define http actor
pub struct Ws {
pub id: PodId,
pub hub: Addr<Hub>,
/// This will be empty once the actor is connected to the `Hub`.
/// TODO this is a douplicate, maybe remove it
pub name: String,
}
/*
impl Ws {
fn client_request(&mut self, msg: ClientRequest, ctx: &mut ws::WebsocketContext<Self>) {
self.hub.send(msg)
.into_actor(self)
.then(|response, _, ctx| {
self.do_send(response);
fut::ok( () )
})
.spawn(ctx);
}
fn pod_request(&mut self, msg: PodRequest, ctx: &mut ws::WebsocketContext<Self>) {
// match msg {}
}
}
*/
impl Actor for Ws {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let mut name = String::new();
std::mem::swap(&mut self.name, &mut name);
self.hub.do_send(Subscribe {
id: self.id,
name: name,
addr: ctx.address(),
});
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
self.hub.do_send(Unsubscribe(self.id));
}
}
/// Handler for ws::Message message
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => {
use crate::protocols::JsonProtocol;
println!("text: {:?}", text);
let message: Result<JsonProtocol, _> = serde_json::from_str(&text);
match message {
Ok(JsonProtocol::ClientRequest(message)) => {
//self.client_request(message, ctx);
self.hub.send(message)
.into_actor(self)
.then(|response, _, ctx| {
//self.do_send(response);
ctx.text(serde_json::to_string(&response.unwrap()).expect("unable to serialize internal state"));
fut::ok( () )
})
.spawn(ctx);
}
Ok(JsonProtocol::PodRequest(message)) => {
//self.pod_request(message, ctx);
}
_invalid => {
ctx.text("\"invalid request\"".to_string());
}
}
},
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason),
_ => (),
}
}
}
impl Handler<ClientResponse> for Ws {
type Result = MessageResult<ClientResponse>;
fn handle(&mut self, message: ClientResponse, ctx: &mut Self::Context) -> Self::Result {
ctx.text(serde_json::to_string(&message).expect("unable to serialize internal state"));
MessageResult(())
}
}
impl Handler<PodResponse> for Ws {
type Result = MessageResult<PodResponse>;
fn handle(&mut self, message: PodResponse, ctx: &mut Self::Context) -> Self::Result {
ctx.text(serde_json::to_string(&message).expect("unable to serialize internal state"));
MessageResult(())
}
}
#[derive(Message)]
pub struct Subscribe {
id: PodId,
addr: Addr<Ws>,
name: String,
}
#[derive(Message)]
pub struct Unsubscribe(PodId);
#[macro_use]
extern crate actix;
extern crate actix_web;
//extern crate actix_net;
extern crate futures;
use actix::*;
use actix_web::*;
//use actix_net::server::Server;
use actix_net::service::{IntoNewService, NewServiceExt};
//use actix::prelude::*;
use actix_codec::{AsyncRead, AsyncWrite};
// Actix 1.0.0 deps
//use actix_files as fs;
//use actix_web_actors::ws;
#[macro_use]
extern crate serde_derive;
//use actix_net::ssl::NativeTlsAcceptor;
use actix_server::{
Io,
Server};
use actix_service::{fn_service, NewService};
use futures::{future, Future};
use actix::*;
use actix_web::*;
use native_tls::{Identity, TlsAcceptor};
use std::io::prelude::*;
use std::fs::File;
//use std::sync::Arc;
use tokio_openssl::{SslConnectorExt, SslAcceptorExt};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
/// Define http actor
struct Ws;
mod protocols;
use protocols::PodId;
impl Actor for Ws {
type Context = ws::WebsocketContext<Self>;
}
mod actors;
use actors::{Hub, Ws};
/// Handler for ws::Message message
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(reason) => ctx.close(reason),
_ => (),
}
}
}
fn main() {
let bind_addr = "[::]:3000";
let bind_addr_tls = "[::]:4000";
std::env::set_var("RUST_BACKTRACE", "1");
std::env::set_var("RUST_LOG", "actix_server=info,actix_web=trace");
protocols::print_all_messages();
env_logger::init();
let sys = actix::System::new("master process");
let native_acceptor = get_native_tls_acceptor().expect("unable to initialize TLS");
let incrementor =
Arc::new(Mutex::new(
Incrementor { i: 0 }
));
let hub = SyncArbiter::start(1, || {
Hub::default()
});
println!("binding to {:?}", bind_addr);
server::HttpServer::new(move|| {
App::new()
.resource("/ws/", |r| r.f(|req| ws::start(req, Ws)))
server::HttpServer::new(move || {
App::with_state((incrementor.clone(), hub.clone()))
.resource("/ws/", |r| r.f(|req| {
/*
*req.state()
.send(GetId)
.into_actor(&req.address())
.then(|id: Result<NewId, _>, act, ctx| {
let id = id.unwrap().0;
ws::start(&req.drop_state(), Ws { id })
})
//.map_err(Error::from)
// */
//*
let (incrementor, hub) = req.state();
ws::start(
&req.drop_state(),
Ws {
id: incrementor.lock().unwrap().increment(),
hub: hub.clone(),
name: "".into(),
},
) // */
}))
.handler(
"/",
fs::StaticFiles::new("static/")
......@@ -79,58 +82,10 @@ fn main() {
.expect("unable to construct HttpServer")
.start();
println!("binding TLS to {:?}", bind_addr_tls);
let acceptor = get_openssl_acceptor();
// bind socket address and start workers. By default server uses number of
// available logical cpu as threads count. actix net start separate
// instances of service pipeline in each worker.
//Server::default()
Server::build()
.bind(
// configure service pipeline
"basic", bind_addr_tls,
//move || fn_service(|_| Ok::<_, ()>(()))
move || {
//let num = num.clone();
let acceptor = acceptor.clone();
// service for converting incoming TcpStream to a SslStream<TcpStream>
fn_service(move |stream: Io<tokio_tcp::TcpStream>| {
SslAcceptorExt::accept_async(&acceptor, stream.into_parts().0)
// fn_service(move |stream: tokio_tcp::TcpStream| {
// SslAcceptorExt::accept_async(&acceptor, stream)
.map_err(|e| println!("Openssl error: {}", e))
})
// .and_then() combinator uses other service to convert incoming `Request` to a
// `Response` and then uses that response as an input for next
// service. in this case, on success we use `logger` service
.and_then(fn_service(logger))
// Next service counts number of connections
.and_then(move |req| {
//let num = num.fetch_add(1, Ordering::Relaxed);
//println!("processed {:?} connections", num);
println!("another connection");
future::ok(())
})
},
).expect("unable to start RAW TLS service")
.start();
println!("sys.run()");
let _ = sys.run();
}
/// Simple logger service, it just prints fact of the new connections
fn logger<T: AsyncRead + AsyncWrite + std::fmt::Debug>(
stream: T,
) -> impl Future<Item = T, Error = ()> {
println!("New connection: {:?}", stream);
future::ok(stream)
}
fn get_native_tls_acceptor() -> Result<TlsAcceptor, native_tls::Error> {
let mut file = File::open("identity.pfx").expect("unable to open identity");
let mut identity = vec![];
......@@ -140,12 +95,31 @@ fn get_native_tls_acceptor() -> Result<TlsAcceptor, native_tls::Error> {
TlsAcceptor::new(identity)
}
fn get_openssl_acceptor() -> openssl::ssl::SslAcceptor {
use openssl::ssl::*;
// load ssl keys
//let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
let mut builder = SslAcceptor::mozilla_modern(SslMethod::tls()).expect("unable to set mozilla_modern");
builder.set_private_key_file("./key_decrypted.pem", SslFiletype::PEM).expect("unable to set set_private_key_file");
builder.set_certificate_chain_file("./cert.pem").expect("unable to set cert chain file");
builder.build()
struct Incrementor {
i: PodId,
}
impl Incrementor {
fn increment(&mut self) -> PodId {
let id = self.i;
self.i = self.i.checked_add(1).expect("we ran out of ids");
id
}
}
/*
impl Actor for Incrementor {
type Context = SyncContext<Self>;
}
impl Handler<GetId> for Incrementor {
type Result = MessageResult<GetId>;
fn handle(&mut self, _incoming: GetId, _ctx: &mut Self::Context) -> Self::Result {
MessageResult(NewId(self.increment()))
}
}
#[derive(Debug, Clone, Message)]
#[rtype(result = "NewId")]
pub struct GetId;
#[derive(Message)]
pub struct NewId(pub PodId);
*/
use serde_json as json;
pub type PodId = u64;
#[derive(Serialize, Deserialize, Debug, Message)]
//#[serde(tag = "tier", content = "data")]
//#[serde(tag = "tier")]
/// Communicate with everything
pub enum JsonProtocol {
ClientRequest(ClientRequest),
ClientResponse(ClientResponse),
PodRequest(PodRequest),
PodResponse(PodResponse),
}
/// Browser -> Master
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "ClientResponse")]
pub enum ClientRequest {
ListAllPods,
ListPodStructure(PodId),
}
/// Master -> Browser
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
pub enum ClientResponse {
/*/// new ID for the Browser
NewId(PodId),*/
Pods(Vec<PodDescription>),
PodStructure(Vec<String>),
UnknownPod(PodId),
PodGone(PodId),
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PodDescription {
pub id: PodId,
pub name: String,
}
/// Slave -> Master
#[derive(Serialize, Deserialize, Debug, Message)]
pub enum PodRequest {
RegisterSelf{
proposed_id: Option<PodId>,
name: String,
},
UpdateTitle { name: String, },
UpdatePaths { paths: Vec<String>, },
}
/// Master -> Slave
#[derive(Serialize, Deserialize, Debug, Message)]
pub enum PodResponse {
Registered { global_id: PodId, },
}
pub(crate) fn print_all_messages() {
let t = |t| { println!("\n==== {} ====", t); };
let p = |obj| {
let s = json::to_string(&obj).unwrap();
println!(" {}", s);
};
t("ClientRequest");
p(JsonProtocol::ClientRequest(ClientRequest::ListAllPods));
p(JsonProtocol::ClientRequest(ClientRequest::ListPodStructure(42)));
t("ClientResponse");
//p(JsonProtocol::ClientResponse(ClientResponse::NewId(23)));
p(JsonProtocol::ClientResponse(ClientResponse::Pods(vec![PodDescription{id: 42, name: "bla".into()}])));
p(JsonProtocol::ClientResponse(ClientResponse::PodStructure(vec!["bla".into(), "bli".into()])));
p(JsonProtocol::ClientResponse(ClientResponse::UnknownPod(123)));
p(JsonProtocol::ClientResponse(ClientResponse::PodGone(1234)));
t("PodRequest");
p(JsonProtocol::PodRequest(PodRequest::RegisterSelf{ proposed_id: Some(42), name: "bla".into(), }));
p(JsonProtocol::PodRequest(PodRequest::RegisterSelf{ proposed_id: None, name: "bla".into(), }));
p(JsonProtocol::PodRequest(PodRequest::UpdateTitle{ name: "bli".into(), }));
p(JsonProtocol::PodRequest(PodRequest::UpdatePaths{ paths: vec!["bli".into()], }));
t("PodResponse");
p(JsonProtocol::PodResponse(PodResponse::Registered { global_id: 42, }));
println!("\n");
}
......@@ -4,12 +4,24 @@
<title>Distributed Gallery</title>
<link rel="stylesheet" href="style.css"/>
<script src="main.js"></script>
<script src="self_host.js"></script>
</head>
<body>
<h1>A Distributed Gallery</h1>
<div id="galleries"></div>
<h1>Share yourself</h1>
<form action="#">
<label>Name your Gallery: <input id="pod_name" type="text"/></label> <br>
<input id="pod_share" type="file" multiple="multiple" accept="image/*" /> <br>
<input type="submit" value="Share your Gallery with the World" />
</form>
<section id="pod_preview">
<h1>Your title</h1>
<div></div>
</section>
<h2>Development stuff</h2>
<div>
<button id="ws_echo">Test WebSocket echo</button>
......
const log = console.log;
let html_logger = undefined;
let ws = undefined;
let galleries = [], galleries_element = undefined;
setup_ws();
WebSocket.prototype.send_object = function(obj) {
return this.send(JSON.stringify(obj));
}
window.addEventListener('load', function() {
html_logger = make_logger(document.querySelector('#ws_echo_out'));
setup_test_websocket();
......@@ -16,7 +22,7 @@ window.addEventListener('load', function() {
function update_ui() {
if (galleries.length === 0) {
galleries_element.innerHTML = "No Gallerie connected";
galleries_element.innerHTML = "No Gallery connected";
} else {
let html = 'TODO display galleries';
......@@ -29,11 +35,14 @@ function update_ui() {
function setup_test_websocket() {
const button = document.querySelector('#ws_echo');
const pre = document.querySelector('#ws_echo_out');
const log = make_logger(pre);
button.addEventListener('click', _ => {
log(''+ws);
const entry = `> readyState: ${ws.readyState} - OPEN: ${ws.readyState === ws.OPEN}`;
log(entry);
html_logger(entry);
ws.send('"bla"')
});
}
......@@ -50,9 +59,11 @@ function setup_ws() {
ws = new WebSocket('ws'+(location.protocol.indexOf('https') === 0 ? 's' : '')+'://'+location.host+'/ws/');
ws.onclose = _ => {
Pod.reconnect_handler(false);
// TODO add more than one retry here
requestAnimationFrame(_ => {
log('reconnecting ws in a second');
log('reconnecting WebSocket in a second');
setTimeout(setup_ws, 1024);
});
}
......@@ -60,27 +71,25 @@ function setup_ws() {
ws.onmessage = msg => {
log(msg.data);
const data = JSON.parse(msg.data);
if (data === 'Updated') {
log('logged in');
build_ui();
} else if (data['CompleteSet'] !== undefined) {
log('CompleteSet', data['CompleteSet']);
people = data['CompleteSet'];
build_ui();
update_ui();
} else if (data['UpdateEntry'] !== undefined) {
log('UpdateEntry', data['UpdateEntry']);
const entry = data['UpdateEntry'];
people[entry[0]] = [entry[1], entry[2]];
update_ui();
if (typeof data.ClientResponse !== 'undefined') {
client_response(data.ClientResponse);
} if (typeof data.PodResponse !== 'undefined') {
Pod.message_handler(data.PodResponse);
} else {
console.error(["unimplemented message", data]);
html_logger('< ' + msg.data);
}
};
ws.onopen = _ => {
log('Connected.');
ws.send('"GetAll"');
ws.send('{"ClientRequest":"ListAllPods"}');
Pod.reconnect_handler(true);
}
}
function client_response(message) {
error(['client_response unimplemented', message]);
}
// The actor interface
const Pod = {
id: undefined,
shared_files: [],
message_handler: _ => {},
reconnect_handler: _ => {},
connected: false,
};
window.addEventListener('load', function() {
'use strict';
const inputElement = document.querySelector('#pod_share');
inputElement.addEventListener("change", handleFiles, false);
const pod_name = document.querySelector('#pod_name');
pod_name.addEventListener("change", updatePreviewTitle, false);
pod_name.addEventListener("keyup", updatePreviewTitle, false);
// catch form submit
inputElement.parentElement.addEventListener("submit", ev => {
registerSelf();
}, false);
Pod.message_handler = message_handler;
Pod.reconnect_handler = reconnect_handler;
inputElement.value = '';
regenPreview();
updatePreviewTitle();
function message_handler(message) {
console.log(['Pod::message_handler()', message]);
throw message;
}
function reconnect_handler(connected) {
console.log(['Pod::reconnect_handler()', connected]);
Pod.connected = connected;
if (connected) {
registerSelf();
publishPictures();
}
}
function handleFiles() {
const files = this.files; /* now you can work with the file list */
log(["handleFiles", files]);
if (typeof files === 'undefined') {
return;
}
for (let i = 0, numFiles = files.length; i < numFiles; ++i) {
const file = files[i];
if (file.type.indexOf('image/') === 0) {
Pod.shared_files.push(file)
} else {
console.error(['Pod::handleFiles() invalid file type', file]);
}
}
publishPictures();
regenPreview();
}
function registerSelf() {
const data = {
"PodRequest":{
"RegisterSelf":{ "proposed_id":Pod.id || null, "name": normalized_title() }
}
};
ws.send_object(data);
}
function publishPictures() {
if (Pod.connected) {
const paths = Pod.shared_files.reduce((b, cur) => { b.push(cur.name); return b }, []);
ws.send_object({"PodRequest":{"UpdatePaths":{"paths":paths}}});
}
}
function updatePreviewTitle() {
const title = document.querySelector('#pod_preview h1');
const name = normalized_title();
title.innerText = name;
if (Pod.connected) {
// TODO delay until the user stops typing?
ws.send_object({"PodRequest":{"UpdateTitle":{"name":name}}});
}
}
function normalized_title() {
const pod_name = document.querySelector('#pod_name');