Move to SSE from websocket

This commit is contained in:
Infi
2024-08-24 16:49:56 +02:00
parent 5e93732183
commit c9be7c9c2c
9 changed files with 133 additions and 111 deletions
+78
View File
@@ -0,0 +1,78 @@
using System.Text;
using ChatTwo.Http.MessageProtocol;
using Newtonsoft.Json;
using WatsonWebserver.Core;
namespace ChatTwo.Http;
public class EventServer
{
private bool Stopping;
private bool Finished;
private readonly CancellationToken Token;
public readonly Stack<BaseMessage> OutboundStack = new();
private long Index;
public EventServer(CancellationToken token)
{
Token = token;
}
public async Task HandleEventLoop(HttpContextBase ctx)
{
try
{
ctx.Response.Headers.Add("Content-Type", "text/event-stream");
ctx.Response.Headers.Add("Cache-Control", "no-cache");
ctx.Response.Headers.Add("Connection", "keep-alive");
ctx.Response.ChunkedTransfer = true;
while (!Token.IsCancellationRequested && !Stopping)
{
await Task.Delay(10, Token);
if (Token.IsCancellationRequested)
return;
if (!OutboundStack.TryPop(out var message))
continue;
var data = JsonConvert.SerializeObject(message);
await ctx.Response.SendChunk(Encoding.UTF8.GetBytes($"id: {Index}\ndata: {data}\n\n"), Token);
Index++;
}
}
catch (TaskCanceledException)
{
// Ignore
}
catch (Exception ex)
{
Plugin.Log.Error(ex, "Event queued failed to continue");
}
finally
{
// No Content (204) didn't work for Firefox, so manually closing the connection on client side
await ctx.Response.SendFinalChunk("data: closing\nevent: close\n\n"u8.ToArray());
Finished = true;
}
}
public async ValueTask DisposeAsync()
{
Stopping = true;
var timeout = 1000; // 1000ms
while (timeout > 0)
{
if (Finished)
break;
timeout -= 100;
await Task.Delay(100);
Plugin.Log.Debug("Sleeping because EventServer still alive");
}
}
}
@@ -8,14 +8,14 @@ public struct MessageResponse()
[JsonProperty("messageHTML")] public string Message = ""; [JsonProperty("messageHTML")] public string Message = "";
} }
public class WebSocketNewMessage(MessageResponse[] messages) : BaseOutboundMessage(MessageName) public class NewMessage(MessageResponse[] messages) : BaseMessage(MessageName)
{ {
private const string MessageName = "chat-message"; private const string MessageName = "chat-message";
[JsonProperty("messages")] public MessageResponse[] Messages { get; set; } = messages; [JsonProperty("messages")] public MessageResponse[] Messages { get; set; } = messages;
} }
public class BaseOutboundMessage(string messageType) public class BaseMessage(string messageType)
{ {
[JsonProperty("messageType")] public string MessageType { get; set; } = messageType; [JsonProperty("messageType")] public string MessageType { get; set; } = messageType;
} }
+2 -2
View File
@@ -26,10 +26,10 @@ public class Processing
return string.Join("", messages); return string.Join("", messages);
} }
internal async Task<List<MessageResponse>> ReadMessageList() internal async Task<MessageResponse[]> ReadMessageList()
{ {
var tabMessages = await Plugin.ChatLogWindow.CurrentTab!.Messages.GetCopy(); var tabMessages = await Plugin.ChatLogWindow.CurrentTab!.Messages.GetCopy();
return tabMessages.Select(ReadMessageContent).ToList(); return tabMessages.Select(ReadMessageContent).ToArray();
} }
internal MessageResponse ReadMessageContent(Message message) internal MessageResponse ReadMessageContent(Message message)
+25 -1
View File
@@ -1,4 +1,5 @@
using Lumina.Data.Files; using ChatTwo.Http.MessageProtocol;
using Lumina.Data.Files;
using WatsonWebserver.Core; using WatsonWebserver.Core;
using HttpMethod = WatsonWebserver.Core.HttpMethod; using HttpMethod = WatsonWebserver.Core.HttpMethod;
@@ -33,6 +34,8 @@ public class RouteController
Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.GET, "/chat", ChatBoxRoute, ExceptionRoute); Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.GET, "/chat", ChatBoxRoute, ExceptionRoute);
Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.POST, "/send", ReceiveMessage, ExceptionRoute); Core.HostContext.Routes.PostAuthentication.Static.Add(HttpMethod.POST, "/send", ReceiveMessage, ExceptionRoute);
Core.HostContext.Routes.PostAuthentication.Parameter.Add(HttpMethod.GET, "/emote/{name}", GetEmote, ExceptionRoute); Core.HostContext.Routes.PostAuthentication.Parameter.Add(HttpMethod.GET, "/emote/{name}", GetEmote, ExceptionRoute);
Core.HostContext.Routes.PreAuthentication.Parameter.Add(HttpMethod.GET, "/sse", StartServerEvent, ExceptionRoute);
} }
private async Task ExceptionRoute(HttpContextBase ctx, Exception _) private async Task ExceptionRoute(HttpContextBase ctx, Exception _)
@@ -149,5 +152,26 @@ public class RouteController
await ctx.Response.Send("Message was send to the channel."); await ctx.Response.Send("Message was send to the channel.");
} }
private async Task StartServerEvent(HttpContextBase ctx)
{
try
{
Plugin.Log.Information($"Client connected: {ctx.Guid}");
var sse = new EventServer(Core.TokenSource.Token);
Core.EventConnections.Add(sse);
// TODO Check if reconnect or new connection
var messages = await WebserverUtil.FrameworkWrapper(Core.Processing.ReadMessageList);
sse.OutboundStack.Push(new NewMessage(messages.ToArray()));
await sse.HandleEventLoop(ctx);
}
catch (Exception ex)
{
Plugin.Log.Error(ex, "Failed to finish the server event function");
}
}
#endregion #endregion
} }
+14 -35
View File
@@ -1,24 +1,23 @@
using ChatTwo.Http.MessageProtocol; using ChatTwo.Http.MessageProtocol;
using EmbedIO;
using WatsonWebserver.Core; using WatsonWebserver.Core;
using WatsonWebserver.Lite; using WatsonWebserver.Lite;
using ExceptionEventArgs = WatsonWebserver.Core.ExceptionEventArgs; using ExceptionEventArgs = WatsonWebserver.Core.ExceptionEventArgs;
namespace ChatTwo.Http; namespace ChatTwo.Http;
public class ServerCore : IDisposable public class ServerCore : IAsyncDisposable
{ {
private readonly Plugin Plugin; private readonly Plugin Plugin;
private readonly Processing Processing; internal readonly Processing Processing;
private readonly RouteController RouteController; internal readonly RouteController RouteController;
internal readonly WebserverLite HostContext; internal readonly WebserverLite HostContext;
private readonly WebSocketServer Websocket;
private readonly WebServer Host;
internal readonly CancellationTokenSource TokenSource = new(); internal readonly CancellationTokenSource TokenSource = new();
internal readonly string StaticDir = Path.Combine(Plugin.Interface.AssemblyLocation.DirectoryName!, "Http"); internal readonly string StaticDir = Path.Combine(Plugin.Interface.AssemblyLocation.DirectoryName!, "Http");
internal readonly List<EventServer> EventConnections = [];
public ServerCore(Plugin plugin) public ServerCore(Plugin plugin)
{ {
Plugin = plugin; Plugin = plugin;
@@ -37,37 +36,18 @@ public class ServerCore : IDisposable
HostContext.Settings.Debug.Responses = true; HostContext.Settings.Debug.Responses = true;
HostContext.Settings.Debug.AccessControl = true; HostContext.Settings.Debug.AccessControl = true;
HostContext.Events.Logger = logMessage => Plugin.Log.Information(logMessage); HostContext.Events.Logger = logMessage => Plugin.Log.Information(logMessage);
// Websocket
Host = new WebServer(o => o
.WithUrlPrefixes($"http://*:9001")
.WithMode(HttpListenerMode.EmbedIO)
);
Websocket = new WebSocketServer("/ws");
Host.WithModule(Websocket);
Websocket.OnClientConnected += ClientConnected;
}
#region WebsocketFunctions
private void ClientConnected(object? sender, EventArgs args)
{
Task.Run(async () =>
{
var messages = await WebserverUtil.FrameworkWrapper(Processing.ReadMessageList);
Websocket.BroadcastMessage(new WebSocketNewMessage(messages.ToArray()));
});
} }
#region SSEFunctions
internal void SendNewMessage(Message message) internal void SendNewMessage(Message message)
{ {
try try
{ {
Plugin.Framework.RunOnTick(() => Plugin.Framework.RunOnTick(() =>
{ {
Websocket.BroadcastMessage(new WebSocketNewMessage([Processing.ReadMessageContent(message)])); var bundledMessage = new NewMessage([Processing.ReadMessageContent(message)]);
foreach (var eventServer in EventConnections)
eventServer.OutboundStack.Push(bundledMessage);
}); });
} }
catch (Exception ex) catch (Exception ex)
@@ -111,7 +91,6 @@ public class ServerCore : IDisposable
try try
{ {
HostContext.Start(TokenSource.Token); HostContext.Start(TokenSource.Token);
Host.Start(TokenSource.Token);
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -119,16 +98,16 @@ public class ServerCore : IDisposable
} }
} }
public void Dispose() public async ValueTask DisposeAsync()
{ {
TokenSource.Cancel(); await TokenSource.CancelAsync();
foreach (var eventServer in EventConnections)
await eventServer.DisposeAsync();
HostContext.Stop(); HostContext.Stop();
HostContext.Dispose(); HostContext.Dispose();
Websocket.Dispose();
Host.Dispose();
RouteController.Dispose(); RouteController.Dispose();
} }
} }
-48
View File
@@ -1,48 +0,0 @@
using ChatTwo.Http.MessageProtocol;
using EmbedIO.WebSockets;
using Newtonsoft.Json;
namespace ChatTwo.Http;
public class WebSocketServer : WebSocketModule {
private readonly SemaphoreSlim SendLock = new(1, 1);
public event EventHandler? OnClientConnected;
public WebSocketServer(string urlPath) : base(urlPath, true) {
}
protected override async Task OnMessageReceivedAsync(IWebSocketContext context, byte[] buffer, IWebSocketReceiveResult result)
{
// Unused method
}
protected override Task OnClientConnectedAsync(IWebSocketContext context)
{
Plugin.Log.Information($"Client connected: {context.Id}");
OnClientConnected?.Invoke(this, EventArgs.Empty);
return base.OnClientConnectedAsync(context);
}
protected override Task OnClientDisconnectedAsync(IWebSocketContext context)
{
Plugin.Log.Information($"Client disconnected: {context.Id}");
return base.OnClientConnectedAsync(context);
}
protected override void Dispose(bool disposing) {
base.Dispose(disposing);
SendLock.Dispose();
}
public void BroadcastMessage(BaseOutboundMessage message) {
Task.Run(async () => {
using (await SendLock.UseWaitAsync()) {
var serializedData = JsonConvert.SerializeObject(message);
await BroadcastAsync(serializedData);
}
});
}
}
+2 -2
View File
@@ -1,7 +1,7 @@
/* fonts */ /* fonts */
@font-face { @font-face {
font-family: Lodestone; font-family: Lodestone;
src: url('files/FFXIV_Lodestone_SSF.ttf') format('truetype'); src: url('/files/FFXIV_Lodestone_SSF.ttf') format('truetype');
unicode-range: U+E020-E0DB; unicode-range: U+E020-E0DB;
} }
@@ -9,7 +9,7 @@
font-family: 'Inter var'; font-family: 'Inter var';
font-weight: 100 900; font-weight: 100 900;
font-style: oblique 0deg 10deg; font-style: oblique 0deg 10deg;
src: url('static/Inter.var.woff2') format('woff2'); src: url('Inter.var.woff2') format('woff2');
} }
/* variables */ /* variables */
+9 -20
View File
@@ -1,31 +1,20 @@
// websocket connection // websocket connection
class WSConnection { class SSEConnection {
constructor() { constructor() {
this.socket = new WebSocket('ws://192.168.2.106:9001/ws'); this.socket = new EventSource('/sse', );
this.socket.addEventListener('open', this.onWSOpen.bind(this));
this.socket.addEventListener('close', this.onWSClose.bind(this));
this.socket.addEventListener('message', this.onWSMessage.bind(this));
}
onWSOpen() { this.socket.addEventListener('close', (e) => {
// send request for initial data (channels, currently selected channel) console.log("Closing SSE connection.")
} this.socket.close()
});
onWSClose() { this.socket.onmessage = (event) => {
// open new websocket here? for mobile
}
onWSMessage(event) {
try {
let eventData = JSON.parse(event.data); let eventData = JSON.parse(event.data);
for (let message of eventData.messages) for (let message of eventData.messages)
{ {
addMessage(message); addMessage(message);
} }
} catch (error) { };
// TODO: error handling?
return;
}
} }
send(message) { send(message) {
@@ -33,7 +22,7 @@ class WSConnection {
} }
} }
const ws = new WSConnection(); const sse = new SSEConnection();
// channel switcher // channel switcher
+1 -1
View File
@@ -167,7 +167,7 @@ public sealed class Plugin : IDalamudPlugin
Commands?.Dispose(); Commands?.Dispose();
EmoteCache.Dispose(); EmoteCache.Dispose();
ServerCore.Dispose(); ServerCore?.DisposeAsync().AsTask().Wait();
} }
private void Draw() private void Draw()