Commit c68d9f47 authored by Robert Rudman's avatar Robert Rudman

Watch meta main page for updates. Only mark burn start/featured for questions...

Watch meta main page for updates. Only mark burn start/featured for questions with burn request tags
parent f1b96fc1
Pipeline #43808953 passed with stages
in 5 minutes and 23 seconds
......@@ -7,6 +7,8 @@ namespace Rodgort.ApiUtilities
{
public static class MetaQuestionsUtility
{
private const string BaseQuestionFilter = "!1PVN1yfHP2mFv5xsYE3Iaa)w(-d*V(48A";
public static Task<ApiItemsResponse<BaseQuestion>> MetaQuestionsByTag(this ApiClient apiClient, string siteName, string tag, PagingOptions pagingOptions = null)
{
return apiClient.ApplyWithPaging<BaseQuestion>($"{ApiClient.BASE_URL}/questions",
......@@ -14,7 +16,18 @@ namespace Rodgort.ApiUtilities
{
{"site", siteName},
{"tagged", tag},
{"filter", "!1PVN1yfHP2mFv5xsYE3Iaa)w(-d*V(48A"}
{"filter", BaseQuestionFilter}
}, pagingOptions);
}
public static Task<ApiItemsResponse<BaseQuestion>> MetaQuestionsByIds(this ApiClient apiClient, string siteName, List<int> questionIds, PagingOptions pagingOptions = null)
{
var questionIdString = string.Join(";", questionIds);
return apiClient.ApplyWithPaging<BaseQuestion>($"{ApiClient.BASE_URL}/questions/{questionIdString}",
new Dictionary<string, string>
{
{"site", siteName},
{"filter", BaseQuestionFilter}
}, pagingOptions);
}
}
......
......@@ -15,8 +15,9 @@ namespace Rodgort.Data.Tables
public const string BURNINATE_REQUEST = "burninate-request";
public const string SYNONYM_REQUEST = "synonym-request";
public const string RETAG_REQUEST = "retag-request";
public const string TAG_DISAMBIGUATION = "tag-disambiguation";
public static string[] RequestTypes = { BURNINATE_REQUEST, SYNONYM_REQUEST, RETAG_REQUEST };
public static string[] RequestTypes = { BURNINATE_REQUEST, SYNONYM_REQUEST, RETAG_REQUEST, TAG_DISAMBIGUATION };
public string Name { get; set; }
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MoreLinq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Rodgort.ApiUtilities;
using Rodgort.Utilities.ReactiveX;
using StackExchangeApi;
using StackExchangeChat;
namespace Rodgort.Services
{
public class LiveMetaQuestionWatcherService : IHostedService
{
private readonly MetaCrawlerService _metaCrawlerService;
private readonly ApiClient _apiClient;
private readonly ILogger<BurnakiFollowService> _logger;
public LiveMetaQuestionWatcherService(IServiceProvider serviceProvider, ILogger<BurnakiFollowService> logger)
{
var scopedServiceProvider = serviceProvider.CreateScope().ServiceProvider;
_metaCrawlerService = scopedServiceProvider.GetRequiredService<MetaCrawlerService>();
_apiClient = scopedServiceProvider.GetRequiredService<ApiClient>();
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
var websocket = CreateLiveWebsocket();
websocket
.SlidingBuffer(TimeSpan.FromSeconds(5))
.Subscribe(async questionIdList =>
{
foreach (var batch in questionIdList.Distinct().Batch(95))
{
var questions = await _apiClient.MetaQuestionsByIds("meta.stackoverflow.com", batch.ToList());
var result = _metaCrawlerService.ProcessQuestions(questions.Items);
await _metaCrawlerService.PostProcessQuestions(result);
}
});
return Task.CompletedTask;
}
private IObservable<int> CreateLiveWebsocket()
{
const string wsEndpoint = "wss://qa.sockets.stackexchange.com/";
const string homePage = "https://stackoverflow.com";
var websocket = Observable.Create<int>(async observer =>
{
var webSocket = new PlainWebSocket(wsEndpoint, new Dictionary<string, string> {{"Origin", homePage}});
webSocket.OnTextMessage += async message =>
{
try
{
var messageObject = JsonConvert.DeserializeObject<JObject>(message);
var dataStr = messageObject["data"].Value<string>();
if (string.Equals(dataStr, "pong"))
{
await webSocket.SendAsync("pong");
return;
}
var payload = JsonConvert.DeserializeObject<JObject>(dataStr);
var questionId = payload.First.First.Value<int>();
observer.OnNext(questionId);
}
catch (Exception ex)
{
_logger.LogError("Failed to process 552-home-active for 'https://stackoverflow.com'. Message received: " + message, ex);
}
};
await webSocket.ConnectAsync();
await webSocket.SendAsync("552-home-active");
return Disposable.Empty;
});
return websocket;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
This diff is collapsed.
......@@ -77,6 +77,7 @@ namespace Rodgort
services.AddTransient<BurnakiFollowService>();
services.AddTransient<BurnProcessingService>();
services.AddTransient<BurnCatchupService>();
services.AddTransient<MetaCrawlerService>();
services.AddTransient(_ => new HttpClient(new HttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate }));
services.AddTransient(_ => new HttpClientWithHandler(new HttpClientHandler { AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate }));
......@@ -94,6 +95,8 @@ namespace Rodgort
services.AddScoped<NewBurninationService>();
services.AddHostedService<BurnakiFollowService>();
services.AddHostedService<LiveMetaQuestionWatcherService>();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......
......@@ -63,11 +63,36 @@ namespace StackExchangeChat
_socket.Options.SetRequestHeader(kv.Key, kv.Value);
await _socket.ConnectAsync(Endpoint, _socketTokenSource.Token);
new Thread(Listen).Start();
InvokeAsync(OnOpen);
}
public async Task SendAsync(string message)
{
if (_dispose) return;
if (_socket?.State != WebSocketState.Open)
throw new Exception("The WebSocket must be open before attempting to send a message.");
var bytes = Encoding.UTF8.GetBytes(message);
await SendAsync(bytes, WebSocketMessageType.Text);
}
public async Task SendAsync(byte[] bytes, WebSocketMessageType messageType)
{
if (_dispose) return;
if (_socket?.State != WebSocketState.Open)
throw new Exception("The WebSocket must be open before attempting to send a message.");
var bytesSegment = new ArraySegment<byte>(bytes);
await _socket.SendAsync(bytesSegment, messageType, true, _socketTokenSource.Token);
}
private void Listen()
{
while (!_dispose)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment