Commit 10203ca8 authored by Robert Rudman's avatar Robert Rudman

Chat chat subscription/authentication across threads

parent b8ca512a
Pipeline #55296924 passed with stages
in 6 minutes and 27 seconds
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
......@@ -24,6 +23,8 @@ namespace StackExchangeChat
private readonly HttpClientWithHandler _httpClient;
private readonly ILogger<ChatClient> _logger;
private static Dictionary<SiteAuthenticator.SiteRoomIdPair, IObservable<ChatEvent>> _roomSubscriptions = new Dictionary<SiteAuthenticator.SiteRoomIdPair, IObservable<ChatEvent>>();
public ChatClient(SiteAuthenticator siteAuthenticator,
IServiceProvider serviceProvider,
HttpClientWithHandler httpClient, ILogger<ChatClient> logger)
......@@ -193,68 +194,80 @@ namespace StackExchangeChat
public IObservable<ChatEvent> SubscribeToEvents(ChatSite chatSite, int roomId)
{
return Observable.Create<ChatEvent>(async observer =>
var pair = new SiteAuthenticator.SiteRoomIdPair {ChatSite = chatSite, RoomId = roomId};
lock (_roomSubscriptions)
{
var roomDetails = await _siteAuthenticator.GetRoomDetails(chatSite, roomId);
await _siteAuthenticator.AuthenticateClient(_httpClient, chatSite);
var wsAuthRequest = await _httpClient.PostAsync($"https://{chatSite.ChatDomain}/ws-auth",
new FormUrlEncodedContent(
new Dictionary<string, string>
{
{"fkey", roomDetails.FKey},
{"roomid", roomId.ToString()}
}));
if (!_roomSubscriptions.ContainsKey(pair))
{
_roomSubscriptions[pair] = Observable.Create<ChatEvent>(async observer =>
{
var roomDetails = await _siteAuthenticator.GetRoomDetails(chatSite, roomId);
var wsAuthUrl = JsonConvert.DeserializeObject<JObject>(await wsAuthRequest.Content.ReadAsStringAsync())["url"].Value<string>();
await _siteAuthenticator.AuthenticateClient(_httpClient, chatSite);
var wsAuthRequest = await _httpClient.PostAsync($"https://{chatSite.ChatDomain}/ws-auth",
new FormUrlEncodedContent(
new Dictionary<string, string>
{
{"fkey", roomDetails.FKey},
{"roomid", roomId.ToString()}
}));
var eventsRequest = await _httpClient.PostAsync($"https://{chatSite.ChatDomain}/chats/{roomId}/events",
new FormUrlEncodedContent(
new Dictionary<string, string>
var wsAuthUrl =
JsonConvert.DeserializeObject<JObject>(await wsAuthRequest.Content.ReadAsStringAsync())
["url"].Value<string>();
var eventsRequest = await _httpClient.PostAsync(
$"https://{chatSite.ChatDomain}/chats/{roomId}/events",
new FormUrlEncodedContent(
new Dictionary<string, string>
{
{"mode", "events"},
{"msgCount", "0"},
{"fkey", roomDetails.FKey}
}));
var lastEventTime = JsonConvert.DeserializeObject<JObject>(await eventsRequest.Content.ReadAsStringAsync())["time"].Value<string>();
var webSocket = new PlainWebSocket($"{wsAuthUrl}?l={lastEventTime}", new Dictionary<string, string> {{"Origin", $"https://{chatSite.ChatDomain}"}}, _serviceProvider.GetRequiredService<ILogger<PlainWebSocket>>());
webSocket.OnTextMessage += (message) =>
{
{"mode", "events"},
{"msgCount", "0"},
{"fkey", roomDetails.FKey}
}));
var dataObject = JsonConvert.DeserializeObject<JObject>(message);
var eventsObject = dataObject.First.First["e"];
if (eventsObject == null)
return;
var lastEventTime = JsonConvert.DeserializeObject<JObject>(await eventsRequest.Content.ReadAsStringAsync())["time"].Value<string>();
var events = eventsObject.ToObject<List<ChatEventDetails>>();
foreach (var @event in events)
{
var chatEvent = new ChatEvent
{
RoomDetails = roomDetails,
ChatEventDetails = @event,
ChatClient = this
};
observer.OnNext(chatEvent);
}
};
var webSocket = new PlainWebSocket($"{wsAuthUrl}?l={lastEventTime}", new Dictionary<string, string> { { "Origin", $"https://{chatSite.ChatDomain}"} }, _serviceProvider.GetRequiredService<ILogger<PlainWebSocket>>());
webSocket.OnTextMessage += (message) =>
{
var dataObject = JsonConvert.DeserializeObject<JObject>(message);
var eventsObject = dataObject.First.First["e"];
if (eventsObject == null)
return;
await webSocket.ConnectAsync();
var events = eventsObject.ToObject<List<ChatEventDetails>>();
foreach (var @event in events)
{
var chatEvent = new ChatEvent
observer.OnNext(new ChatEvent
{
ChatEventDetails = new ChatEventDetails
{
ChatEventType = ChatEventType.ChatJoined,
RoomId = roomId
},
RoomDetails = roomDetails,
ChatEventDetails = @event,
ChatClient = this
};
observer.OnNext(chatEvent);
}
};
});
await webSocket.ConnectAsync();
observer.OnNext(new ChatEvent
{
ChatEventDetails = new ChatEventDetails
{
ChatEventType = ChatEventType.ChatJoined,
RoomId = roomId
},
RoomDetails = roomDetails,
ChatClient = this
});
return webSocket;
}).Publish().RefCount();
return webSocket;
}).Publish().RefCount();
}
return _roomSubscriptions[pair];
}
}
private void LogResponseError(HttpResponseMessage responseMessage)
......
......@@ -16,12 +16,12 @@ namespace StackExchangeChat
private readonly IServiceProvider _serviceProvider;
private readonly IChatCredentials _chatCredentials;
private struct SiteRoomIdPair { public ChatSite ChatSite; public int RoomId; }
public struct SiteRoomIdPair { public ChatSite ChatSite; public int RoomId; }
private readonly Dictionary<ChatSite, DateTime> _cookieExpires = new Dictionary<ChatSite, DateTime>();
private readonly Dictionary<ChatSite, Task<Cookie>> _authenticateTasks = new Dictionary<ChatSite, Task<Cookie>>();
private readonly object _locker = new object();
private readonly Dictionary<SiteRoomIdPair, Task<RoomDetails>> _cachedRoomDetails = new Dictionary<SiteRoomIdPair, Task<RoomDetails>>();
private static readonly Dictionary<SiteRoomIdPair, Task<RoomDetails>> _cachedRoomDetails = new Dictionary<SiteRoomIdPair, Task<RoomDetails>>();
public SiteAuthenticator(IServiceProvider serviceProvider, IChatCredentials chatCredentials)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment