d0be75e79d
MessageStore, MessageEnumerator, MessageManager, AutoTellTabsService
move from Plugin.LogProxy / IPluginLogProxy onto
Microsoft.Extensions.Logging.ILogger<T> via constructor injection.
MessageStore additionally takes ILoggerFactory so it can build a
per-instance ILogger<MessageEnumerator> at each of the five reader-
spawning sites; the enumerator is not a container singleton.
PluginHostFactory's MessageManager and AutoTellTabsService factory
lambdas grow to resolve the new logger args; everything else stays in
place.
Site-level migration in the four files:
- MessageStore: 12 calls, _logger field IPluginLogProxy -> ILogger<MessageStore>
- MessageManager: 7 Plugin.LogProxy.* sites, new _logger field
- AutoTellTabsService: 9 Plugin.LogProxy.* sites, new _logger field
Plus a pre-existing template bug surfaced by CA2017: a LogDebug call
in AutoTellTabsService used "{tab.Name}" with no `$` prefix, which
landed in xllog as literal text under Plugin.LogProxy; ILogger now
reads that as a structured placeholder, so the call was promoted to
proper structured logging with tab.Name passed as a parameter.
1309 lines
47 KiB
C#
1309 lines
47 KiB
C#
using System.Buffers;
|
|
using System.Collections;
|
|
using System.Data.Common;
|
|
using Dalamud.Game.Text.SeStringHandling;
|
|
using HellionChat.Code;
|
|
using HellionChat.Ui;
|
|
using HellionChat.Util;
|
|
using MessagePack;
|
|
using MessagePack.Formatters;
|
|
using MessagePack.Resolvers;
|
|
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging;
|
|
using Encoding = System.Text.Encoding;
|
|
|
|
namespace HellionChat;
|
|
|
|
/// <summary>Small ADO.NET convenience helpers used by the message store.</summary>
internal static class DbExtensions
{
    /// <summary>
    /// Runs <paramref name="sql"/> on <paramref name="conn"/> as a non-query.
    /// The affected-row count is discarded.
    /// </summary>
    internal static void Execute(this DbConnection conn, string sql)
    {
        using (var command = conn.CreateCommand())
        {
            command.CommandText = sql;
            _ = command.ExecuteNonQuery();
        }
    }
}
|
|
|
|
/// <summary>
/// Tag byte written in front of every serialised payload by
/// PayloadMessagePackFormatter. The numeric values are spelled out because
/// they are part of the stored format.
/// </summary>
internal enum PayloadMessagePackType : byte
{
    Achievement = 0,
    PartyFinder = 1,
    Uri = 2,
    Emote = 3,

    // Everything without a dedicated tag is stored as its raw encoded bytes.
    Other = 255,
}
|
|
|
|
/// <summary>
/// MessagePack formatter for <see cref="Payload"/>. A payload is written either
/// as nil or as a two-element array: a <see cref="PayloadMessagePackType"/> tag
/// byte followed by the tag-specific value.
/// </summary>
public class PayloadMessagePackFormatter : IMessagePackFormatter<Payload?>
{
    /// <summary>Writes <paramref name="value"/> as nil or as a [tag, data] pair.</summary>
    public void Serialize(
        ref MessagePackWriter writer,
        Payload? value,
        MessagePackSerializerOptions options
    )
    {
        if (value is null)
        {
            writer.WriteNil();
            return;
        }

        writer.WriteArrayHeader(2);
        switch (value)
        {
            case AchievementPayload achievementPayload:
                writer.WriteUInt8((byte)PayloadMessagePackType.Achievement);
                writer.WriteUInt32(achievementPayload.Id);
                break;
            case PartyFinderPayload partyFinderPayload:
                writer.WriteUInt8((byte)PayloadMessagePackType.PartyFinder);
                writer.WriteUInt32(partyFinderPayload.Id);
                break;
            case UriPayload uriPayload:
                writer.WriteUInt8((byte)PayloadMessagePackType.Uri);
                writer.WriteString(Encoding.UTF8.GetBytes(uriPayload.Uri.ToString()));
                break;
            case EmotePayload emotePayload:
                writer.WriteUInt8((byte)PayloadMessagePackType.Emote);
                writer.WriteString(Encoding.UTF8.GetBytes(emotePayload.Code));
                break;
            default:
                // No dedicated tag: store the payload's own encoded bytes.
                writer.WriteUInt8((byte)PayloadMessagePackType.Other);
                writer.Write(value.Encode());
                break;
        }
    }

    /// <summary>Reads a payload previously written by <see cref="Serialize"/>.</summary>
    /// <exception cref="InvalidOperationException">
    /// The value is neither nil nor a two-element array.
    /// </exception>
    public Payload? Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options)
    {
        if (reader.TryReadNil())
            return null;

        if (reader.ReadArrayHeader() != 2)
            throw new InvalidOperationException("Invalid array count for Payload object");

        var type = (PayloadMessagePackType)reader.ReadByte();
        switch (type)
        {
            case PayloadMessagePackType.Achievement:
                return new AchievementPayload(reader.ReadUInt32());
            case PayloadMessagePackType.PartyFinder:
                return new PartyFinderPayload(reader.ReadUInt32());
            case PayloadMessagePackType.Uri:
                return new UriPayload(new Uri(reader.ReadString() ?? ""));
            case PayloadMessagePackType.Emote:
                return EmotePayload.ResolveEmote(reader.ReadString() ?? "");
            case PayloadMessagePackType.Other:
            default:
                var bytes = reader.ReadBytes() ?? new ReadOnlySequence<byte>();
                // Dispose the reader (and the MemoryStream it owns) once decoding is
                // done instead of leaving both to the finalizer.
                // NOTE(review): assumes Payload.Decode does not keep the reader.
                using (var binReader = new BinaryReader(new MemoryStream(bytes.ToArray())))
                {
                    return Payload.Decode(binReader);
                }
        }
    }
}
|
|
|
|
/// <summary>
/// MessagePack formatter for <see cref="SeString"/>. The string is stored as its
/// payload list, delegating to whatever List&lt;Payload&gt; formatter the resolver provides.
/// </summary>
public class SeStringMessagePackFormatter : IMessagePackFormatter<SeString?>
{
    /// <summary>Writes the payload list of <paramref name="value"/>; null becomes an empty list.</summary>
    public void Serialize(
        ref MessagePackWriter writer,
        SeString? value,
        MessagePackSerializerOptions options
    )
    {
        var payloadListFormatter = options.Resolver.GetFormatter<List<Payload>>()!;
        List<Payload> payloads = value?.Payloads ?? [];
        payloadListFormatter.Serialize(ref writer, payloads, options);
    }

    /// <summary>Reads a payload list and wraps it in a new <see cref="SeString"/>.</summary>
    public SeString Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options)
    {
        var payloadListFormatter = options.Resolver.GetFormatter<List<Payload>>()!;
        var payloads = payloadListFormatter.Deserialize(ref reader, options);
        return new SeString(payloads);
    }
}
|
|
|
|
internal class MessageStore : IDisposable
|
|
{
|
|
private const int MessageQueryLimit = 10_000;
|
|
|
|
private string DbPath { get; }
|
|
|
|
// Internal so the Build-Suite tests can verify Migrate4's CREATE VIRTUAL
|
|
// TABLE result via a one-off PRAGMA without exposing a dedicated helper
|
|
// for each schema invariant. Setter stays private; the ctor is the only
|
|
// place that assigns.
|
|
internal SqliteConnection Connection { get; private set; }
|
|
|
|
internal static readonly MessagePackSerializerOptions MsgPackOptions =
|
|
MessagePackSerializerOptions.Standard.WithResolver(
|
|
CompositeResolver.Create(
|
|
[new PayloadMessagePackFormatter(), new SeStringMessagePackFormatter()],
|
|
[StandardResolver.Instance]
|
|
)
|
|
);
|
|
|
|
// Materialises one messages row from the reader's current position.
// Shared between the MessageEnumerator load path and the LoadByGuids FTS-join
// path so both stay in lockstep when the column layout moves.
// Expected column order: Id, Receiver, ContentId, Date, ChatType, SourceKind,
// TargetKind, Sender, Content, SenderSource, ContentSource, ExtraChatChannel.
// Row-level errors propagate; the caller decides whether to skip+log
// (enumerator) or fail fast (bulk lookup).
internal static Message ReadMessageRow(DbDataReader reader)
{
    static List<Chunk> ReadChunks(DbDataReader row, int ordinal) =>
        MessagePackSerializer.Deserialize<List<Chunk>>(
            row.GetFieldValue<byte[]>(ordinal),
            MsgPackOptions
        );

    static SeString ReadSeString(DbDataReader row, int ordinal) =>
        MessagePackSerializer.Deserialize<SeString>(
            row.GetFieldValue<byte[]>(ordinal),
            MsgPackOptions
        );

    // Columns are read in ordinal order, matching the original argument order.
    var id = reader.GetGuid(0);
    var receiver = (ulong)reader.GetInt64(1);
    var contentId = (ulong)reader.GetInt64(2);
    var date = DateTimeOffset.FromUnixTimeMilliseconds(reader.GetInt64(3));
    var code = new ChatCode(
        (byte)reader.GetInt32(4),
        (byte)reader.GetInt32(5),
        (byte)reader.GetInt32(6)
    );
    var sender = ReadChunks(reader, 7);
    var content = ReadChunks(reader, 8);
    var senderSource = ReadSeString(reader, 9);
    var contentSource = ReadSeString(reader, 10);
    var extraChatChannel = reader.GetGuid(11);

    return new Message(
        id,
        receiver,
        contentId,
        date,
        code,
        sender,
        content,
        senderSource,
        contentSource,
        extraChatChannel
    );
}
|
|
|
|
private readonly IPlatformUtil _platformUtil;
|
|
private readonly ILogger<MessageStore> _logger;
|
|
private readonly ILoggerFactory _loggerFactory;
|
|
|
|
// Readiness gate for the FTS5 full-text index. Volatile so the DbViewer's
|
|
// per-frame IsFtsIndexBuilt read sees the flip the moment the bulk-insert
|
|
// worker calls MarkFtsIndexBuilt(). Set in the ctor by InitFtsReadyCache:
|
|
// true when the index already has rows (no rebuild needed) or when the
|
|
// messages table itself is empty (nothing to index yet); false otherwise.
|
|
private volatile bool _ftsReady;
|
|
public bool IsFtsIndexBuilt => _ftsReady;
|
|
|
|
// Serialises read/write access to the primary Connection so the DbViewer
|
|
// filter-worker (Task.Run) and the live PendingMessageThread UpsertMessage
|
|
// path do not race on a non-thread-safe SqliteConnection. Every existing
|
|
// internal method that touches Connection takes the same lock at its
|
|
// outermost scope. RebuildFtsIndex stays outside the lock -- it owns its
|
|
// own SqliteConnection via OpenSecondaryConnection.
|
|
private readonly object _readLock = new();
|
|
|
|
/// <summary>
/// Opens the database at <paramref name="dbPath"/>, runs pending schema
/// migrations and initialises the FTS readiness flag.
/// </summary>
internal MessageStore(
    string dbPath,
    IPlatformUtil platformUtil,
    ILogger<MessageStore> logger,
    ILoggerFactory loggerFactory
)
{
    DbPath = dbPath;
    _platformUtil = platformUtil;
    _logger = logger;
    _loggerFactory = loggerFactory;
    Connection = Connect();
    try
    {
        Migrate();
        InitFtsReadyCache();
    }
    catch
    {
        // The caller never receives an instance to Dispose when the ctor throws,
        // so release the unpooled connection here instead of leaking it.
        Connection.Dispose();
        throw;
    }
}
|
|
|
|
/// <summary>Closes and disposes the primary connection.</summary>
public void Dispose()
{
    // The connection string sets Pooling=false, so no ClearAllPools is needed
    // here; that call is provider-wide and would touch other plugins' SQLite
    // connections.
    var primary = Connection;
    primary.Close();
    primary.Dispose();
}
|
|
|
|
/// <summary>
/// Builds the connection string for <paramref name="dbPath"/>: unpooled,
/// read-write-create, with a default timeout of 5.
/// </summary>
private static string BuildConnectionString(string dbPath) =>
    new SqliteConnectionStringBuilder
    {
        DataSource = dbPath,
        DefaultTimeout = 5,
        Pooling = false,
        Mode = SqliteOpenMode.ReadWriteCreate,
    }.ToString();
|
|
|
|
/// <summary>
/// Applies the per-connection pragmas: WAL journal, NORMAL synchronous, and a
/// fixed cache_size when <c>_platformUtil.IsWine</c> is true.
/// </summary>
private void ApplyPragmas(SqliteConnection conn)
{
    conn.Execute("PRAGMA journal_mode=WAL;");
    conn.Execute("PRAGMA synchronous=NORMAL;");

    if (!_platformUtil.IsWine)
    {
        return;
    }

    conn.Execute("PRAGMA cache_size = 32768;");
}
|
|
|
|
/// <summary>
/// Opens a connection to <c>DbPath</c>, applies the pragmas and logs how long
/// that took. On failure the connection is disposed before the exception
/// propagates.
/// </summary>
private SqliteConnection Connect()
{
    // v1.4.9 R3 profiling: trace cost of SQLite open + pragma-apply. Paired
    // with the stopwatch in Migrate -- Connect alone is the cheap half
    // (Open + a handful of PRAGMAs); the expensive half typically lives in
    // Migrate, especially on a large DB after a schema bump.
    var connectSw = System.Diagnostics.Stopwatch.StartNew();
    var conn = new SqliteConnection(BuildConnectionString(DbPath));
    try
    {
        conn.Open();
        ApplyPragmas(conn);
    }
    catch
    {
        // Pooling is off, so an abandoned connection would keep its handle
        // until finalization.
        conn.Dispose();
        throw;
    }

    connectSw.Stop();
    // Constant template + argument instead of an interpolated string (CA2254).
    _logger.LogInformation(
        "MessageStore.Connect took {ElapsedMs}ms",
        connectSw.ElapsedMilliseconds
    );
    return conn;
}
|
|
|
|
/// <summary>
/// Reads <c>PRAGMA user_version</c> and runs every migration newer than it, in
/// order. A version of 0 or below runs the whole chain starting at Migrate0;
/// a version at or above the newest migration runs nothing.
/// </summary>
private void Migrate()
{
    // v1.4.9 R3 profiling: trace cost of the schema-migration chain. On a
    // large DB after a fresh schema bump this is the dominant SQLite cost
    // at plugin-load, not Connect.
    var migrateSw = System.Diagnostics.Stopwatch.StartNew();

    int userVersion;
    using (var cmd = Connection.CreateCommand())
    {
        cmd.CommandText = "PRAGMA user_version;";
        userVersion = Convert.ToInt32(cmd.ExecuteScalar());
    }

    // Index i holds the migration that brings the schema to version i. One
    // ordered table replaces per-version lists that had to be kept in sync by
    // hand whenever a migration was added.
    var migrations = new Action[] { Migrate0, Migrate1, Migrate2, Migrate3, Migrate4 };

    // Migration support was only added in version 1, so a version of 0 (or
    // below) cannot be trusted and re-runs the idempotent Migrate0. The long
    // arithmetic keeps a corrupt, huge user_version from overflowing.
    var firstPending =
        userVersion <= 0 ? 0 : (int)Math.Min((long)userVersion + 1, migrations.Length);

    for (var i = firstPending; i < migrations.Length; i++)
        migrations[i]();

    migrateSw.Stop();
    // Constant template + argument instead of an interpolated string (CA2254).
    _logger.LogInformation(
        "MessageStore.Migrate took {ElapsedMs}ms",
        migrateSw.ElapsedMilliseconds
    );
}
|
|
|
|
/// <summary>
/// Migration 0: creates the messages table and its Receiver/Date indexes if
/// they do not exist yet, then records schema version 0.
/// </summary>
private void Migrate0()
{
    const string createSchemaSql =
        @"
            CREATE TABLE IF NOT EXISTS messages (
                Id BLOB PRIMARY KEY NOT NULL, -- Guid
                Receiver INTEGER NOT NULL, -- uint64 (first bits are always 0)
                ContentId INTEGER NOT NULL, -- uint64 (first bits are always 0)
                Date INTEGER NOT NULL, -- unix timestamp with millisecond precision
                Code INTEGER NOT NULL, -- ChatCode encoding
                Sender BLOB NOT NULL, -- Chunk[] msgpack
                Content BLOB NOT NULL, -- Chunk[] msgpack
                SenderSource BLOB NOT NULL, -- SeString
                ContentSource BLOB NOT NULL, -- SeString
                SortCode INTEGER NOT NULL, -- SortCode encoding
                ExtraChatChannel BLOB NOT NULL -- Guid
            );

            CREATE INDEX IF NOT EXISTS idx_messages_receiver ON messages (Receiver);
            CREATE INDEX IF NOT EXISTS idx_messages_date ON messages (Date);
        ";

    _logger.LogInformation("Running migration 0: Creating tables");
    Connection.Execute(createSchemaSql);
    SetMigrationVersion(0);
}
|
|
|
|
/// <summary>
/// Migration 1: adds the Deleted column to messages and records schema
/// version 1.
/// </summary>
private void Migrate1()
{
    _logger.LogInformation("Running migration 1: Adding Deleted column");

    // Recovery for a partially-applied Migrate1: the ALTER and the version bump
    // are separate statements, so a crash between them leaves user_version at 0
    // and the next start re-enters here. ADD COLUMN would then fail on the
    // duplicate column, so skip it when the column is already there.
    if (ColumnExists("messages", "Deleted"))
    {
        _logger.LogInformation(
            "Migration 1: Deleted column already present, only bumping user_version"
        );
    }
    else
    {
        Connection.Execute(
            @"
                ALTER TABLE messages ADD COLUMN Deleted BOOLEAN NOT NULL DEFAULT false;
            "
        );
    }

    SetMigrationVersion(1);
}
|
|
|
|
/// <summary>
/// Migration 2: adds the virtual generated Channel column (Code &amp; 0x7f) plus
/// its index, then records schema version 2.
/// </summary>
private void Migrate2()
{
    const string addChannelSql =
        @"
            ALTER TABLE messages ADD COLUMN Channel INTEGER GENERATED ALWAYS AS (Code & 0x7f) VIRTUAL;
            CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages (Channel);
        ";

    _logger.LogInformation("Running migration 2: Adding Channel generated column");
    Connection.Execute(addChannelSql);
    SetMigrationVersion(2);
}
|
|
|
|
/// <summary>
/// Returns true when <c>PRAGMA table_info</c> for <paramref name="table"/>
/// lists a column whose name equals <paramref name="column"/> (ordinal,
/// case-sensitive comparison).
/// </summary>
private bool ColumnExists(string table, string column)
{
    // PRAGMA does not accept SQLite parameter bindings, hence the interpolation.
    // The table name comes from internal call sites only.
    using var pragma = Connection.CreateCommand();
    pragma.CommandText = $"PRAGMA table_info({table});";

    using var rows = pragma.ExecuteReader();
    var found = false;
    while (!found && rows.Read())
    {
        // Result column 1 is read as the column name.
        found = rows.GetString(1) == column;
    }

    return found;
}
|
|
|
|
/// <summary>
/// Migration 3: splits the packed Code column into ChatType / SourceKind /
/// TargetKind, drops Channel, Code and SortCode, then records schema version 3.
/// </summary>
private void Migrate3()
{
    const string splitCodeSql =
        @"
            ALTER TABLE messages ADD COLUMN ChatType INTEGER;
            CREATE INDEX IF NOT EXISTS idx_messages_chat_type ON messages (ChatType);
            ALTER TABLE messages ADD COLUMN SourceKind INTEGER;
            ALTER TABLE messages ADD COLUMN TargetKind INTEGER;

            UPDATE messages SET
                ChatType = Code & 0x7f,
                SourceKind = trunc(log2(1 << ((Code >> 11) & 0xF))),
                TargetKind = trunc(log2(1 << ((Code >> 7) & 0xF)))
            WHERE true;

            DROP INDEX idx_messages_channel;
            ALTER TABLE messages DROP COLUMN Channel;
            ALTER TABLE messages DROP COLUMN Code;
            ALTER TABLE messages DROP COLUMN SortCode;
        ";

    _logger.LogInformation("Running migration 3: Fix log kinds to fit the new format");

    // Recovery for a partially-applied Migrate3: the schema is already in its
    // target shape but user_version was never bumped -- just record and exit.
    var alreadyInTargetShape =
        ColumnExists("messages", "ChatType") && !ColumnExists("messages", "Code");

    if (!alreadyInTargetShape)
    {
        Connection.Execute(splitCodeSql);
        SetMigrationVersion(3);
        return;
    }

    _logger.LogInformation("Migration 3: schema already migrated, only bumping user_version");
    SetMigrationVersion(3);
}
|
|
|
|
/// <summary>
/// Migration 4: creates the messages_fts FTS5 virtual table if it does not
/// exist, then records schema version 4.
/// </summary>
private void Migrate4()
{
    // Standalone FTS5 table (no content='messages' linking, no content_rowid).
    // messages.Id is a BLOB primary key (Guid), which is incompatible with
    // FTS5's content_rowid requirement of an INTEGER rowid alias. The GUID is
    // kept in an UNINDEXED column so the tokenizer skips it, and FTS5 manages
    // its own internal rowid.
    const string createFtsSql = """
        CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(
            message_guid UNINDEXED,
            sender_text,
            content_text,
            tokenize='unicode61 remove_diacritics 2'
        );
        """;

    _logger.LogInformation("Running migration 4: Add FTS5 virtual table for full-text search");
    Connection.Execute(createFtsSql);
    SetMigrationVersion(4);
}
|
|
|
|
/// <summary>Writes <paramref name="version"/> to <c>PRAGMA user_version</c>.</summary>
private void SetMigrationVersion(int version)
{
    // Constant template + argument instead of an interpolated string (CA2254).
    _logger.LogInformation("Setting version {Version}", version);

    using var cmd = Connection.CreateCommand();
    // PRAGMA does not accept SQLite parameter bindings; version is an int
    // supplied by the migration sequence, never user input.
    cmd.CommandText = $"PRAGMA user_version = {version};";
    cmd.ExecuteNonQuery();
}
|
|
|
|
/// <summary>
/// Deletes every stored message together with its full-text index rows, then
/// runs the maintenance pass.
/// </summary>
internal void ClearMessages()
{
    lock (_readLock)
    {
        Connection.Execute("DELETE FROM messages;");
        // messages_fts is a standalone table (not linked to messages), so the
        // indexed sender/content text would otherwise survive a full wipe and
        // stay searchable. The table always exists once the ctor has migrated.
        Connection.Execute("DELETE FROM messages_fts;");
        PerformMaintenance();
    }
}
|
|
|
|
// Snapshot of how many non-deleted messages exist per ChatType.
// Used by the Privacy tab to preview retroactive cleanup impact.
internal Dictionary<int, long> GetMessageCountsByChatType()
{
    lock (_readLock)
    {
        using var query = Connection.CreateCommand();
        query.CommandText =
            "SELECT ChatType, COUNT(*) FROM messages WHERE deleted = false GROUP BY ChatType;";
        query.CommandTimeout = 120;

        var countsByType = new Dictionary<int, long>();
        using var rows = query.ExecuteReader();
        while (rows.Read())
        {
            countsByType[rows.GetInt32(0)] = rows.GetInt64(1);
        }

        return countsByType;
    }
}
|
|
|
|
// Deletes messages older than their retention window. chatTypeDaysMap gives a
// window in days per ChatType; defaultDays covers every ChatType not in the
// map, where 0 means "keep forever". Negative values are rejected. The
// maintenance pass runs only if rows were removed. Returns the number of rows
// deleted.
// NOTE(review): a mapped ChatType with 0 days gets a cutoff of "now", i.e. all
// of its messages are deleted -- confirm this asymmetry with defaultDays=0.
internal long DeleteByRetentionPolicy(
    IReadOnlyDictionary<int, int> chatTypeDaysMap,
    int defaultDays
)
{
    const long MillisecondsPerDay = 86400000L;

    if (defaultDays < 0)
    {
        throw new ArgumentOutOfRangeException(
            nameof(defaultDays),
            "Negative retention is not allowed."
        );
    }

    if (chatTypeDaysMap.Values.Any(mappedDays => mappedDays < 0))
    {
        throw new ArgumentOutOfRangeException(
            nameof(chatTypeDaysMap),
            "Negative retention is not allowed."
        );
    }

    var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
    var hasDefaultWindow = defaultDays > 0;

    // Nothing mapped and no default window: there is nothing to delete.
    if (chatTypeDaysMap.Count == 0 && !hasDefaultWindow)
    {
        return 0;
    }

    lock (_readLock)
    {
        long removed;
        using (var command = Connection.CreateCommand())
        {
            var predicates = new List<string>();
            var slot = 0;
            foreach (var entry in chatTypeDaysMap)
            {
                var typeName = $"$type{slot}";
                var cutoffName = $"$cutoff{slot}";
                command.Parameters.AddWithValue(typeName, entry.Key);
                command.Parameters.AddWithValue(
                    cutoffName,
                    nowMs - entry.Value * MillisecondsPerDay
                );
                predicates.Add($"(ChatType = {typeName} AND Date < {cutoffName})");
                slot++;
            }

            if (hasDefaultWindow)
            {
                command.Parameters.AddWithValue(
                    "$defaultCutoff",
                    nowMs - defaultDays * MillisecondsPerDay
                );

                // "-1" stands in when nothing is mapped: an empty IN list would
                // produce invalid SQL.
                var mappedTypes =
                    chatTypeDaysMap.Count > 0
                        ? BindIntList(command, "explicit", chatTypeDaysMap.Keys)
                        : "-1";
                predicates.Add($"(ChatType NOT IN ({mappedTypes}) AND Date < $defaultCutoff)");
            }

            if (predicates.Count == 0)
            {
                return 0;
            }

            command.CommandText = $"DELETE FROM messages WHERE {string.Join(" OR ", predicates)};";
            command.CommandTimeout = 600;
            removed = command.ExecuteNonQuery();
        }

        if (removed > 0)
        {
            PerformMaintenance();
        }

        return removed;
    }
}
|
|
|
|
// Hard-deletes every message whose ChatType is not in allowedTypes, then runs
// the maintenance pass. An empty allowlist is rejected (use ClearMessages for a
// full wipe). Returns the number of rows deleted.
internal long CleanupRetainOnly(IReadOnlyCollection<int> allowedTypes)
{
    if (allowedTypes.Count == 0)
    {
        throw new InvalidOperationException(
            "CleanupRetainOnly requires at least one allowed ChatType. Use ClearMessages for a full wipe."
        );
    }

    lock (_readLock)
    {
        long removed;
        using (var command = Connection.CreateCommand())
        {
            var allowList = BindIntList(command, "ct", allowedTypes);
            command.CommandText = $"DELETE FROM messages WHERE ChatType NOT IN ({allowList});";
            command.CommandTimeout = 600;
            removed = command.ExecuteNonQuery();
        }

        // Unlike DeleteByRetentionPolicy, maintenance runs even when nothing
        // was removed.
        PerformMaintenance();
        return removed;
    }
}
|
|
|
|
/// <summary>
/// Runs VACUUM, REINDEX messages and ANALYZE on the primary connection while
/// holding the read lock.
/// </summary>
internal void PerformMaintenance()
{
    const string maintenanceSql =
        @"
            VACUUM;
            REINDEX messages;
            ANALYZE;
        ";

    lock (_readLock)
    {
        Connection.Execute(maintenanceSql);
    }
}
|
|
|
|
private string LogPath => DbPath + "-wal";
|
|
|
|
/// <summary>Size in bytes of the database file, or 0 when it does not exist.</summary>
internal long DatabaseSize()
{
    return File.Exists(DbPath) ? new FileInfo(DbPath).Length : 0;
}
|
|
|
|
/// <summary>Size in bytes of the "-wal" file next to the database, or 0 when it does not exist.</summary>
internal long DatabaseLogSize()
{
    return File.Exists(LogPath) ? new FileInfo(LogPath).Length : 0;
}
|
|
|
|
/// <summary>Total number of rows in messages (soft-deleted rows included).</summary>
internal int MessageCount()
{
    lock (_readLock)
    {
        using var query = Connection.CreateCommand();
        query.CommandText = "SELECT COUNT(*) FROM messages;";
        var rowCount = query.ExecuteScalar();
        return Convert.ToInt32(rowCount);
    }
}
|
|
|
|
// Schema probe for the v1.4.8 FTS5 virtual table. Used by the Build-Suite
// tests to verify Migrate4's CREATE VIRTUAL TABLE actually landed without
// duplicating PRAGMA glue in each test body. Returns true when sqlite_master
// has an entry named messages_fts.
internal bool HasMessagesFtsTable()
{
    // Takes the same lock as every other method that touches the primary
    // Connection, which is not safe for concurrent use.
    lock (_readLock)
    {
        using var cmd = Connection.CreateCommand();
        cmd.CommandText = "SELECT count(*) FROM sqlite_master WHERE name='messages_fts';";
        return (long)(cmd.ExecuteScalar() ?? 0L) > 0;
    }
}
|
|
|
|
// Sets _ftsReady from the current table contents. Called once after Migrate.
//   - messages_fts has rows   -> ready (a previous run filled it)
//   - messages is empty       -> ready (nothing to index yet)
//   - messages has rows but messages_fts is empty -> not ready; this is the
//     "needs rebuild" case the worker (Plugin.cs LoadAsync) picks up.
internal void InitFtsReadyCache()
{
    if (CountRows("SELECT count(*) FROM messages_fts;") > 0)
    {
        _ftsReady = true;
        return;
    }

    _ftsReady = CountRows("SELECT count(*) FROM messages;") == 0;

    // Runs a scalar count query on the primary connection.
    long CountRows(string sql)
    {
        using var query = Connection.CreateCommand();
        query.CommandText = sql;
        return (long)(query.ExecuteScalar() ?? 0L);
    }
}
|
|
|
|
// Opens a worker-owned SqliteConnection on the same db path. Used by the
|
|
// FTS rebuild worker so the bulk-insert writer stream does not contend
|
|
// with the live UpsertMessage path on the primary Connection (WAL allows
|
|
// N readers + 1 writer; two writer sessions on the same connection are
|
|
// not safe per Microsoft.Data.Sqlite). Caller closes+disposes the
|
|
// returned connection and only then calls MarkFtsIndexBuilt() -- the
|
|
// DbViewer never sees IsFtsIndexBuilt=true while the worker connection
|
|
// is still alive.
|
|
// Returns a new, opened connection on DbPath with the standard pragmas
// applied. The caller owns it and must dispose it.
internal SqliteConnection OpenSecondaryConnection()
{
    var conn = new SqliteConnection(BuildConnectionString(DbPath));
    try
    {
        conn.Open();
        ApplyPragmas(conn);
        return conn;
    }
    catch
    {
        // The caller never sees the connection when setup fails, so release it
        // here; pooling is off and nothing else would close the handle.
        conn.Dispose();
        throw;
    }
}
|
|
|
|
// Worker-only mutator. The bulk-insert worker is the single legitimate
|
|
// caller; the flag flips after the worker has closed its own connection.
|
|
/// <summary>Flips the FTS readiness flag to true.</summary>
internal void MarkFtsIndexBuilt()
{
    _ftsReady = true;
}
|
|
|
|
// Builds the FTS5 index from scratch on a worker-owned SqliteConnection.
|
|
// Chunked-commit (every 500 rows + 5ms sleep) releases the WAL writer
|
|
// lock between transactions so the live PendingMessageThread UpsertMessage
|
|
// path on the primary Connection does not hit "database is locked" after
|
|
// DefaultTimeout=5s. The Thread.Sleep is intentional: it gives the live
|
|
// writer a deterministic window to acquire the lock before we re-take
|
|
// it for the next chunk.
|
|
//
|
|
// Cancellation: checked at the top of each row and again after each
|
|
// chunk commit, so a Dispose-during-rebuild collapses on the next row
|
|
// without trashing the half-built index (DELETE FROM messages_fts at
|
|
// the start makes the next run idempotent).
|
|
// Empties messages_fts and refills it from messages on the supplied
// connection, committing every ChunkSize rows. Reports the running row count
// through progress and returns the messages row count taken before the scan.
public long RebuildFtsIndex(
    SqliteConnection conn,
    IProgress<long> progress,
    CancellationToken ct
)
{
    const int ChunkSize = 500;

    using (var wipe = conn.CreateCommand())
    {
        wipe.CommandText = "DELETE FROM messages_fts;";
        wipe.ExecuteNonQuery();
    }

    long total;
    using (var countQuery = conn.CreateCommand())
    {
        countQuery.CommandText = "SELECT count(*) FROM messages;";
        total = (long)(countQuery.ExecuteScalar() ?? 0L);
    }

    long indexed = 0;

    using var select = conn.CreateCommand();
    select.CommandText = "SELECT Id, Sender, Content FROM messages ORDER BY Id;";
    using var rows = select.ExecuteReader();

    using var insert = conn.CreateCommand();
    insert.CommandText =
        "INSERT INTO messages_fts(message_guid, sender_text, content_text) VALUES ($g, $s, $c);";
    var guidParam = AddInsertParameter("$g");
    var senderParam = AddInsertParameter("$s");
    var contentParam = AddInsertParameter("$c");

    // Nullable so the finally disposes exactly once whether the loop ends
    // normally, is cancelled in the gap between Dispose and BeginTransaction,
    // or throws somewhere in the body.
    SqliteTransaction? chunkTransaction = conn.BeginTransaction();
    insert.Transaction = chunkTransaction;
    try
    {
        while (rows.Read())
        {
            ct.ThrowIfCancellationRequested();

            // GetGuid is used rather than reading a byte[]: per the notes in
            // this file the Id values are stored in their text form.
            var messageId = rows.GetGuid(0);
            var senderChunks = MessagePackSerializer.Deserialize<List<Chunk>>(
                rows.GetFieldValue<byte[]>(1),
                MsgPackOptions
            );
            var contentChunks = MessagePackSerializer.Deserialize<List<Chunk>>(
                rows.GetFieldValue<byte[]>(2),
                MsgPackOptions
            );

            guidParam.Value = messageId.ToString();
            senderParam.Value = ChunkUtil.ToRawString(senderChunks);
            contentParam.Value = ChunkUtil.ToRawString(contentChunks);
            insert.ExecuteNonQuery();
            indexed++;

            if (indexed % ChunkSize != 0)
            {
                continue;
            }

            // Chunk boundary: commit, report, then pause briefly so another
            // writer can take the lock before the next chunk starts.
            chunkTransaction.Commit();
            chunkTransaction.Dispose();
            chunkTransaction = null;
            progress.Report(indexed);

            Thread.Sleep(5);
            ct.ThrowIfCancellationRequested();

            chunkTransaction = conn.BeginTransaction();
            insert.Transaction = chunkTransaction;
        }

        chunkTransaction?.Commit();
    }
    finally
    {
        chunkTransaction?.Dispose();
    }

    progress.Report(indexed);
    return total;

    // Creates a named parameter and attaches it to the insert command.
    SqliteParameter AddInsertParameter(string name)
    {
        var parameter = insert.CreateParameter();
        parameter.ParameterName = name;
        insert.Parameters.Add(parameter);
        return parameter;
    }
}
|
|
|
|
// FTS5 full-text search across the entire messages_fts index. Returns the
// stored message_guid strings of the best-ranked matches (at most limit); the
// caller resolves them to Message objects via LoadByGuids. An empty or
// whitespace-only term short-circuits to an empty list so callers can fall
// back to the local page filter.
public IReadOnlyList<string> FullTextSearch(string term, int limit = 1000)
{
    if (string.IsNullOrWhiteSpace(term))
    {
        return Array.Empty<string>();
    }

    lock (_readLock)
    {
        using var query = Connection.CreateCommand();
        query.CommandText = """
            SELECT message_guid FROM messages_fts
            WHERE messages_fts MATCH $term
            ORDER BY rank
            LIMIT $limit;
            """;
        query.Parameters.AddWithValue("$term", EscapeFtsTerm(term));
        query.Parameters.AddWithValue("$limit", limit);

        var matches = new List<string>(capacity: 256);
        using var rows = query.ExecuteReader();
        while (rows.Read())
        {
            matches.Add(rows.GetString(0));
        }

        return matches;
    }
}
|
|
|
|
// Joins UUID strings from FullTextSearch back to Message rows. messages.Id
|
|
// is BLOB-declared in the schema but actually stored as TEXT (UUID form)
|
|
// because Microsoft.Data.Sqlite serialises Guid parameters as strings by
|
|
// default. Binding the lookup parameters as Guid keeps the same TEXT
|
|
// storage form on both sides so the IN(...) compare matches. SQLite has a
|
|
// hard parameter limit of 999 in default builds, so we chunk the input --
|
|
// a 1000-hit FTS query never explodes the SELECT. Result ordering is not
|
|
// guaranteed; callers re-sort (e.g. DbViewer sorts by Date descending in
|
|
// Sub-Task 4.4).
|
|
// Loads the non-deleted messages whose Id matches one of guidStrings, querying
// in batches of 500 bound parameters. Result order is not guaranteed.
public IReadOnlyList<Message> LoadByGuids(IReadOnlyList<string> guidStrings)
{
    if (guidStrings.Count == 0)
    {
        return Array.Empty<Message>();
    }

    const int chunkSize = 500;

    lock (_readLock)
    {
        var loaded = new List<Message>(guidStrings.Count);

        for (var offset = 0; offset < guidStrings.Count; offset += chunkSize)
        {
            var batchCount = Math.Min(chunkSize, guidStrings.Count - offset);
            var placeholders = string.Join(
                ",",
                Enumerable.Range(0, batchCount).Select(i => $"$id{i}")
            );

            using var query = Connection.CreateCommand();
            query.CommandText = $"""
                SELECT Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                       Sender, Content, SenderSource, ContentSource, ExtraChatChannel
                FROM messages
                WHERE Id IN ({placeholders}) AND Deleted = false;
                """;

            // Bound as Guid so the lookup values take the same form as the
            // values UpsertMessage binds for $Id.
            for (var i = 0; i < batchCount; i++)
            {
                query.Parameters.AddWithValue($"$id{i}", Guid.Parse(guidStrings[offset + i]));
            }

            using var rows = query.ExecuteReader();
            while (rows.Read())
            {
                loaded.Add(ReadMessageRow(rows));
            }
        }

        return loaded;
    }
}
|
|
|
|
// FTS5's MATCH operator interprets ", ~, ^, - as syntax. Plain user terms are
// wrapped in double quotes so the search is "what you see is what you get" --
// a multi-word query matches as a phrase, not as per-word AND. Power users can
// opt into raw MATCH syntax by supplying their own quotes: a term whose quotes
// are balanced (even, non-zero count) is passed through unchanged. A term with
// an unbalanced quote cannot be valid MATCH syntax, so it is treated as literal
// text: embedded quotes are doubled (the FTS5 escape) and the result is wrapped
// like any plain term.
internal static string EscapeFtsTerm(string term)
{
    var quoteCount = term.Count(c => c == '"');
    if (quoteCount > 0 && quoteCount % 2 == 0)
        return term;

    return $"\"{term.Replace("\"", "\"\"")}\"";
}
|
|
|
|
/// <summary>
/// Inserts <paramref name="message"/>, or overwrites the row with the same Id
/// and clears its Deleted flag. Messages whose ChatType is not allowed for
/// storage are dropped without touching the database.
/// </summary>
internal void UpsertMessage(Message message)
{
    // Privacy filter -- drop disallowed ChatTypes before they reach storage.
    if (!Plugin.Config.IsAllowedForStorage(message.Code.Type))
    {
        // Constant template + argument instead of an interpolated string, and
        // guarded so a filtered message costs nothing when Trace is disabled.
        if (_logger.IsEnabled(LogLevel.Trace))
        {
            _logger.LogTrace(
                "Privacy filter dropped message: ChatType={ChatType}",
                message.Code.Type
            );
        }

        return;
    }

    lock (_readLock)
    {
        using var cmd = Connection.CreateCommand();
        cmd.CommandText =
            @"
                INSERT INTO messages (
                    Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                    Sender, Content, SenderSource, ContentSource, ExtraChatChannel, Deleted
                ) VALUES (
                    $Id, $Receiver, $ContentId, $Date, $ChatType, $SourceKind, $TargetKind,
                    $Sender, $Content, $SenderSource, $ContentSource, $ExtraChatChannel, false
                )
                ON CONFLICT (id) DO UPDATE SET
                    Receiver = excluded.Receiver,
                    ContentId = excluded.ContentId,
                    Date = excluded.Date,
                    ChatType = excluded.ChatType,
                    SourceKind = excluded.SourceKind,
                    TargetKind = excluded.TargetKind,
                    Sender = excluded.Sender,
                    Content = excluded.Content,
                    SenderSource = excluded.SenderSource,
                    ContentSource = excluded.ContentSource,
                    ExtraChatChannel = excluded.ExtraChatChannel,
                    Deleted = false;
            ";

        cmd.Parameters.AddWithValue("$Id", message.Id);
        cmd.Parameters.AddWithValue("$Receiver", message.Receiver);
        cmd.Parameters.AddWithValue("$ContentId", message.ContentId);
        cmd.Parameters.AddWithValue("$Date", message.Date.ToUnixTimeMilliseconds());
        cmd.Parameters.AddWithValue("$ChatType", message.Code.Type);
        cmd.Parameters.AddWithValue("$SourceKind", message.Code.Source);
        cmd.Parameters.AddWithValue("$TargetKind", message.Code.Target);
        // Chunk lists and SeStrings are stored as MessagePack blobs.
        cmd.Parameters.AddWithValue(
            "$Sender",
            MessagePackSerializer.Serialize(message.Sender, MsgPackOptions)
        );
        cmd.Parameters.AddWithValue(
            "$Content",
            MessagePackSerializer.Serialize(message.Content, MsgPackOptions)
        );
        cmd.Parameters.AddWithValue(
            "$SenderSource",
            MessagePackSerializer.Serialize(message.SenderSource, MsgPackOptions)
        );
        cmd.Parameters.AddWithValue(
            "$ContentSource",
            MessagePackSerializer.Serialize(message.ContentSource, MsgPackOptions)
        );
        cmd.Parameters.AddWithValue("$ExtraChatChannel", message.ExtraChatChannel);

        cmd.ExecuteNonQuery();
    }
}
|
|
|
|
// Streams messages for export, sorted ascending by Date, excluding soft-deleted rows.
// Optional filters:
//   chatTypes: restrict to these ChatType values (null or empty = no filter)
//   from / to: inclusive Date bounds in either direction (null = unbounded)
// Caller is responsible for disposing the returned enumerator.
// Lock caveat: lock guards command setup and ExecuteReader; the returned
// MessageEnumerator is iterated lazily by the caller outside the lock.
// Acceptable for v1.4.8 -- DbViewer iterates on its filter-worker Task and
// any clash with UpsertMessage on the primary Connection is rare and
// serialised by SQLite's own connection-level lock. v1.5.x DI cycle should
// address this with a snapshot-to-list or connection pool.
// Failure path: if building or executing the query throws, the command is
// disposed before the exception propagates, because no enumerator exists yet
// that the caller could dispose.
internal MessageEnumerator StreamForExport(
    IReadOnlyCollection<int>? chatTypes,
    DateTimeOffset? from,
    DateTimeOffset? to
)
{
    lock (_readLock)
    {
        // Deliberately not a `using`: the reader handed to the enumerator is
        // consumed after this method has returned.
        // NOTE(review): presumably disposing the command would also close its
        // active reader -- confirm against Microsoft.Data.Sqlite.
        var cmd = Connection.CreateCommand();
        try
        {
            var clauses = new List<string> { "deleted = false" };
            if (chatTypes is { Count: > 0 })
                clauses.Add($"ChatType IN ({BindIntList(cmd, "exct", chatTypes)})");
            if (from is not null)
                clauses.Add("Date >= $From");
            if (to is not null)
                clauses.Add("Date <= $To");

            cmd.CommandText =
                @"
                SELECT
                    Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                    Sender, Content, SenderSource, ContentSource, ExtraChatChannel
                FROM messages
                WHERE "
                + string.Join(" AND ", clauses)
                + @"
                ORDER BY Date ASC;";
            cmd.CommandTimeout = 600;

            // Date is compared as Unix milliseconds, matching the bound values below.
            if (from is not null)
                cmd.Parameters.AddWithValue("$From", from.Value.ToUnixTimeMilliseconds());
            if (to is not null)
                cmd.Parameters.AddWithValue("$To", to.Value.ToUnixTimeMilliseconds());

            return new MessageEnumerator(
                cmd.ExecuteReader(),
                _loggerFactory.CreateLogger<MessageEnumerator>()
            );
        }
        catch
        {
            // Nothing took over the command, so release it here instead of leaking it.
            cmd.Dispose();
            throw;
        }
    }
}
|
|
|
|
// Returns the most recent messages, oldest-first.
// receiver: filter by receiver ContentId (null = no filter)
// since: only include messages dated at or after this instant (null = no filter)
// count: max rows to return, defaults to MessageQueryLimit
// Caller is responsible for disposing the returned enumerator.
// Lock caveat: same lazy-enumerator note as StreamForExport.
// Failure path: if executing the query throws, the command is disposed before
// the exception propagates, because no enumerator exists yet that the caller
// could dispose.
internal MessageEnumerator GetMostRecentMessages(
    ulong? receiver = null,
    DateTimeOffset? since = null,
    int count = MessageQueryLimit
)
{
    List<string> whereClauses = ["deleted = false"];
    if (receiver != null)
        whereClauses.Add("Receiver = $Receiver");
    if (since != null)
        whereClauses.Add("Date >= $Since");

    var whereClause = "WHERE " + string.Join(" AND ", whereClauses);

    lock (_readLock)
    {
        // Deliberately not a `using`: the reader handed to the enumerator is
        // consumed after this method has returned.
        // NOTE(review): presumably disposing the command would also close its
        // active reader -- confirm against Microsoft.Data.Sqlite.
        var cmd = Connection.CreateCommand();
        try
        {
            // Select last N by date DESC, then reverse to ascending order.
            cmd.CommandText =
                @"
                SELECT *
                FROM (
                    SELECT
                        Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                        Sender, Content, SenderSource, ContentSource, ExtraChatChannel
                    FROM messages
                    "
                + whereClause
                + @"
                    ORDER BY Date DESC
                    LIMIT $Count
                )
                ORDER BY Date ASC;
                ";
            cmd.CommandTimeout = 120;

            if (receiver != null)
                cmd.Parameters.AddWithValue("$Receiver", receiver);
            if (since != null)
                cmd.Parameters.AddWithValue("$Since", since.Value.ToUnixTimeMilliseconds());

            cmd.Parameters.AddWithValue("$Count", count);

            return new MessageEnumerator(
                cmd.ExecuteReader(),
                _loggerFactory.CreateLogger<MessageEnumerator>()
            );
        }
        catch
        {
            // Nothing took over the command, so release it here instead of leaking it.
            cmd.Dispose();
            throw;
        }
    }
}
|
|
|
|
// Fetches at most `limit` tells exchanged with the named player and hands
// them back oldest-first. A non-positive `limit` yields an empty list.
// SQL narrows by Receiver + ChatType via the (Receiver, Date) index and
// returns newest-first; the PlayerPayload comparison then runs client-side
// and reading stops once `limit` partner matches have been gathered.
// Earlier versions had a hardcoded 500-row scan cap that cut less-frequent
// pinned partners off the back of the window in chatty sessions; removed in
// v1.4.10.
internal IReadOnlyList<Message> GetTellHistoryWithSender(
    ulong receiver,
    string senderName,
    uint senderWorld,
    int limit
)
{
    if (limit <= 0)
        return [];

    lock (_readLock)
    {
        using var cmd = Connection.CreateCommand();
        cmd.CommandTimeout = 60;
        cmd.CommandText =
            @"
            SELECT
                Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                Sender, Content, SenderSource, ContentSource, ExtraChatChannel
            FROM messages
            WHERE deleted = false
                AND Receiver = $Receiver
                AND ChatType IN ($TellIncoming, $TellOutgoing)
            ORDER BY Date DESC;
            ";

        var parameters = cmd.Parameters;
        parameters.AddWithValue("$Receiver", receiver);
        parameters.AddWithValue("$TellIncoming", (int)ChatType.TellIncoming);
        parameters.AddWithValue("$TellOutgoing", (int)ChatType.TellOutgoing);

        using var rows = new MessageEnumerator(
            cmd.ExecuteReader(),
            _loggerFactory.CreateLogger<MessageEnumerator>()
        );

        // Take() stops pulling rows right after the limit-th match, so no
        // further rows are read once enough partner tells were found.
        var newestFirst = rows
            .Where(row => ChunkUtil.MatchesSender(row, senderName, senderWorld))
            .Take(limit)
            .ToList();

        // The query is newest-first; flip in place so tabs show oldest-first.
        newestFirst.Reverse();
        return newestFirst;
    }
}
|
|
|
|
// Marks the message with the given id as deleted. The row stays in the table;
// the read queries in this class filter on `deleted = false`, so it no longer
// shows up in their results.
internal void DeleteMessage(Guid id)
{
    const string softDeleteSql = "UPDATE messages SET Deleted = true WHERE Id = $Id;";

    lock (_readLock)
    {
        using (var update = Connection.CreateCommand())
        {
            update.CommandText = softDeleteSql;
            update.Parameters.AddWithValue("$Id", id);
            update.ExecuteNonQuery();
        }
    }
}
|
|
|
|
// Counts non-deleted messages whose Date falls within [after, before]
// (BETWEEN, so both ends are inclusive) and whose ChatType is one of
// `channels`.
// receiver: additionally restrict to this Receiver (null = no filter)
// The whole query runs under the read lock and the command is disposed
// before returning.
internal long CountDateRange(
    DateTime after,
    DateTime before,
    IEnumerable<byte> channels,
    ulong? receiver = null
)
{
    lock (_readLock)
    {
        using var cmd = Connection.CreateCommand();

        var filters = new List<string> { "deleted = false" };
        if (receiver != null)
            filters.Add("Receiver = $Receiver");
        filters.Add("Date BETWEEN $After AND $Before");

        // BindIntList both registers the per-channel parameters on the command
        // and returns the matching placeholder list for the IN (...) clause.
        var channelPlaceholders = BindIntList(cmd, "cdr", channels.Select(c => (int)c));
        filters.Add($"ChatType IN ({channelPlaceholders})");

        var filterSql = "WHERE " + string.Join(" AND ", filters);
        cmd.CommandText =
            @"
            SELECT COUNT(*)
            FROM messages
            " + filterSql;

        if (receiver != null)
            cmd.Parameters.AddWithValue("$Receiver", receiver);

        // Date bounds are bound as Unix milliseconds.
        var afterMs = ((DateTimeOffset)after).ToUnixTimeMilliseconds();
        var beforeMs = ((DateTimeOffset)before).ToUnixTimeMilliseconds();
        cmd.Parameters.AddWithValue("$After", afterMs);
        cmd.Parameters.AddWithValue("$Before", beforeMs);
        cmd.CommandTimeout = 120;

        var total = cmd.ExecuteScalar()!;
        return (long)total;
    }
}
|
|
|
|
// Streams non-deleted messages whose Date falls within [after, before]
// (BETWEEN, so both ends are inclusive) and whose ChatType is one of
// `channels`.
// receiver: additionally restrict to this Receiver (null = no filter)
// Caller is responsible for disposing the returned enumerator.
// Lock caveat: same lazy-enumerator note as StreamForExport.
// NOTE(review): the query has no ORDER BY, so row order is whatever SQLite
// happens to produce -- confirm callers do not rely on Date order.
// Failure path: if building or executing the query throws, the command is
// disposed before the exception propagates, because no enumerator exists yet
// that the caller could dispose.
internal MessageEnumerator GetDateRange(
    DateTime after,
    DateTime before,
    IEnumerable<byte> channels,
    ulong? receiver = null
)
{
    lock (_readLock)
    {
        // Deliberately not a `using`: the reader handed to the enumerator is
        // consumed after this method has returned.
        // NOTE(review): presumably disposing the command would also close its
        // active reader -- confirm against Microsoft.Data.Sqlite.
        var cmd = Connection.CreateCommand();
        try
        {
            List<string> whereClauses = ["deleted = false"];
            if (receiver != null)
                whereClauses.Add("Receiver = $Receiver");

            whereClauses.Add("Date BETWEEN $After AND $Before");
            // BindIntList registers one parameter per channel on the command
            // and returns the placeholder list for the IN (...) clause.
            whereClauses.Add(
                $"ChatType IN ({BindIntList(cmd, "gdr", channels.Select(c => (int)c))})"
            );

            var whereClause = $"WHERE {string.Join(" AND ", whereClauses)}";

            cmd.CommandText =
                @"
                SELECT
                    Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                    Sender, Content, SenderSource, ContentSource, ExtraChatChannel
                FROM messages
                " + whereClause;
            cmd.CommandTimeout = 120;

            if (receiver != null)
                cmd.Parameters.AddWithValue("$Receiver", receiver);

            // Date bounds are bound as Unix milliseconds.
            cmd.Parameters.AddWithValue("$After", ((DateTimeOffset)after).ToUnixTimeMilliseconds());
            cmd.Parameters.AddWithValue(
                "$Before",
                ((DateTimeOffset)before).ToUnixTimeMilliseconds()
            );

            return new MessageEnumerator(
                cmd.ExecuteReader(),
                _loggerFactory.CreateLogger<MessageEnumerator>()
            );
        }
        catch
        {
            // Nothing took over the command, so release it here instead of leaking it.
            cmd.Dispose();
            throw;
        }
    }
}
|
|
|
|
// Streams one page of non-deleted messages whose Date falls within
// [after, before] (BETWEEN, so both ends are inclusive) and whose ChatType is
// one of `channels`, ordered by Date.
// receiver: additionally restrict to this Receiver (null = no filter)
// page: zero-based page index; each page holds DbViewer.RowPerPage rows and
//       the row offset is DbViewer.RowPerPage * page
// Caller is responsible for disposing the returned enumerator.
// Lock caveat: same lazy-enumerator note as StreamForExport.
// Failure path: if building or executing the query throws, the command is
// disposed before the exception propagates, because no enumerator exists yet
// that the caller could dispose.
internal MessageEnumerator GetPagedDateRange(
    DateTime after,
    DateTime before,
    IEnumerable<byte> channels,
    ulong? receiver = null,
    int page = 0
)
{
    lock (_readLock)
    {
        // Deliberately not a `using`: the reader handed to the enumerator is
        // consumed after this method has returned.
        // NOTE(review): presumably disposing the command would also close its
        // active reader -- confirm against Microsoft.Data.Sqlite.
        var cmd = Connection.CreateCommand();
        try
        {
            List<string> whereClauses = ["deleted = false"];
            if (receiver != null)
                whereClauses.Add("Receiver = $Receiver");

            whereClauses.Add("Date BETWEEN $After AND $Before");
            // BindIntList registers one parameter per channel on the command
            // and returns the placeholder list for the IN (...) clause.
            whereClauses.Add(
                $"ChatType IN ({BindIntList(cmd, "pdr", channels.Select(c => (int)c))})"
            );

            var whereClause = $"WHERE {string.Join(" AND ", whereClauses)}";

            // LIMIT <offset>, <row count>: skip the preceding pages, then take one page.
            cmd.CommandText =
                @"
                SELECT
                    Id, Receiver, ContentId, Date, ChatType, SourceKind, TargetKind,
                    Sender, Content, SenderSource, ContentSource, ExtraChatChannel
                FROM messages
                "
                + whereClause
                + @"
                ORDER BY Date
                LIMIT $Offset, $OffsetCount;
                ";
            cmd.CommandTimeout = 120;

            if (receiver != null)
                cmd.Parameters.AddWithValue("$Receiver", receiver);

            // Date bounds are bound as Unix milliseconds.
            cmd.Parameters.AddWithValue("$After", ((DateTimeOffset)after).ToUnixTimeMilliseconds());
            cmd.Parameters.AddWithValue(
                "$Before",
                ((DateTimeOffset)before).ToUnixTimeMilliseconds()
            );
            cmd.Parameters.AddWithValue("$Offset", DbViewer.RowPerPage * page);
            cmd.Parameters.AddWithValue("$OffsetCount", DbViewer.RowPerPage);

            return new MessageEnumerator(
                cmd.ExecuteReader(),
                _loggerFactory.CreateLogger<MessageEnumerator>()
            );
        }
        catch
        {
            // Nothing took over the command, so release it here instead of leaking it.
            cmd.Dispose();
            throw;
        }
    }
}
|
|
|
|
// Registers one named parameter per entry of `values` on `cmd`, named
// "$<prefix>0", "$<prefix>1", ... in enumeration order, and returns those
// names joined with commas, ready to drop into an "IN (...)" clause.
// SQLite has no native array parameter, hence one placeholder per entry.
// An empty `values` binds nothing and returns an empty string.
private static string BindIntList(SqliteCommand cmd, string prefix, IEnumerable<int> values)
{
    // The projection is lazy; string.Join walks it exactly once, so each
    // parameter is added exactly once and in order.
    var placeholders = values.Select(
        (value, position) =>
        {
            var placeholder = $"${prefix}{position}";
            cmd.Parameters.AddWithValue(placeholder, value);
            return placeholder;
        }
    );

    return string.Join(",", placeholders);
}
|
|
}
|
|
|
|
// Lazily turns the rows of a data reader into Message instances.
// Rows that fail to deserialise are skipped rather than aborting the
// enumeration; the failure is counted, its id remembered when it could be
// read, and the first MaxErrorLogs failures are logged.
// The instance owns the reader: Dispose / DisposeAsync dispose it.
// Every GetEnumerator call reads from the same underlying reader, so a second
// enumeration continues where the previous one stopped instead of restarting.
internal class MessageEnumerator(DbDataReader reader, ILogger<MessageEnumerator> logger)
    : IEnumerable<Message>,
        IDisposable,
        IAsyncDisposable
{
    // Upper bound on per-row error log entries, to keep a badly damaged
    // database from flooding the log.
    private const int MaxErrorLogs = 10;

    private readonly ILogger<MessageEnumerator> _logger = logger;

    // Ids of rows that failed to deserialise (only those whose id was readable).
    private readonly List<Guid> FailedIds = [];

    // Number of rows that failed so far, including ones without a readable id.
    private int FailedCount;

    // True once at least one row has failed to deserialise.
    public bool DidError => FailedCount > 0;

    // Yields one Message per readable row; unreadable rows are recorded and skipped.
    public IEnumerator<Message> GetEnumerator()
    {
        while (reader.Read())
        {
            var id = Guid.Empty;
            Message msg;
            try
            {
                // GetGuid up-front so we have an id for the failure log even
                // when the rest of the deserialisation throws downstream.
                id = reader.GetGuid(0);
                msg = MessageStore.ReadMessageRow(reader);
            }
            catch (Exception e)
            {
                // Constant template + exception argument: keeps the id as a
                // structured property and lets the logger render the exception,
                // instead of baking both into an interpolated string.
                if (FailedCount < MaxErrorLogs)
                    _logger.LogError(
                        e,
                        "Exception while reading message '{MessageId}' from database",
                        id
                    );
                FailedCount++;
                if (FailedCount == MaxErrorLogs)
                    _logger.LogError("Further parsing errors will not be logged");

                // Guid.Empty means the id column itself could not be read.
                if (id != Guid.Empty)
                    FailedIds.Add(id);

                continue;
            }

            yield return msg;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    // Ids of the rows skipped so far because they could not be deserialised.
    public IReadOnlyList<Guid> FailedMessageIds()
    {
        return FailedIds;
    }

    // Disposes the underlying reader.
    public void Dispose()
    {
        reader.Dispose();
    }

    // Asynchronously disposes the underlying reader.
    public async ValueTask DisposeAsync()
    {
        await reader.DisposeAsync();
    }
}
|