Commit 6d3c6568 authored by Robert Rudman's avatar Robert Rudman

More RXJS fixes

parent 47271192
Pipeline #44867123 passed with stages
in 4 minutes and 57 seconds
......@@ -40,7 +40,7 @@ namespace Rodgort.Services.HostedServices
if (!hasCookies && !hasCredentials)
return;
await RunWithLogging(async () =>
try
{
var chatClient = _serviceProvider.GetRequiredService<ChatClient>();
var dateService = _serviceProvider.GetRequiredService<DateService>();
......@@ -49,69 +49,73 @@ namespace Rodgort.Services.HostedServices
var burnakiFollows = context.BurnakiFollows.Where(bf => !bf.FollowEnded.HasValue).ToList();
foreach (var burnakiFollow in burnakiFollows)
FollowInRoom(burnakiFollow.RoomId, burnakiFollow.BurnakiId, burnakiFollow.FollowStarted, burnakiFollow.Tag, dateService, cancellationToken);
FollowInRoom(burnakiFollow.RoomId, burnakiFollow.BurnakiId, burnakiFollow.FollowStarted,
burnakiFollow.Tag, dateService, cancellationToken);
var events = chatClient.SubscribeToEvents(ChatSite.StackOverflow, ChatRooms.HEADQUARTERS);
await events.FirstAsync();
chatClient.SendMessage(ChatSite.StackOverflow, ChatRooms.HEADQUARTERS, "o/");
_logger.LogInformation("Successfully joined headquarters");
await events
events
.ReplyAlive()
.Pinged()
.SameRoomOnly()
.Where(r => r.ChatEventDetails.UserId == ChatUserIds.ROB)
.ForEachAsync(
.Subscribe(
async chatEvent =>
{
await RunWithLogging(async () =>
{
await ParseCommands(chatClient, chatEvent, dateService, cancellationToken);
});
await ParseCommands(chatClient, chatEvent, dateService, cancellationToken);
}, cancellationToken);
});
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed chat");
}
}
public async Task FollowInRoom(int roomId, int followingUserId, DateTime fromTime, string followingTag, DateService dateService, CancellationToken cancellationToken)
{
await RunWithLogging(async () =>
try
{
var chatClient = _serviceProvider.GetRequiredService<ChatClient>();
var questionIdRegex = new Regex(@"stackoverflow\.com\/q\/(\d+)");
var events = chatClient.SubscribeToEvents(ChatSite.StackOverflow, roomId);
await events.FirstAsync();
_logger.LogInformation($"Successfully joined room {roomId}");
chatClient.SendMessage(ChatSite.StackOverflow, ChatRooms.HEADQUARTERS, $"I just joined {roomId}");
using (var scope = _serviceProvider.CreateScope())
{
var burnProcessingService = scope.ServiceProvider.GetRequiredService<BurnProcessingService>();
await events
.ReplyAlive()
.OnlyMessages()
.SameRoomOnly()
.Where(r => r.ChatEventDetails.UserId == followingUserId)
.SlidingBuffer(TimeSpan.FromSeconds(30))
.ForEachAsync(async chatEvents =>
events
.ReplyAlive()
.OnlyMessages()
.SameRoomOnly()
.Where(r => r.ChatEventDetails.UserId == followingUserId)
.SlidingBuffer(TimeSpan.FromSeconds(30))
.Subscribe(async chatEvents =>
{
try
{
await RunWithLogging(async () =>
var questionIds = chatEvents
.SelectMany(ceg => questionIdRegex.Matches(ceg.ChatEventDetails.Content).Select(m => int.Parse(m.Groups[1].Value)))
.Distinct();
using (var scope = _serviceProvider.CreateScope())
{
var questionIds =
chatEvents.SelectMany(ceg =>
questionIdRegex
.Matches(ceg.ChatEventDetails.Content)
.Select(m => int.Parse(m.Groups[1].Value))
)
.Distinct();
await burnProcessingService.ProcessQuestionIds(questionIds, followingTag, roomId,
true);
});
}, cancellationToken);
}
});
var burnProcessingService = scope.ServiceProvider.GetRequiredService<BurnProcessingService>();
await burnProcessingService.ProcessQuestionIds(questionIds, followingTag, roomId, true);
}
}
catch (Exception ex)
{
_logger.LogError("Failed processing chat events", ex);
}
}, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed chat");
}
}
private async Task ParseCommands(ChatClient chatClient, ChatEvent chatEvent, DateService dateService, CancellationToken cancellationToken)
......@@ -206,22 +210,5 @@ namespace Rodgort.Services.HostedServices
await chatClient.SendMessage(ChatSite.StackOverflow, chatEvent.RoomDetails.RoomId, $":{chatEvent.ChatEventDetails.MessageId} I'm not following anyone");
}
}
private async Task RunWithLogging(Func<Task> task)
{
try
{
await task();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed chat");
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
......@@ -27,40 +26,45 @@ namespace Rodgort.Services.HostedServices
{
_serviceProvider = serviceProvider;
_logger = logger;
_logger.LogTrace("In constructor for LiveMetaQuestionWatcherService");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogTrace("In ExecuteAsync for LiveMetaQuestionWatcherService");
try
{
var websocket = CreateLiveWebsocket();
using (var scope = _serviceProvider.CreateScope())
{
var metaCrawlerService = scope.ServiceProvider.GetRequiredService<MetaCrawlerService>();
var apiClient = scope.ServiceProvider.GetRequiredService<ApiClient>();
await websocket
.SlidingBuffer(TimeSpan.FromSeconds(5))
.ForEachAsync(async questionIdList =>
websocket
.SlidingBuffer(TimeSpan.FromSeconds(5))
.Subscribe(async questionIdList =>
{
foreach (var batch in questionIdList.Distinct().Batch(95))
{
foreach (var batch in questionIdList.Distinct().Batch(95))
try
{
var batchList = batch.ToList();
_logger.LogInformation($"Processing batch {string.Join(",", batchList)} from meta websocket");
var questions = await apiClient.MetaQuestionsByIds("meta.stackoverflow.com", batchList.ToList());
var result = metaCrawlerService.ProcessQuestions(questions.Items);
await metaCrawlerService.PostProcessQuestions(result);
using (var scope = _serviceProvider.CreateScope())
{
var metaCrawlerService = scope.ServiceProvider.GetRequiredService<MetaCrawlerService>();
var apiClient = scope.ServiceProvider.GetRequiredService<ApiClient>();
var questions = await apiClient.MetaQuestionsByIds("meta.stackoverflow.com", batchList.ToList());
var result = metaCrawlerService.ProcessQuestions(questions.Items);
await metaCrawlerService.PostProcessQuestions(result);
}
}
}, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError("Failed processing meta websocket batch", ex);
}
}
}, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError("Failed watching live meta", ex);
}
return Task.CompletedTask;
}
private IObservable<int> CreateLiveWebsocket()
......@@ -94,13 +98,11 @@ namespace Rodgort.Services.HostedServices
_logger.LogError("Failed to process 552-home-active for 'https://stackoverflow.com'. Message received: " + message, ex);
}
};
try
{
_logger.LogTrace("Trying to connect");
await webSocket.ConnectAsync();
_logger.LogTrace("Connected. Specifying type");
await webSocket.SendAsync("552-home-active");
_logger.LogTrace("Type specified");
}
catch (Exception ex)
{
......@@ -111,7 +113,7 @@ namespace Rodgort.Services.HostedServices
return webSocket;
});
return websocket;
return websocket.Publish().RefCount();
}
}
}
......@@ -239,11 +239,8 @@ namespace StackExchangeChat
ChatClient = this
});
return Disposable.Create(() =>
{
// webSocket.Dispose();
});
});
return webSocket;
}).Publish().RefCount();
}
private void LogResponseError(HttpResponseMessage responseMessage)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment