From c9be7c9c2c51ff2a1e19395d2ae15031abcea31d Mon Sep 17 00:00:00 2001 From: Infi Date: Sat, 24 Aug 2024 16:49:56 +0200 Subject: [PATCH] Move to SSE from websocket --- ChatTwo/Http/EventServer.cs | 78 +++++++++++++++++++ .../Http/MessageProtocol/OutgoingMessage.cs | 4 +- ChatTwo/Http/Processing.cs | 4 +- ChatTwo/Http/RouteController.cs | 26 ++++++- ChatTwo/Http/ServerCore.cs | 49 ++++-------- ChatTwo/Http/Websocket.cs | 48 ------------ ChatTwo/Http/static/start.css | 4 +- ChatTwo/Http/static/start.js | 29 +++---- ChatTwo/Plugin.cs | 2 +- 9 files changed, 133 insertions(+), 111 deletions(-) create mode 100644 ChatTwo/Http/EventServer.cs delete mode 100644 ChatTwo/Http/Websocket.cs diff --git a/ChatTwo/Http/EventServer.cs b/ChatTwo/Http/EventServer.cs new file mode 100644 index 0000000..16f33b5 --- /dev/null +++ b/ChatTwo/Http/EventServer.cs @@ -0,0 +1,78 @@ +using System.Text; +using ChatTwo.Http.MessageProtocol; +using Newtonsoft.Json; +using WatsonWebserver.Core; + +namespace ChatTwo.Http; + +public class EventServer +{ + private bool Stopping; + private bool Finished; + private readonly CancellationToken Token; + + public readonly Stack OutboundStack = new(); + + private long Index; + + public EventServer(CancellationToken token) + { + Token = token; + } + + public async Task HandleEventLoop(HttpContextBase ctx) + { + try + { + ctx.Response.Headers.Add("Content-Type", "text/event-stream"); + ctx.Response.Headers.Add("Cache-Control", "no-cache"); + ctx.Response.Headers.Add("Connection", "keep-alive"); + + ctx.Response.ChunkedTransfer = true; + while (!Token.IsCancellationRequested && !Stopping) + { + await Task.Delay(10, Token); + if (Token.IsCancellationRequested) + return; + + if (!OutboundStack.TryPop(out var message)) + continue; + + var data = JsonConvert.SerializeObject(message); + await ctx.Response.SendChunk(Encoding.UTF8.GetBytes($"id: {Index}\ndata: {data}\n\n"), Token); + Index++; + } + } + catch (TaskCanceledException) + { + // Ignore + } + catch (Exception ex) + { + Plugin.Log.Error(ex, "Event queued failed to continue"); + } + finally + { + // No Content (204) didn't work for Firefox, so manually closing the connection on client side + await ctx.Response.SendFinalChunk("data: closing\nevent: close\n\n"u8.ToArray()); + + Finished = true; + } + } + + public async ValueTask DisposeAsync() + { + Stopping = true; + + var timeout = 1000; // 1000ms + while (timeout > 0) + { + if (Finished) + break; + + timeout -= 100; + await Task.Delay(100); + Plugin.Log.Debug("Sleeping because EventServer still alive"); + } + } +} \ No newline at end of file diff --git a/ChatTwo/Http/MessageProtocol/OutgoingMessage.cs b/ChatTwo/Http/MessageProtocol/OutgoingMessage.cs index 210dba4..0b5084f 100644 --- a/ChatTwo/Http/MessageProtocol/OutgoingMessage.cs +++ b/ChatTwo/Http/MessageProtocol/OutgoingMessage.cs @@ -8,14 +8,14 @@ public struct MessageResponse() [JsonProperty("messageHTML")] public string Message = ""; } -public class WebSocketNewMessage(MessageResponse[] messages) : BaseOutboundMessage(MessageName) +public class NewMessage(MessageResponse[] messages) : BaseMessage(MessageName) { private const string MessageName = "chat-message"; [JsonProperty("messages")] public MessageResponse[] Messages { get; set; } = messages; } -public class BaseOutboundMessage(string messageType) +public class BaseMessage(string messageType) { [JsonProperty("messageType")] public string MessageType { get; set; } = messageType; } \ No newline at end of file diff --git a/ChatTwo/Http/Processing.cs b/ChatTwo/Http/Processing.cs index 087564d..c0c48e8 100644 --- a/ChatTwo/Http/Processing.cs +++ b/ChatTwo/Http/Processing.cs @@ -26,10 +26,10 @@ public class Processing return string.Join("", messages); } - internal async Task> ReadMessageList() + internal async Task ReadMessageList() { var tabMessages = await Plugin.ChatLogWindow.CurrentTab!.Messages.GetCopy(); - return tabMessages.Select(ReadMessageContent).ToList(); + return tabMessages.Select(ReadMessageContent).ToArray(); } internal MessageResponse ReadMessageContent(Message message) diff --git a/ChatTwo/Http/RouteController.cs b/ChatTwo/Http/RouteController.cs index 32e57a3..69aae86 100644 --- a/ChatTwo/Http/RouteController.cs +++ b/ChatTwo/Http/RouteController.cs @@ -1,4 +1,5 @@ -using Lumina.Data.Files; +using ChatTwo.Http.MessageProtocol; +using Lumina.Data.Files; using WatsonWebserver.Core; using HttpMethod = WatsonWebserver.Core.HttpMethod; @@ -33,6 +34,8 @@ public class RouteController Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.GET, "/chat", ChatBoxRoute, ExceptionRoute); Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.POST, "/send", ReceiveMessage, ExceptionRoute); Core.HostContext.Routes.PostAuthentication.Parameter.Add(HttpMethod.GET, "/emote/{name}", GetEmote, ExceptionRoute); + + Core.HostContext.Routes.PreAuthentication.Parameter.Add(HttpMethod.GET, "/sse", StartServerEvent, ExceptionRoute); } private async Task ExceptionRoute(HttpContextBase ctx, Exception _) @@ -149,5 +152,26 @@ public class RouteController await ctx.Response.Send("Message was send to the channel."); } + + private async Task StartServerEvent(HttpContextBase ctx) + { + try + { + Plugin.Log.Information($"Client connected: {ctx.Guid}"); + + var sse = new EventServer(Core.TokenSource.Token); + Core.EventConnections.Add(sse); + + // TODO Check if reconnect or new connection + var messages = await WebserverUtil.FrameworkWrapper(Core.Processing.ReadMessageList); + sse.OutboundStack.Push(new NewMessage(messages.ToArray())); + + await sse.HandleEventLoop(ctx); + } + catch (Exception ex) + { + Plugin.Log.Error(ex, "Failed to finish the server event function"); + } + } #endregion } \ No newline at end of file diff --git a/ChatTwo/Http/ServerCore.cs b/ChatTwo/Http/ServerCore.cs index c799950..5bda16d 100644 --- a/ChatTwo/Http/ServerCore.cs +++ b/ChatTwo/Http/ServerCore.cs @@ -1,24 +1,23 @@ using ChatTwo.Http.MessageProtocol; -using EmbedIO; using WatsonWebserver.Core; using WatsonWebserver.Lite; using ExceptionEventArgs = WatsonWebserver.Core.ExceptionEventArgs; namespace ChatTwo.Http; -public class ServerCore : IDisposable +public class ServerCore : IAsyncDisposable { private readonly Plugin Plugin; - private readonly Processing Processing; - private readonly RouteController RouteController; + internal readonly Processing Processing; + internal readonly RouteController RouteController; internal readonly WebserverLite HostContext; - private readonly WebSocketServer Websocket; - private readonly WebServer Host; internal readonly CancellationTokenSource TokenSource = new(); internal readonly string StaticDir = Path.Combine(Plugin.Interface.AssemblyLocation.DirectoryName!, "Http"); + internal readonly List EventConnections = []; + public ServerCore(Plugin plugin) { Plugin = plugin; @@ -37,37 +36,18 @@ public class ServerCore : IDisposable HostContext.Settings.Debug.Responses = true; HostContext.Settings.Debug.AccessControl = true; HostContext.Events.Logger = logMessage => Plugin.Log.Information(logMessage); - - - // Websocket - Host = new WebServer(o => o - .WithUrlPrefixes($"http://*:9001") - .WithMode(HttpListenerMode.EmbedIO) - ); - - Websocket = new WebSocketServer("/ws"); - Host.WithModule(Websocket); - - Websocket.OnClientConnected += ClientConnected; - } - - #region WebsocketFunctions - private void ClientConnected(object? sender, EventArgs args) - { - Task.Run(async () => - { - var messages = await WebserverUtil.FrameworkWrapper(Processing.ReadMessageList); - Websocket.BroadcastMessage(new WebSocketNewMessage(messages.ToArray())); - }); } + #region SSEFunctions internal void SendNewMessage(Message message) { try { Plugin.Framework.RunOnTick(() => { - Websocket.BroadcastMessage(new WebSocketNewMessage([Processing.ReadMessageContent(message)])); + var bundledMessage = new NewMessage([Processing.ReadMessageContent(message)]); + foreach (var eventServer in EventConnections) + eventServer.OutboundStack.Push(bundledMessage); }); } catch (Exception ex) @@ -111,7 +91,6 @@ public class ServerCore : IDisposable try { HostContext.Start(TokenSource.Token); - Host.Start(TokenSource.Token); } catch (Exception ex) { @@ -119,16 +98,16 @@ public class ServerCore : IDisposable } } - public void Dispose() + public async ValueTask DisposeAsync() { - TokenSource.Cancel(); + await TokenSource.CancelAsync(); + + foreach (var eventServer in EventConnections) + await eventServer.DisposeAsync(); HostContext.Stop(); HostContext.Dispose(); - Websocket.Dispose(); - Host.Dispose(); - RouteController.Dispose(); } } \ No newline at end of file diff --git a/ChatTwo/Http/Websocket.cs b/ChatTwo/Http/Websocket.cs deleted file mode 100644 index 0a94082..0000000 --- a/ChatTwo/Http/Websocket.cs +++ /dev/null @@ -1,48 +0,0 @@ -using ChatTwo.Http.MessageProtocol; -using EmbedIO.WebSockets; -using Newtonsoft.Json; - -namespace ChatTwo.Http; - -public class WebSocketServer : WebSocketModule { - private readonly SemaphoreSlim SendLock = new(1, 1); - - public event EventHandler? OnClientConnected; - - public WebSocketServer(string urlPath) : base(urlPath, true) { - - } - - protected override async Task OnMessageReceivedAsync(IWebSocketContext context, byte[] buffer, IWebSocketReceiveResult result) - { - // Unused method - } - - protected override Task OnClientConnectedAsync(IWebSocketContext context) - { - Plugin.Log.Information($"Client connected: {context.Id}"); - OnClientConnected?.Invoke(this, EventArgs.Empty); - return base.OnClientConnectedAsync(context); - } - - protected override Task OnClientDisconnectedAsync(IWebSocketContext context) - { - Plugin.Log.Information($"Client disconnected: {context.Id}"); - return base.OnClientConnectedAsync(context); - } - - protected override void Dispose(bool disposing) { - base.Dispose(disposing); - - SendLock.Dispose(); - } - - public void BroadcastMessage(BaseOutboundMessage message) { - Task.Run(async () => { - using (await SendLock.UseWaitAsync()) { - var serializedData = JsonConvert.SerializeObject(message); - await BroadcastAsync(serializedData); - } - }); - } -} \ No newline at end of file diff --git a/ChatTwo/Http/static/start.css b/ChatTwo/Http/static/start.css index abb0e1d..32c2141 100644 --- a/ChatTwo/Http/static/start.css +++ b/ChatTwo/Http/static/start.css @@ -1,7 +1,7 @@ /* fonts */ @font-face { font-family: Lodestone; - src: url('files/FFXIV_Lodestone_SSF.ttf') format('truetype'); + src: url('/files/FFXIV_Lodestone_SSF.ttf') format('truetype'); unicode-range: U+E020-E0DB; } @@ -9,7 +9,7 @@ font-family: 'Inter var'; font-weight: 100 900; font-style: oblique 0deg 10deg; - src: url('static/Inter.var.woff2') format('woff2'); + src: url('Inter.var.woff2') format('woff2'); } /* variables */ diff --git a/ChatTwo/Http/static/start.js b/ChatTwo/Http/static/start.js index e39a7a2..f645b8a 100644 --- a/ChatTwo/Http/static/start.js +++ b/ChatTwo/Http/static/start.js @@ -1,31 +1,20 @@ // websocket connection -class WSConnection { +class SSEConnection { constructor() { - this.socket = new WebSocket('ws://192.168.2.106:9001/ws'); - this.socket.addEventListener('open', this.onWSOpen.bind(this)); - this.socket.addEventListener('close', this.onWSClose.bind(this)); - this.socket.addEventListener('message', this.onWSMessage.bind(this)); - } + this.socket = new EventSource('/sse', ); - onWSOpen() { - // send request for initial data (channels, currently selected channel) - } + this.socket.addEventListener('close', (e) => { + console.log("Closing SSE connection.") + this.socket.close() + }); - onWSClose() { - // open new websocket here? for mobile - } - - onWSMessage(event) { - try { + this.socket.onmessage = (event) => { let eventData = JSON.parse(event.data); for (let message of eventData.messages) { addMessage(message); } - } catch (error) { - // TODO: error handling? - return; - } + }; } send(message) { @@ -33,7 +22,7 @@ class WSConnection { } } -const ws = new WSConnection(); +const sse = new SSEConnection(); // channel switcher diff --git a/ChatTwo/Plugin.cs b/ChatTwo/Plugin.cs index b974ce3..d37747d 100755 --- a/ChatTwo/Plugin.cs +++ b/ChatTwo/Plugin.cs @@ -167,7 +167,7 @@ public sealed class Plugin : IDalamudPlugin Commands?.Dispose(); EmoteCache.Dispose(); - ServerCore.Dispose(); + ServerCore?.DisposeAsync().AsTask().Wait(); } private void Draw()