Commit 4105d8c4 authored by dns2utf8's avatar dns2utf8

Metadata gets distributed and Clients behave nice

parent 0eb28323
......@@ -8,40 +8,68 @@ use std::collections::HashMap;
#[derive(Default)]
pub struct Hub {
pods: HashMap<PodId, PodInfo>,
clients: HashMap<PodId, Addr<Ws>>,
}
impl Hub {
/*fn broadcast<T, M, R>(&self, message: M) where T: actix::Handler<M>, M: Message<Result=R> + Clone + Send, R: 'static + Send {
for pod in self.pods.values() {
pod.addr.do_send(message.clone())
}
}*/
fn broadcast_client_response(&self, message: ClientResponse) {
for addr in self.clients.values() {
addr.do_send(message.clone())
}
}
fn broadcast_pod_response(&self, message: PodResponse) {
for pod in self.pods.values() {
pod.addr.do_send(message.clone())
}
}
}
impl Actor for Hub {
type Context = SyncContext<Self>;
}
impl Handler<Subscribe> for Hub {
impl Handler<SubscribeClient> for Hub {
type Result = ();
fn handle(&mut self, msg: SubscribeClient, _ctx: &mut Self::Context) -> Self::Result {
self.clients.insert(msg.id, msg.addr);
self.handle(ClientRequest::ListAllPods, _ctx);
}
}
impl Handler<SubscribePod> for Hub {
type Result = ();
fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: SubscribePod, _ctx: &mut Self::Context) -> Self::Result {
self.pods.insert(msg.id, PodInfo {
addr: msg.addr,
name: msg.name,
name: msg.name.clone(),
image_paths: vec![],
});
/*let message = ClientResponse::Pods();
for pod in self.pods.values() {
pod.addr.do_send(message.clone())
}*/
self.handle(ClientRequest::ListAllPods, _ctx);
self.broadcast_client_response(ClientResponse::NewPod { id: msg.id, name: msg.name, });
}
}
impl Handler<Unsubscribe> for Hub {
impl Handler<UnsubscribePod> for Hub {
type Result = ();
fn handle(&mut self, msg: Unsubscribe, _ctx: &mut Self::Context) -> Self::Result {
if let Some(lost_pod) = self.pods.remove(&msg.0) {
fn handle(&mut self, msg: UnsubscribePod, _ctx: &mut Self::Context) -> Self::Result {
if let Some(_lost_pod) = self.pods.remove(&msg.0) {
let message = ClientResponse::PodGone(msg.0);
for pod in self.pods.values() {
pod.addr.do_send(message.clone())
}
self.broadcast_client_response(message);
}
}
}
impl Handler<UnsubscribeClient> for Hub {
type Result = ();
fn handle(&mut self, msg: UnsubscribeClient, _ctx: &mut Self::Context) -> Self::Result {
self.clients.remove(&msg.0);
}
}
impl Handler<ClientRequest> for Hub {
type Result = MessageResult<ClientRequest>;
......@@ -57,7 +85,10 @@ impl Handler<ClientRequest> for Hub {
ListPodStructure(id) => {
match self.pods.get(&id) {
Some(info) => {
ClientResponse::PodStructure(info.image_paths.clone())
ClientResponse::PodUpdatePaths {
id,
paths: info.image_paths.clone()
}
}
None => {
ClientResponse::UnknownPod(id)
......@@ -69,6 +100,25 @@ impl Handler<ClientRequest> for Hub {
//_ctx.repyl(r)
}
}
impl Handler<IdedPodRequest> for Hub {
type Result = ();
fn handle(&mut self, msg: IdedPodRequest, _ctx: &mut Self::Context) -> Self::Result {
use PodRequest::*;
match msg.message {
RegisterSelf { .. } => unreachable!("must be handled by Ws"),
UpdateTitle { name } => {
self.pods.get_mut(&msg.id).expect("unable to find PodInfo").name = name.clone();
self.broadcast_client_response(ClientResponse::PodUpdateName{ id: msg.id, name, });
}
UpdatePaths { paths } => {
self.pods.get_mut(&msg.id).expect("unable to find PodInfo").image_paths = paths.clone();
self.broadcast_client_response(ClientResponse::PodUpdatePaths{ id: msg.id, paths, });
}
}
//let message = ClientResponse::PodGone(msg.0);
}
}
pub struct PodInfo {
addr: Addr<Ws>,
......@@ -107,17 +157,14 @@ impl Actor for Ws {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let mut name = String::new();
std::mem::swap(&mut self.name, &mut name);
self.hub.do_send(Subscribe {
self.hub.do_send(SubscribeClient {
id: self.id,
name: name,
addr: ctx.address(),
});
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
self.hub.do_send(Unsubscribe(self.id));
self.hub.do_send(UnsubscribeClient(self.id));
}
}
......@@ -129,7 +176,6 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
ws::Message::Ping(msg) => ctx.pong(&msg),
ws::Message::Text(text) => {
use crate::protocols::JsonProtocol;
println!("text: {:?}", text);
let message: Result<JsonProtocol, _> = serde_json::from_str(&text);
match message {
Ok(JsonProtocol::ClientRequest(message)) => {
......@@ -184,23 +230,43 @@ impl Handler<PodRequest> for Ws {
use PodRequest::*;
match message {
RegisterSelf { proposed_id, name } => {
self.hub.do_send(SubscribePod { id: self.id, name, addr: ctx.address(), });
actix::Handler::handle(self, PodResponse::Registered { global_id: self.id }, ctx);
}
other_messages => {
self.hub.do_send(IdedPodRequest { id: self.id, message: other_messages });
}
/*
UpdateTitle { name } => {
}
UpdatePaths { paths } => {
}
}*/
};
MessageResult(())
}
}
#[derive(Message)]
pub struct Subscribe {
pub struct SubscribePod {
id: PodId,
addr: Addr<Ws>,
name: String,
}
#[derive(Message)]
pub struct Unsubscribe(PodId);
pub struct UnsubscribePod(PodId);
#[derive(Message)]
pub struct SubscribeClient {
id: PodId,
addr: Addr<Ws>,
}
#[derive(Message)]
pub struct UnsubscribeClient(PodId);
#[derive(Message)]
pub struct IdedPodRequest {
id: PodId,
message: PodRequest,
}
......@@ -23,12 +23,12 @@ pub enum ClientRequest {
/// Master -> Browser
#[derive(Serialize, Deserialize, Debug, Clone, Message)]
pub enum ClientResponse {
/*/// new ID for the Browser
NewId(PodId),*/
Pods(Vec<PodDescription>),
PodStructure(Vec<String>),
NewPod { id: PodId, name: String },
UnknownPod(PodId),
PodGone(PodId),
PodUpdateName { id: PodId, name: String, },
PodUpdatePaths { id: PodId, paths: Vec<String>, },
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PodDescription {
......@@ -47,7 +47,7 @@ pub enum PodRequest {
UpdatePaths { paths: Vec<String>, },
}
/// Master -> Slave
#[derive(Serialize, Deserialize, Debug, Message)]
#[derive(Serialize, Deserialize, Clone, Debug, Message)]
pub enum PodResponse {
Registered { global_id: PodId, },
}
......@@ -63,11 +63,12 @@ pub(crate) fn print_all_messages() {
p(JsonProtocol::ClientRequest(ClientRequest::ListPodStructure(42)));
t("ClientResponse");
//p(JsonProtocol::ClientResponse(ClientResponse::NewId(23)));
p(JsonProtocol::ClientResponse(ClientResponse::Pods(vec![PodDescription{id: 42, name: "bla".into()}])));
p(JsonProtocol::ClientResponse(ClientResponse::PodStructure(vec!["bla".into(), "bli".into()])));
p(JsonProtocol::ClientResponse(ClientResponse::NewPod{id:23, name: "blubb".into()}));
p(JsonProtocol::ClientResponse(ClientResponse::UnknownPod(123)));
p(JsonProtocol::ClientResponse(ClientResponse::PodGone(1234)));
p(JsonProtocol::ClientResponse(ClientResponse::PodUpdateName{ id: 42, name: "String".into(), }));
p(JsonProtocol::ClientResponse(ClientResponse::PodUpdatePaths{ id: 42, paths: vec!["String".into()] }));
t("PodRequest");
......
......@@ -23,10 +23,10 @@ window.addEventListener('load', function() {
});
function update_ui() {
if (galleries.length === 0) {
galleries_element.innerHTML = "No Gallery connected";
} else {
galleries.sort((a, b) => a.name.localeCompare(b.name));
galleries_element.innerHTML = '';
galleries.forEach(x => {
......@@ -34,8 +34,8 @@ function update_ui() {
const title = document.createElement('h3');
const text = document.createElement('div');
title.innerHTML = x.name || `unnamed Gallery #${x.id}`;
text.innerHTML = 'TODO loading tree...';
title.innerText = x.name || `unnamed Gallery #${x.id}`;
text.innerText = x.paths === undefined ? 'No images' : `${x.paths.length} images`;
div.appendChild(title);
div.appendChild(text);
......@@ -57,8 +57,8 @@ function setup_test_websocket() {
log(entry);
html_logger(entry);
ws.send('{"ClientRequest":"ListAllPods"}');
ws.send('{"ClientRequest":{"ListPodStructure":2}}');
ws.send('{"ClientRequest":"ListAllPods"}');
ws.send('{"ClientRequest":{"ListPodStructure":2}}');
});
}
......@@ -113,9 +113,33 @@ function client_response(message) {
} else
if (typeof message.Pods !== 'undefined') {
galleries = message.Pods;
galleries.sort((a, b) => a.name.localeCompare(b.name))
update_ui();
} else
if (typeof message.PodUpdatePaths !== 'undefined') {
galleries[indexOfPod(message.PodUpdatePaths.id)].paths = message.PodUpdatePaths.paths;
update_ui();
} else
if (typeof message.NewPod !== 'undefined') {
galleries.push(message.NewPod);
update_ui();
} else
if (typeof message.PodUpdateName !== 'undefined') {
galleries[indexOfPod(message.PodUpdateName.id)].name = message.PodUpdateName.name;
update_ui();
} else
if (typeof message.NewPod !== 'undefined') {
galleries.push(message.NewPod);
update_ui();
} else {
error(['client_response unimplemented', message]);
}
}
function indexOfPod(id) {
for (let i = 0; i < galleries.length; ++i) {
if (galleries[i].id === id) {
return i;
}
}
return undefined;
}
......@@ -5,6 +5,7 @@ const Pod = {
message_handler: _ => {},
reconnect_handler: _ => {},
connected: false,
registered: false,
};
window.addEventListener('load', function() {
......@@ -32,6 +33,7 @@ function message_handler(message) {
console.log(['Pod::message_handler()', message]);
if (typeof message.Registered !== 'undefined') {
Pod.id = message.Registered.global_id;
Pod.registered = true;
} else {
error(['pod_response unimplemented', message]);
}
......@@ -40,10 +42,14 @@ function reconnect_handler(connected) {
console.log(['Pod::reconnect_handler()', connected]);
Pod.connected = connected;
if (connected) {
if (connected && Pod.shared_files.length > 0) {
registerSelf();
publishPictures();
}
if (connected === false) {
Pod.registered = false;
}
}
......@@ -78,6 +84,9 @@ function registerSelf() {
function publishPictures() {
if (Pod.connected) {
if (Pod.registered === false) {
registerSelf();
}
const paths = Pod.shared_files.reduce((b, cur) => { b.push(cur.name); return b }, []);
ws.send_object({"PodRequest": {"UpdatePaths": { "paths": paths } } });
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment