feat(messagestore): async background FTS5 bulk-insert with progress notification

Adds the worker that fills the messages_fts virtual table after Migrate4.
The bulk-insert runs off the framework thread on its own SqliteConnection
opened via OpenSecondaryConnection -- WAL lets the live UpsertMessage
path on the primary Connection keep flowing, and the worker's writer
lock yields every 500 rows with a 5ms breather so PendingMessageThread
does not hit "database is locked" after DefaultTimeout=5s.

InitFtsReadyCache runs in the ctor and short-circuits to ready=true when
the index is already populated or when the messages table is empty. The
DbViewer (Task 4.4) reads IsFtsIndexBuilt per frame as a single volatile
field read.

Plugin.cs LoadAsync kicks the worker after FilterAllTabsAsync, gated on
IsFtsIndexBuilt and a CancellationTokenSource that DisposeAsync cancels
before MessageManager tears down. Progress reports back via IActiveNotification,
marshalled onto the framework thread via Framework.RunOnTick. Success path
finishes the notification as Success with a 5s linger; cancellation
dismisses it; an error swaps the type to Error with a fallback hint.
This commit is contained in:
2026-05-13 20:38:05 +02:00
parent 7f317a2b18
commit d26c4701fa
2 changed files with 285 additions and 0 deletions
+158
View File
@@ -181,6 +181,14 @@ internal class MessageStore : IDisposable
private readonly IPlatformUtil _platformUtil;
private readonly IPluginLogProxy _logger;
// Readiness gate for the FTS5 full-text index. Volatile so the DbViewer's
// per-frame IsFtsIndexBuilt read sees the flip the moment the bulk-insert
// worker calls MarkFtsIndexBuilt(). Set in the ctor by InitFtsReadyCache:
// true when the index already has rows (no rebuild needed) or when the
// messages table itself is empty (nothing to index yet); false otherwise.
private volatile bool _ftsReady;
public bool IsFtsIndexBuilt => _ftsReady;
internal MessageStore(string dbPath, IPlatformUtil platformUtil, IPluginLogProxy logger)
{
DbPath = dbPath;
@@ -188,6 +196,7 @@ internal class MessageStore : IDisposable
_logger = logger;
Connection = Connect();
Migrate();
InitFtsReadyCache();
}
public void Dispose()
@@ -547,6 +556,155 @@ internal class MessageStore : IDisposable
return (long)(cmd.ExecuteScalar() ?? 0L) > 0;
}
// Decides whether the FTS index already covers the messages table. Called
// once after Migrate -- empty messages-table is "ready" because there is
// nothing to index yet; a populated fts-table is "ready" because some
// previous run filled it. A populated messages-table with an empty
// fts-table is the "needs rebuild" case the worker (Plugin.cs LoadAsync)
// picks up.
internal void InitFtsReadyCache()
{
using (var cmd = Connection.CreateCommand())
{
cmd.CommandText = "SELECT count(*) FROM messages_fts;";
var ftsRows = (long)(cmd.ExecuteScalar() ?? 0L);
if (ftsRows > 0)
{
_ftsReady = true;
return;
}
}
using var cmd2 = Connection.CreateCommand();
cmd2.CommandText = "SELECT count(*) FROM messages;";
var messageRows = (long)(cmd2.ExecuteScalar() ?? 0L);
_ftsReady = messageRows == 0;
}
// Opens a worker-owned SqliteConnection on the same db path. Used by the
// FTS rebuild worker so the bulk-insert writer stream does not contend
// with the live UpsertMessage path on the primary Connection (WAL allows
// N readers + 1 writer; two writer sessions on the same connection are
// not safe per Microsoft.Data.Sqlite). Caller closes+disposes the
// returned connection and only then calls MarkFtsIndexBuilt() -- the
// DbViewer never sees IsFtsIndexBuilt=true while the worker connection
// is still alive.
internal SqliteConnection OpenSecondaryConnection()
{
var conn = new SqliteConnection(BuildConnectionString(DbPath));
conn.Open();
ApplyPragmas(conn);
return conn;
}
// Worker-only mutator. The bulk-insert worker is the single legitimate
// caller; the flag flips after the worker has closed its own connection.
internal void MarkFtsIndexBuilt() => _ftsReady = true;
// Builds the FTS5 index from scratch on a worker-owned SqliteConnection.
// Chunked-commit (every 500 rows + 5ms sleep) releases the WAL writer
// lock between transactions so the live PendingMessageThread UpsertMessage
// path on the primary Connection does not hit "database is locked" after
// DefaultTimeout=5s. The Thread.Sleep is intentional: it gives the live
// writer a deterministic window to acquire the lock before we re-take
// it for the next chunk.
//
// Cancellation: checked at the top of each row and again after each
// chunk commit, so a Dispose-during-rebuild collapses on the next row
// without trashing the half-built index (DELETE FROM messages_fts at
// the start makes the next run idempotent).
public long RebuildFtsIndex(
SqliteConnection conn,
IProgress<long> progress,
CancellationToken ct
)
{
const int ChunkSize = 500;
using (var clear = conn.CreateCommand())
{
clear.CommandText = "DELETE FROM messages_fts;";
clear.ExecuteNonQuery();
}
long total;
using (var totalCmd = conn.CreateCommand())
{
totalCmd.CommandText = "SELECT count(*) FROM messages;";
total = (long)(totalCmd.ExecuteScalar() ?? 0L);
}
long done = 0;
using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT Id, Sender, Content FROM messages ORDER BY Id;";
using var reader = cmd.ExecuteReader();
using var insert = conn.CreateCommand();
insert.CommandText =
"INSERT INTO messages_fts(message_guid, sender_text, content_text) VALUES ($g, $s, $c);";
var pG = insert.CreateParameter();
pG.ParameterName = "$g";
insert.Parameters.Add(pG);
var pS = insert.CreateParameter();
pS.ParameterName = "$s";
insert.Parameters.Add(pS);
var pC = insert.CreateParameter();
pC.ParameterName = "$c";
insert.Parameters.Add(pC);
// Nullable so the finally can dispose exactly once whether the loop
// ends normally, via cancellation between Dispose and BeginTransaction,
// or via an exception in the body.
SqliteTransaction? transaction = conn.BeginTransaction();
insert.Transaction = transaction;
try
{
while (reader.Read())
{
ct.ThrowIfCancellationRequested();
var guidBytes = (byte[])reader.GetValue(0);
var senderChunks = MessagePackSerializer.Deserialize<List<Chunk>>(
reader.GetFieldValue<byte[]>(1),
MsgPackOptions
);
var contentChunks = MessagePackSerializer.Deserialize<List<Chunk>>(
reader.GetFieldValue<byte[]>(2),
MsgPackOptions
);
pG.Value = Convert.ToHexString(guidBytes);
pS.Value = ChunkUtil.ToRawString(senderChunks);
pC.Value = ChunkUtil.ToRawString(contentChunks);
insert.ExecuteNonQuery();
done++;
if (done % ChunkSize == 0)
{
transaction.Commit();
transaction.Dispose();
transaction = null;
progress.Report(done);
Thread.Sleep(5);
ct.ThrowIfCancellationRequested();
transaction = conn.BeginTransaction();
insert.Transaction = transaction;
}
}
transaction?.Commit();
}
finally
{
transaction?.Dispose();
}
progress.Report(done);
return total;
}
internal void UpsertMessage(Message message)
{
// Privacy filter -- drop disallowed ChatTypes before they reach storage.
+127
View File
@@ -14,6 +14,7 @@ using HellionChat.Ipc;
using HellionChat.Resources;
using HellionChat.Ui;
using HellionChat.Util;
using Microsoft.Data.Sqlite;
namespace HellionChat;
@@ -127,6 +128,12 @@ public sealed class Plugin : IAsyncDalamudPlugin
internal int DeferredSaveFrames = -1;
// Cancels the v1.4.8 FTS5 bulk-insert worker on plugin teardown. The
// worker runs off the framework thread on its own SqliteConnection, so a
// Dispose mid-rebuild must signal cancellation before MessageManager
// tears down (the worker logs "rebuild failed" via Log on error paths).
private CancellationTokenSource? _ftsRebuildCts;
// Serialises retention sweeps so a manual trigger and the 24h auto-sweep
// can't run in parallel. Volatile because the ImGui thread reads it outside
// the lock to gate the manual button.
@@ -282,6 +289,113 @@ public sealed class Plugin : IAsyncDalamudPlugin
if (Interface.Reason is not PluginLoadReason.Boot)
MessageManager.FilterAllTabsAsync();
// Kick the FTS5 rebuild worker if Migrate4 just added the schema or
// a previous run was cut short (InitFtsReadyCache leaves _ftsReady
// false in that case). Runs off the framework thread on its own
// SqliteConnection so the live UpsertMessage path keeps flowing
// through the chunked-commit windows.
_ftsRebuildCts = new CancellationTokenSource();
if (!MessageManager.Store.IsFtsIndexBuilt)
{
var token = _ftsRebuildCts.Token;
_ = Task.Run(
async () =>
{
// FQN: Plugin.Notification (Z.74) shadows the type name.
Dalamud.Interface.ImGuiNotification.IActiveNotification? notif = null;
try
{
notif = Notification.AddNotification(
new Dalamud.Interface.ImGuiNotification.Notification
{
Title = "Hellion Chat",
Content = "Indexing chat history for full-text search...",
Type = Dalamud
.Interface
.ImGuiNotification
.NotificationType
.Info,
Minimized = false,
InitialDuration = TimeSpan.FromMinutes(10),
}
);
// Progress<T> raises this callback on the captured
// sync-context (Task.Run worker pool). IActiveNotification
// is ImGui-backed and mutates the UI, so marshal the
// mutation onto the framework thread via RunOnTick.
var progress = new Progress<long>(done =>
{
Framework.RunOnTick(() =>
{
if (notif is { } n)
n.Content = $"Indexing chat history: {done:N0} messages...";
});
});
// Worker-owned connection. Closed+disposed before we
// flip the readiness flag so the DbViewer never sees
// IsFtsIndexBuilt=true while the worker connection
// is still alive.
SqliteConnection? workerConn = null;
try
{
workerConn = MessageManager.Store.OpenSecondaryConnection();
var total = await Task.Run(
() =>
MessageManager.Store.RebuildFtsIndex(
workerConn,
progress,
token
),
token
)
.ConfigureAwait(false);
workerConn.Close();
workerConn.Dispose();
workerConn = null;
MessageManager.Store.MarkFtsIndexBuilt();
if (notif is { } final)
{
final.Content = $"Indexed {total:N0} messages.";
final.Type = Dalamud
.Interface
.ImGuiNotification
.NotificationType
.Success;
final.InitialDuration = TimeSpan.FromSeconds(5);
}
}
finally
{
workerConn?.Dispose();
}
}
catch (OperationCanceledException)
{
notif?.DismissNow();
}
catch (Exception ex)
{
Log.Error(ex, "FTS index rebuild failed");
if (notif is { } err)
{
err.Content =
"Full-text indexing failed -- search will use local filter only.";
err.Type = Dalamud
.Interface
.ImGuiNotification
.NotificationType
.Error;
}
}
},
_ftsRebuildCts.Token
);
}
Interface.UiBuilder.DisableCutsceneUiHide = true;
Interface.UiBuilder.DisableGposeUiHide = true;
@@ -328,6 +442,19 @@ public sealed class Plugin : IAsyncDalamudPlugin
failure = CaptureFailure(failure, () => Interface.UiBuilder.Draw -= Draw);
failure = CaptureFailure(failure, () => Framework.Update -= FrameworkUpdate);
// Signal the FTS rebuild worker to bail. Runs before MessageManager
// tears down so the worker's "rebuild failed" log path still finds
// a live Log static. Worker owns its own SqliteConnection and disposes
// it itself; we only flip the cancellation flag here.
failure = CaptureFailure(
failure,
() =>
{
_ftsRebuildCts?.Cancel();
_ftsRebuildCts?.Dispose();
}
);
// Flush a pending DeferredSave — FrameworkUpdate won't fire it anymore.
failure = CaptureFailure(
failure,