Commit c02df97a authored by dns2utf8's avatar dns2utf8

Transfer Pictures, Cache the Contents, first cleanup

parent 236fec11
use ::actix::*;
use ::actix_web::*;
use crate::protocols::{PodId, PodRequest, PodResponse, ClientRequest, ClientResponse};
use crate::protocols::{PodId, PodRequest, PodResponse, ClientRequest, ClientRequestAsync, ClientResponse};
use std::collections::HashMap;
......@@ -22,11 +22,12 @@ impl Hub {
addr.do_send(message.clone())
}
}
/*
fn broadcast_pod_response(&self, message: PodResponse) {
for pod in self.pods.values() {
pod.addr.do_send(message.clone())
}
}
}*/
}
impl Actor for Hub {
......@@ -53,6 +54,7 @@ impl Handler<SubscribePod> for Hub {
self.broadcast_client_response(ClientResponse::NewPod { id: msg.id, name: msg.name, });
}
}
/*
impl Handler<UnsubscribePod> for Hub {
type Result = ();
......@@ -60,14 +62,23 @@ impl Handler<UnsubscribePod> for Hub {
if let Some(_lost_pod) = self.pods.remove(&msg.0) {
let message = ClientResponse::PodGone(msg.0);
self.broadcast_client_response(message);
println!("removing pod {}:{:?}", msg.0, _lost_pod.name);
}
}
}
}*/
impl Handler<UnsubscribeClient> for Hub {
type Result = ();
fn handle(&mut self, msg: UnsubscribeClient, _ctx: &mut Self::Context) -> Self::Result {
self.clients.remove(&msg.0);
// and pod if it was serving data
if let Some(_lost_pod) = self.pods.remove(&msg.0) {
let message = ClientResponse::PodGone(msg.0);
self.broadcast_client_response(message);
println!("removing pod {}:{:?}", msg.0, _lost_pod.name);
println!(" remaining: {:?}", self.pods);
}
}
}
impl Handler<ClientRequest> for Hub {
......@@ -100,6 +111,33 @@ impl Handler<ClientRequest> for Hub {
//_ctx.repyl(r)
}
}
impl Handler<ClientRequestAsync> for Hub {
type Result = ();
fn handle(&mut self, msg: ClientRequestAsync, _ctx: &mut Self::Context) -> Self::Result {
use ClientRequestAsync::*;
match msg {
RequestImage { client_id, gallery_id, path } => {
match self.pods.get(&gallery_id) {
Some(pod) => {
pod.addr
.do_send(PodResponse::RequestImage {
path, client_id,
})
}
None => {
if let Some(client) = self.clients.get(&client_id) {
client.do_send(
ClientResponse::UnknownPod(gallery_id)
)
}
}
}
}
}
}
}
impl Handler<IdedPodRequest> for Hub {
type Result = ();
......@@ -111,12 +149,22 @@ impl Handler<IdedPodRequest> for Hub {
self.pods.get_mut(&msg.id).expect("unable to find PodInfo").name = name.clone();
self.broadcast_client_response(ClientResponse::PodUpdateName{ id: msg.id, name, });
}
UpdatePaths { paths } => {
UpdatePaths { mut paths } => {
paths.sort();
paths.dedup_by(|a, b| a == b);
self.pods.get_mut(&msg.id).expect("unable to find PodInfo").image_paths = paths.clone();
self.broadcast_client_response(ClientResponse::PodUpdatePaths{ id: msg.id, paths, });
}
DeliverImage { client_id, path, blob } => {
if let Some(client) = self.clients.get(&client_id) {
client.do_send(ClientResponse::DeliverImage {
gallery_id: msg.id,
path,
blob,
});
}
}
}
//let message = ClientResponse::PodGone(msg.0);
}
}
......@@ -125,15 +173,21 @@ pub struct PodInfo {
name: String,
image_paths: Vec<String>,
}
impl std::fmt::Debug for PodInfo {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("PodInfo")
.field("name", &self.name)
.field("image_paths", &self.image_paths)
.finish()
}
}
/// Define http actor
pub struct Ws {
pub id: PodId,
pub hub: Addr<Hub>,
/// This will be empty once the actor is connected to the `Hub`.
/// TODO this is a douplicate, maybe remove it
pub name: String,
pub is_pod: bool,
}
/*
......@@ -179,7 +233,6 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
let message: Result<JsonProtocol, _> = serde_json::from_str(&text);
match message {
Ok(JsonProtocol::ClientRequest(message)) => {
//self.client_request(message, ctx);
self.hub.send(message)
.into_actor(self)
.then(|response, this, ctx| {
......@@ -189,6 +242,9 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
})
.spawn(ctx);
}
Ok(JsonProtocol::ClientRequestAsync(message)) => {
actix::Handler::handle(self, message, ctx);
}
Ok(JsonProtocol::PodRequest(message)) => {
//self.pod_request(message, ctx);
actix::Handler::handle(self, message, ctx);
......@@ -204,6 +260,18 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
}
}
}
impl Handler<ClientRequestAsync> for Ws {
//type Result = MessageResult<()>;
type Result = ();
fn handle(&mut self, mut message: ClientRequestAsync, _ctx: &mut Self::Context) -> Self::Result {
// Don't ever trust the client
if let ClientRequestAsync::RequestImage { client_id, .. } = &mut message {
*client_id = self.id;
};
self.hub.do_send(message);
}
}
impl Handler<ClientResponse> for Ws {
type Result = MessageResult<ClientResponse>;
......@@ -229,18 +297,18 @@ impl Handler<PodRequest> for Ws {
fn handle(&mut self, message: PodRequest, ctx: &mut Self::Context) -> Self::Result {
use PodRequest::*;
match message {
RegisterSelf { proposed_id, name } => {
self.hub.do_send(SubscribePod { id: self.id, name, addr: ctx.address(), });
actix::Handler::handle(self, PodResponse::Registered { global_id: self.id }, ctx);
RegisterSelf { name, .. } => {
if self.is_pod == false {
self.is_pod = true;
self.hub.do_send(SubscribePod { id: self.id, name, addr: ctx.address(), });
actix::Handler::handle(self, PodResponse::Registered { global_id: self.id }, ctx);
} else {
actix::Handler::handle(self, PodResponse::AlreadyRegistered { global_id: self.id }, ctx);
}
}
other_messages => {
self.hub.do_send(IdedPodRequest { id: self.id, message: other_messages });
}
/*
UpdateTitle { name } => {
}
UpdatePaths { paths } => {
}*/
};
MessageResult(())
}
......
......@@ -45,7 +45,7 @@ fn main() {
Hub::default()
});
println!("binding to {:?}", bind_addr);
println!("binding to https://{}", bind_addr);
server::HttpServer::new(move || {
App::with_state((incrementor.clone(), hub.clone()))
.resource("/ws/", |r| r.f(|req| {
......@@ -66,7 +66,7 @@ fn main() {
Ws {
id: incrementor.lock().unwrap().increment(),
hub: hub.clone(),
name: "".into(),
is_pod: false,
},
) // */
}))
......
......@@ -8,18 +8,29 @@ pub type PodId = u64;
/// Communicate with everything
pub enum JsonProtocol {
ClientRequest(ClientRequest),
ClientRequestAsync(ClientRequestAsync),
ClientResponse(ClientResponse),
PodRequest(PodRequest),
PodResponse(PodResponse),
}
/// Browser -> Master
/// Browser -> Master rpc style
#[derive(Serialize, Deserialize, Debug, Message)]
#[rtype(result = "ClientResponse")]
pub enum ClientRequest {
ListAllPods,
ListPodStructure(PodId),
}
/// Browser -> Master
#[derive(Serialize, Deserialize, Debug, Message)]
pub enum ClientRequestAsync {
RequestImage {
gallery_id: PodId,
path: String,
#[serde(skip)]
client_id: PodId,
},
}
/// Master -> Browser
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
pub enum ClientResponse {
......@@ -29,6 +40,7 @@ pub enum ClientResponse {
PodGone(PodId),
PodUpdateName { id: PodId, name: String, },
PodUpdatePaths { id: PodId, paths: Vec<String>, },
DeliverImage { gallery_id: PodId, path: String, blob: String, },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PodDescription {
......@@ -45,11 +57,14 @@ pub enum PodRequest {
},
UpdateTitle { name: String, },
UpdatePaths { paths: Vec<String>, },
DeliverImage { client_id: PodId, path: String, blob: String, },
}
/// Master -> Slave
#[derive(Serialize, Deserialize, Clone, Debug, Message)]
pub enum PodResponse {
Registered { global_id: PodId, },
AlreadyRegistered { global_id: PodId, },
RequestImage { client_id: PodId, path: String },
}
pub(crate) fn print_all_messages() {
......@@ -62,6 +77,9 @@ pub(crate) fn print_all_messages() {
p(JsonProtocol::ClientRequest(ClientRequest::ListAllPods));
p(JsonProtocol::ClientRequest(ClientRequest::ListPodStructure(42)));
t("ClientRequestAsync");
p(JsonProtocol::ClientRequestAsync(ClientRequestAsync::RequestImage{gallery_id:42, path: "bla".into(), client_id: 0, }));
t("ClientResponse");
p(JsonProtocol::ClientResponse(ClientResponse::Pods(vec![PodDescription{id: 42, name: "bla".into()}])));
p(JsonProtocol::ClientResponse(ClientResponse::NewPod{id:23, name: "blubb".into()}));
......@@ -69,6 +87,7 @@ pub(crate) fn print_all_messages() {
p(JsonProtocol::ClientResponse(ClientResponse::PodGone(1234)));
p(JsonProtocol::ClientResponse(ClientResponse::PodUpdateName{ id: 42, name: "String".into(), }));
p(JsonProtocol::ClientResponse(ClientResponse::PodUpdatePaths{ id: 42, paths: vec!["String".into()] }));
p(JsonProtocol::ClientResponse(ClientResponse::DeliverImage { gallery_id: 42, path: "String".into(), blob: "String".into(), },));
t("PodRequest");
......@@ -76,9 +95,12 @@ pub(crate) fn print_all_messages() {
p(JsonProtocol::PodRequest(PodRequest::RegisterSelf{ proposed_id: None, name: "bla".into(), }));
p(JsonProtocol::PodRequest(PodRequest::UpdateTitle{ name: "bli".into(), }));
p(JsonProtocol::PodRequest(PodRequest::UpdatePaths{ paths: vec!["bli".into()], }));
p(JsonProtocol::PodRequest(PodRequest::DeliverImage { client_id: 23, path: "String".into(), blob: "String".into(), },));
t("PodResponse");
p(JsonProtocol::PodResponse(PodResponse::Registered { global_id: 42, }));
p(JsonProtocol::PodResponse(PodResponse::AlreadyRegistered { global_id: 42, }));
p(JsonProtocol::PodResponse(PodResponse::RequestImage { client_id: 42, path: "bli".into(), }));
println!("\n");
}
// The actor interface
const Gallery = {
id: undefined,
shared_files: [],
message_handler: _ => {},
reconnect_handler: _ => {},
connected: false,
};
window.addEventListener('load', function() {
'use strict';
let galleries = [];
let selected_gallery_id = undefined;
const image_cache /*: Map<id, Map<String, CachedPicture>> */ = {};
const galleries_element = document.querySelector('#galleries');
const galleries_list = galleries_element.querySelector('ul');
const gallery_view = galleries_element.querySelector('div');
Gallery.message_handler = message_handler;
Gallery.reconnect_handler = reconnect_handler;
Gallery.update_ui = update_ui;
update_ui();
update_view();
function message_handler(message) {
if (typeof message.PodGone !== 'undefined') {
galleries = galleries.filter(x => x.id !== message.PodGone);
update_ui();
if (selected_gallery_id === message.PodGone) {
update_view();
}
} else
if (typeof message.Pods !== 'undefined') {
galleries = message.Pods;
galleries.forEach(pod => pod.paths = []);
update_ui();
} else
if (typeof message.UnknownPod !== 'undefined') {
Gallery.reconnect_handler();
} else
if (typeof message.PodUpdatePaths !== 'undefined') {
const id = message.PodUpdatePaths.id;
galleries[indexOfPod(id)].paths = message.PodUpdatePaths.paths;
// Cache Layer
if (typeof image_cache[id] !== 'object') {
image_cache[id] = [];
}
update_ui();
if (message.PodUpdatePaths.id === selected_gallery_id) {
update_view();
}
} else
if (typeof message.NewPod !== 'undefined') {
// FIX galleries = galleries.filter(g => g.id !== message.NewPod.id);
galleries.push(message.NewPod);
update_ui();
} else
if (typeof message.PodUpdateName !== 'undefined') {
galleries[indexOfPod(message.PodUpdateName.id)].name = message.PodUpdateName.name;
update_ui();
} else
if (typeof message.NewPod !== 'undefined') {
galleries.push(message.NewPod);
update_ui();
} else
if (typeof message.DeliverImage !== 'undefined') {
image_cache[message.DeliverImage.gallery_id][message.DeliverImage.path].save_blob(message.DeliverImage.blob);
update_view();
} else {
error(['client_response unimplemented', message]);
}
}
function indexOfPod(id) {
for (let i = 0; i < galleries.length; ++i) {
if (galleries[i].id === id) {
return i;
}
}
return undefined;
}
function reconnect_handler(online) {
ws.send_object({"ClientRequest":"ListAllPods"});
}
/// Update the list of galleries
function update_ui() {
if (galleries.length === 0) {
galleries_list.innerHTML = "<h3>No Gallery connected</h3>";
} else {
galleries.sort((a, b) => a.name.localeCompare(b.name));
galleries_list.innerHTML = '';
galleries.forEach(x => {
const div = document.createElement('li');
const title = document.createElement('h3');
const text = document.createElement('div');
title.innerText = x.name || `unnamed Gallery #${x.id}`;
text.innerText = x.paths === undefined ? 'No images' : `${x.paths.length} images`;
div.appendChild(title);
div.appendChild(text);
div.addEventListener('click', ev => {
// switch to that gallery
selected_gallery_id = x.id;
update_view();
});
galleries_list.appendChild(div);
});
}
}
/// Update the Gallery
function update_view() {
if (selected_gallery_id === undefined) {
gallery_view.innerHTML = '<div class="centered">No Gallery selected</div>';
return;
}
const i = indexOfPod(selected_gallery_id);
if (i === undefined) {
gallery_view.innerHTML = `<div class="centered">Invalid Gallery Id: ${selected_gallery_id}</div>`;
return;
}
gallery_view.innerHTML = '';
galleries[i].paths.forEach(path => {
let cp = image_cache[selected_gallery_id][path];
if (cp === undefined) {
cp = new CachedPicture(selected_gallery_id, path);
image_cache[selected_gallery_id][path] = cp;
} else {
cp.reappend_to_gallery();
}
});
}
function CachedPicture(id, path) {
this.gallery_id = id;
this.path = path;
this.div = document.createElement('div');
this.img = document.createElement('img');
this.div.appendChild(this.img);
const text = document.createElement('div');
text.innerText = path;
this.div.appendChild(text);
this.reappend_to_gallery();
}
CachedPicture.prototype = {
reappend_to_gallery: function() {
this.cache_update();
gallery_view.appendChild(this.div);
},
cache_update: function() {
if (this.blob === undefined) {
ws.send_object({"ClientRequestAsync": {
"RequestImage": {
"gallery_id": this.gallery_id,
"path": this.path,
},
}});
} else {
this.img.src = this.blob;
}
},
save_blob: function(blob) {
this.blob = blob;
},
};
});
......@@ -4,12 +4,16 @@
<title>Distributed Gallery</title>
<link rel="stylesheet" href="style.css"/>
<script src="main.js"></script>
<script src="gallery.js"></script>
<script src="self_host.js"></script>
</head>
<body>
<h1>A Distributed Gallery</h1>
<section id="galleries"></section>
<section id="galleries">
<ul></ul>
<div></div>
</section>
<h1>Share yourself</h1>
<form action="#">
......@@ -25,6 +29,7 @@
<h2>Development stuff</h2>
<div>
<button id="ws_echo">Test WebSocket echo</button>
<p id="ws_state"></p>
<pre id="ws_echo_out"></pre>
</div>
</body>
......
......@@ -4,7 +4,6 @@ const log = console.log
let html_logger = _ => {};
let ws = undefined;
let galleries = [], galleries_element = undefined;
setup_ws();
WebSocket.prototype.send_object = function(obj) {
......@@ -14,38 +13,14 @@ WebSocket.prototype.send_object = function(obj) {
}
window.addEventListener('load', function() {
html_logger = make_logger(document.querySelector('#ws_echo_out'));
html_logger = make_logger(
document.querySelector('#ws_echo_out'),
document.querySelector('#ws_state')
);
setup_test_websocket();
galleries_element = document.querySelector('#galleries');
update_ui();
});
function update_ui() {
if (galleries.length === 0) {
galleries_element.innerHTML = "No Gallery connected";
} else {
galleries.sort((a, b) => a.name.localeCompare(b.name));
galleries_element.innerHTML = '';
galleries.forEach(x => {
const div = document.createElement('div');
const title = document.createElement('h3');
const text = document.createElement('div');
title.innerText = x.name || `unnamed Gallery #${x.id}`;
text.innerText = x.paths === undefined ? 'No images' : `${x.paths.length} images`;
div.appendChild(title);
div.appendChild(text);
galleries_element.appendChild(div)
});
}
}
function setup_test_websocket() {
const button = document.querySelector('#ws_echo');
......@@ -62,10 +37,11 @@ function setup_test_websocket() {
});
}
function make_logger(element) {
function make_logger(element, state_element) {
const now = Date.now;
return function(str) {
element.innerHTML += `${now()}: ${str}\n`;
state_element.innerText = `Pod { connected: ${Pod.connected}, registered: ${Pod.registered}, } `;
}
}
......@@ -84,11 +60,11 @@ function setup_ws() {
}
ws.onmessage = msg => {
log(msg.data);
html_logger('< ' + msg.data);
log(msg.data.substr(0, 32));
html_logger('< ' + msg.data.substr(0, 128));
const data = JSON.parse(msg.data);
if (typeof data.ClientResponse !== 'undefined') {
client_response(data.ClientResponse);
Gallery.message_handler(data.ClientResponse);
} else
if (typeof data.PodResponse !== 'undefined') {
Pod.message_handler(data.PodResponse);
......@@ -101,45 +77,7 @@ function setup_ws() {
ws.onopen = _ => {
log('Connected.');
ws.send('{"ClientRequest":"ListAllPods"}');
Gallery.reconnect_handler(true);
Pod.reconnect_handler(true);
}
}
function client_response(message) {
if (typeof message.PodGone !== 'undefined') {
galleries = galleries.filter(x => x.id !== message.PodGone);
} else
if (typeof message.Pods !== 'undefined') {
galleries = message.Pods;
update_ui();
} else
if (typeof message.PodUpdatePaths !== 'undefined') {
galleries[indexOfPod(message.PodUpdatePaths.id)].paths = message.PodUpdatePaths.paths;
update_ui();
} else
if (typeof message.NewPod !== 'undefined') {
galleries.push(message.NewPod);
update_ui();
} else
if (typeof message.PodUpdateName !== 'undefined') {
galleries[indexOfPod(message.PodUpdateName.id)].name = message.PodUpdateName.name;
update_ui();
} else
if (typeof message.NewPod !== 'undefined') {
galleries.push(message.NewPod);
update_ui();
} else {
error(['client_response unimplemented', message]);
}
}
function indexOfPod(id) {
for (let i = 0; i < galleries.length; ++i) {
if (galleries[i].id === id) {
return i;
}
}
return undefined;
}
// The actor interface
const Pod = {
id: undefined,
id: Math.round(Math.random() * 100000),
shared_files: [],
message_handler: _ => {},
reconnect_handler: _ => {},
......@@ -33,8 +33,34 @@ function message_handler(message) {
console.log(['Pod::message_handler()', message]);
if (typeof message.Registered !== 'undefined') {
Pod.id = message.Registered.global_id;
updatePreviewTitle();
Pod.registered = true;
} else {
} else
if (typeof message.AlreadyRegistered !== 'undefined') {
Pod.id = message.AlreadyRegistered.global_id;
updatePreviewTitle();
Pod.registered = true;
html_logger("you are already sharing");
} else
if (typeof message.RequestImage !== 'undefined') {
const client_id = message.RequestImage.client_id;
const path = message.RequestImage.path;
const candidate = Pod.shared_files.find(file => file.name === path);
if (candidate === undefined) {
// TODO send an error: ws.send_object()
console.error(["no candidate found for path", path])
} else {
ws.send_object({"PodRequest":{
"DeliverImage":{
"client_id": client_id,
"path": path,
"blob": candidate.blob,
}
}});
}
}
else {
error(['pod_response unimplemented', message]);
}
}
......@@ -76,7 +102,7 @@ function handleFiles() {
function registerSelf() {
const data = {
"PodRequest":{
"RegisterSelf":{ "proposed_id":Pod.id || null, "name": normalized_title() }
"RegisterSelf":{ "proposed_id": typeof Pod.id === 'number' ? Pod.id : null, "name": normalized_title() }
}
};
ws.send_object(data);
......@@ -97,7 +123,7 @@ function updatePreviewTitle() {
const name = normalized_title();
title.innerText = name;
if (Pod.connected) {
if (Pod.registered) {
// TODO delay until the user stops typing?
ws.send_object({"PodRequest":{"UpdateTitle":{"name":name}}});
}
......@@ -105,12 +131,20 @@ function updatePreviewTitle() {
function normalized_title() {
const pod_name = document.querySelector('#pod_name');
return pod_name.value.trim() || 'Unnamed Gallery #'+Math.round(Math.random() * 100000)
return pod_name.value.trim() || 'Unnamed Gallery #'+Pod.id;
}
function regenPreview() {
const preview = document.querySelector('#pod_preview div');
const event_to_src = (function(aImg) { return function(e) { aImg.src = e.target.result; }; });
const event_to_src = (function(aImg, file_handler) {
return function(e) {
aImg.src = e.target.result;
// backup into RAM
if (file_handler.blob === undefined) {
file_handler.blob = e.target.result;
}
};
});
if (Pod.shared_files.length === 0) {
preview.innerHTML = '<div>No files shared, add some to see the previews</div>';
......@@ -123,10 +157,9 @@ function regenPreview() {
const text = document.createElement("div");
const img = document.createElement("img");
img.classList.add("obj");
img.file = file;