Commit a678a1fe authored by Robert Rudman's avatar Robert Rudman

Use Sam's implementation of websocket here:...

Use Sam's implementation of websocket here: https://github.com/SOBotics/SharpExchange/blob/master/SharpExchange/Net/WebSocket/DefaultWebSocket.cs - It will remove the warnings about incompatibility, and Sam's already got re-connect on error implemented
parent 2f56effd
......@@ -53,22 +53,22 @@ namespace StackExchangeChat.Console
var events = chatClient.SubscribeToEvents(ChatSite.StackExchange, 86421);
events.Subscribe(System.Console.WriteLine);
chatClient
.SubscribeToEvents(ChatSite.StackExchange, 86421)
.OnlyMessages()
.SameRoomOnly()
.SkipMyMessages()
.Subscribe(async chatEvent =>
{
try
{
await chatClient.SendMessage(chatEvent.RoomDetails.ChatSite, chatEvent.RoomDetails.RoomId, $":{chatEvent.ChatEventDetails.MessageId} Replying to message..");
}
catch (Exception ex) { }
}, exception =>
{
System.Console.WriteLine(exception);
});
//chatClient
// .SubscribeToEvents(ChatSite.StackExchange, 86421)
// .OnlyMessages()
// .SameRoomOnly()
// .SkipMyMessages()
// .Subscribe(async chatEvent =>
// {
// try
// {
// await chatClient.SendMessage(chatEvent.RoomDetails.ChatSite, chatEvent.RoomDetails.RoomId, $":{chatEvent.ChatEventDetails.MessageId} Replying to message..");
// }
// catch (Exception) { }
// }, exception =>
// {
// System.Console.WriteLine(exception);
// });
System.Console.ReadKey();
}
......
......@@ -7,7 +7,6 @@ using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchangeChat.Utilities;
using WebSocketSharp;
namespace StackExchangeChat
{
......@@ -90,10 +89,10 @@ namespace StackExchangeChat
var lastEventTime = JsonConvert.DeserializeObject<JObject>(await eventsRequest.Content.ReadAsStringAsync())["time"].Value<string>();
var webSocket = new WebSocket($"{wsAuthUrl}?l={lastEventTime}") {Origin = $"https://{chatSite.ChatDomain}"};
webSocket.OnMessage += (sender, args) =>
var webSocket = new PlainWebSocket($"{wsAuthUrl}?l={lastEventTime}", new Dictionary<string, string> { { "Origin", $"https://{chatSite.ChatDomain}"} });
webSocket.OnTextMessage += (message) =>
{
var dataObject = JsonConvert.DeserializeObject<JObject>(args.Data);
var dataObject = JsonConvert.DeserializeObject<JObject>(message);
var eventsObject = dataObject.First.First["e"];
if (eventsObject == null)
return;
......@@ -111,7 +110,7 @@ namespace StackExchangeChat
}
};
webSocket.Connect();
await webSocket.ConnectAsync();
observer.OnNext(new ChatEvent
{
......@@ -126,7 +125,7 @@ namespace StackExchangeChat
return Disposable.Create(() =>
{
webSocket.Close();
// webSocket.Dispose();
});
});
}
......
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackExchangeChat
{
// https://github.com/SOBotics/SharpExchange/blob/master/SharpExchange/Net/WebSocket/DefaultWebSocket.cs
public class PlainWebSocket : IDisposable
{
private const int BufferSize = 4 * 1024;
private ClientWebSocket _socket;
private readonly CancellationTokenSource _socketTokenSource;
private bool _dispose;
public event Action OnOpen;
public event Action<string> OnTextMessage;
public event Action<byte[]> OnBinaryMessage;
public event Action OnClose;
public event Action<Exception> OnError;
public event Action OnReconnectFailed;
public Uri Endpoint { get; }
public IReadOnlyDictionary<string, string> Headers { get; }
public bool AutoReconnect { get; set; } = true;
public PlainWebSocket(string endpoint, IReadOnlyDictionary<string, string> headers = null)
{
ThrowIfNullOrEmpty(endpoint, nameof(endpoint));
_socketTokenSource = new CancellationTokenSource();
Endpoint = new Uri(endpoint);
Headers = headers;
}
public void Dispose()
{
if (_dispose)
return;
_dispose = true;
if (_socket?.State == WebSocketState.Open)
_socketTokenSource.Cancel();
_socket?.Dispose();
}
public async Task ConnectAsync()
{
if (_dispose) return;
if (_socket?.State == WebSocketState.Open || _socket?.State == WebSocketState.Connecting)
throw new Exception("WebSocket is already open/connecting.");
_socket = new ClientWebSocket();
if (Headers != null)
foreach (var kv in Headers)
_socket.Options.SetRequestHeader(kv.Key, kv.Value);
await _socket.ConnectAsync(Endpoint, _socketTokenSource.Token);
new Thread(Listen).Start();
InvokeAsync(OnOpen);
}
private void Listen()
{
while (!_dispose)
{
var buffers = new List<byte[]>();
WebSocketReceiveResult msgInfo = null;
try
{
while (!msgInfo?.EndOfMessage ?? true)
{
var b = new ArraySegment<byte>(new byte[BufferSize]);
msgInfo = _socket.ReceiveAsync(b, _socketTokenSource.Token).Result;
var bArray = b.Array;
Array.Resize(ref bArray, msgInfo.Count);
buffers.Add(bArray);
}
}
catch (AggregateException ex)
when (ex.InnerException?.GetType() == typeof(TaskCanceledException))
{
InvokeAsync(OnClose);
return;
}
catch (Exception e1)
{
OnError?.Invoke(e1);
if (!AutoReconnect) return;
try
{
_socketTokenSource.Token.WaitHandle.WaitOne(1000);
ConnectAsync().Wait();
}
catch (Exception e2)
{
InvokeAsync(OnReconnectFailed);
InvokeAsync(OnError, e2);
InvokeAsync(OnClose);
}
return;
}
var buffer = new List<byte>();
foreach (var b in buffers)
{
buffer.AddRange(b);
}
Task.Run(() => HandleNewMessage(msgInfo, buffer.ToArray()));
}
InvokeAsync(OnClose);
}
private void HandleNewMessage(WebSocketReceiveResult msgInfo, byte[] buffer)
{
if (msgInfo == null) return;
try
{
if (msgInfo.MessageType == WebSocketMessageType.Text)
{
var text = Encoding.UTF8.GetString(buffer, 0, buffer.Length);
OnTextMessage?.Invoke(text);
}
else if (msgInfo.MessageType == WebSocketMessageType.Binary)
{
OnBinaryMessage?.Invoke(buffer);
}
}
catch (Exception ex)
{
OnError?.Invoke(ex);
}
}
public static void InvokeAsync(Action del)
{
if (del == null)
return;
new Thread(del.Invoke).Start();
}
public static void InvokeAsync<T>(Action<T> del, T arg)
{
if (del == null)
return;
new Thread(() => del.Invoke(arg)).Start();
}
public static void ThrowIfNullOrEmpty(string str, string argName)
{
if (string.IsNullOrEmpty(argName))
{
throw new ArgumentException($"'{argName}' cannot be null or empty.");
}
if (string.IsNullOrEmpty(str))
{
throw new ArgumentException($"'{argName}' cannot be null or empty.");
}
}
}
}
\ No newline at end of file
......@@ -10,7 +10,6 @@
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0-preview3-35497" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1-beta1" />
<PackageReference Include="System.Reactive" Version="4.1.2" />
<PackageReference Include="WebSocketSharp" Version="1.0.3-rc11" />
</ItemGroup>
</Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment