Redo the message protocols to work with SSE data directly

This commit is contained in:
Infi
2024-08-24 19:52:53 +02:00
parent c7a52e8edb
commit 29e3c6acee
8 changed files with 98 additions and 54 deletions
@@ -0,0 +1,19 @@
using Newtonsoft.Json;
namespace ChatTwo.Http.MessageProtocol;
public struct SwitchChannel(string name)
{
[JsonProperty("channel")] public string Name = name;
}
public struct Messages(MessageResponse[] set)
{
[JsonProperty("messages")] public MessageResponse[] Set = set;
}
public struct MessageResponse()
{
[JsonProperty("timestamp")] public string Timestamp = "";
[JsonProperty("messageHTML")] public string Message = "";
}
+14 -13
View File
@@ -1,21 +1,22 @@
using Newtonsoft.Json;
using System.Text;
using Newtonsoft.Json;
namespace ChatTwo.Http.MessageProtocol;
public struct MessageResponse()
{
[JsonProperty("timestamp")] public string Timestamp = "";
[JsonProperty("messageHTML")] public string Message = "";
}
public class CloseEvent() : BaseEvent("close");
public class NewMessage(MessageResponse[] messages) : BaseMessage(MessageName)
{
private const string MessageName = "chat-message";
public class SwitchChannelEvent(SwitchChannel switchChannel) : BaseEvent("switch-channel", JsonConvert.SerializeObject(switchChannel));
[JsonProperty("messages")] public MessageResponse[] Messages { get; set; } = messages;
}
public class NewMessageEvent(Messages messages) : BaseEvent("new-message", JsonConvert.SerializeObject(messages));
public class BaseMessage(string messageType)
public class BaseEvent(string eventType, string? data = null)
{
[JsonProperty("messageType")] public string MessageType { get; set; } = messageType;
private string Event = eventType;
private string Data = data ?? "0"; // SSE requires data on each response
public byte[] Build()
{
// SSE always ends with \n\n
return Encoding.UTF8.GetBytes($"event: {Event}\ndata: {Data}\n\n");
}
}
+2 -6
View File
@@ -17,13 +17,9 @@ public class Processing
Plugin = plugin;
}
public string ReadChannelName()
public string ReadChannelName(Chunk[] channelName)
{
var messages = new List<string>();
foreach (var chunk in Plugin.ChatLogWindow.ReadChannelName(Plugin.ChatLogWindow.CurrentTab))
messages.Add(ProcessChunk(chunk, noColor: true));
return string.Join("", messages);
return string.Join("", channelName.Select(chunk => ProcessChunk(chunk, noColor: true)));
}
internal async Task<MessageResponse[]> ReadMessageList()
+6 -3
View File
@@ -35,7 +35,8 @@ public class RouteController
Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.POST, "/send", ReceiveMessage, ExceptionRoute);
Core.HostContext.Routes.PostAuthentication.Parameter.Add(HttpMethod.GET, "/emote/{name}", GetEmote, ExceptionRoute);
Core.HostContext.Routes.PreAuthentication.Parameter.Add(HttpMethod.GET, "/sse", StartServerEvent, ExceptionRoute);
// Server-Sent Events Route
Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.GET, "/sse", NewServerEvent, ExceptionRoute);
}
private async Task ExceptionRoute(HttpContextBase ctx, Exception _)
@@ -153,7 +154,7 @@ public class RouteController
await ctx.Response.Send("Message was send to the channel.");
}
private async Task StartServerEvent(HttpContextBase ctx)
private async Task NewServerEvent(HttpContextBase ctx)
{
try
{
@@ -164,7 +165,9 @@ public class RouteController
// TODO Check if reconnect or new connection
var messages = await WebserverUtil.FrameworkWrapper(Core.Processing.ReadMessageList);
sse.OutboundStack.Push(new NewMessage(messages.ToArray()));
var channelName = await Plugin.Framework.RunOnTick(() => Core.Processing.ReadChannelName(Plugin.ChatLogWindow.PreviousChannel));
sse.OutboundQueue.Enqueue(new NewMessageEvent(new Messages(messages)));
sse.OutboundQueue.Enqueue(new SwitchChannelEvent(new SwitchChannel(channelName)));
await sse.HandleEventLoop(ctx);
+5 -7
View File
@@ -1,6 +1,5 @@
using System.Text;
using ChatTwo.Http.MessageProtocol;
using Newtonsoft.Json;
using WatsonWebserver.Core;
namespace ChatTwo.Http;
@@ -12,7 +11,7 @@ public class SSEConnection
private readonly CancellationToken Token;
public bool Done;
public readonly Stack<BaseMessage> OutboundStack = new();
public readonly Queue<BaseEvent> OutboundQueue = new();
public SSEConnection(CancellationToken token)
{
@@ -34,11 +33,10 @@ public class SSEConnection
if (Token.IsCancellationRequested)
return;
if (!OutboundStack.TryPop(out var message))
if (!OutboundQueue.TryDequeue(out var outgoingEvent))
continue;
var data = JsonConvert.SerializeObject(message);
await ctx.Response.SendChunk(Encoding.UTF8.GetBytes($"id: {Index}\ndata: {data}\n\n"), Token);
await ctx.Response.SendChunk(outgoingEvent.Build(), Token);
Index++;
}
}
@@ -48,12 +46,12 @@ public class SSEConnection
}
catch (Exception ex)
{
Plugin.Log.Error(ex, "Event queued failed to continue");
Plugin.Log.Error(ex, "SSE handler failed.");
}
finally
{
// "No Content" (204) didn't work for Firefox, so manually closing the connection on client side
await ctx.Response.SendFinalChunk("data: closing\nevent: close\n\n"u8.ToArray());
await ctx.Response.SendFinalChunk(new CloseEvent().Build());
Done = true;
}
+23 -6
View File
@@ -45,14 +45,31 @@ public class ServerCore : IAsyncDisposable
{
Plugin.Framework.RunOnTick(() =>
{
var bundledMessage = new NewMessage([Processing.ReadMessageContent(message)]);
var bundledResponse = new NewMessageEvent(new Messages([Processing.ReadMessageContent(message)]));
foreach (var eventServer in EventConnections)
eventServer.OutboundStack.Push(bundledMessage);
eventServer.OutboundQueue.Enqueue(bundledResponse);
});
}
catch (Exception ex)
{
Plugin.Log.Error(ex, "Send message to websockets failed.");
Plugin.Log.Error(ex, "Sending message over SSE failed.");
}
}
internal void SendChannelSwitch(Chunk[] channelName)
{
try
{
Plugin.Framework.RunOnTick(() =>
{
var bundledResponse = new SwitchChannelEvent(new SwitchChannel(Processing.ReadChannelName(channelName)));
foreach (var eventServer in EventConnections)
eventServer.OutboundQueue.Enqueue(bundledResponse);
});
}
catch (Exception ex)
{
Plugin.Log.Error(ex, "Sending channel switch over SSE failed.");
}
}
#endregion
@@ -101,13 +118,13 @@ public class ServerCore : IAsyncDisposable
public async ValueTask DisposeAsync()
{
await TokenSource.CancelAsync();
HostContext.Stop();
foreach (var eventServer in EventConnections)
// We get a copy, so that the original can be cleaned up succesfully
foreach (var eventServer in EventConnections.ToArray())
await eventServer.DisposeAsync();
HostContext.Stop();
HostContext.Dispose();
RouteController.Dispose();
}
}
+7 -3
View File
@@ -3,18 +3,22 @@ class SSEConnection {
constructor() {
this.socket = new EventSource('/sse', );
this.socket.addEventListener('close', (e) => {
this.socket.addEventListener('close', (event) => {
console.log("Closing SSE connection.")
this.socket.close()
});
this.socket.onmessage = (event) => {
this.socket.addEventListener('switch-channel', (event) => {
updateChannelHint(JSON.parse(event.data).channel)
});
this.socket.addEventListener('new-message', (event) => {
let eventData = JSON.parse(event.data);
for (let message of eventData.messages)
{
addMessage(message);
}
};
});
}
send(message) {