Files
HellionChat/HellionChat/MessageStore.cs
T
JonKazama-Hellion c9dfd024b2 docs(comments): trim verbose dispose and thread rationale
Match the new HellionChat comment-length convention: 1-3 lines for
standard pitfall notes, 5+ only for non-trivial workarounds. The
previous Dispose comment was 14 lines of textbook prose, which veered
into AI-slop territory and would rot on the next refactor.
2026-05-07 01:10:50 +02:00

923 lines
34 KiB
C#

using System.Buffers;
using System.Collections;
using System.Data.Common;
using HellionChat.Code;
using HellionChat.Ui;
using HellionChat.Util;
using Dalamud.Game.Text.SeStringHandling;
using MessagePack;
using MessagePack.Formatters;
using MessagePack.Resolvers;
using Microsoft.Data.Sqlite;
using DalamudUtil = Dalamud.Utility.Util;
using Encoding = System.Text.Encoding;
namespace HellionChat;
internal static class DbExtensions
{
internal static void Execute(this DbConnection conn, string sql)
{
using var cmd = conn.CreateCommand();
cmd.CommandText = sql;
cmd.ExecuteNonQuery();
}
}
internal enum PayloadMessagePackType : byte
{
Achievement,
PartyFinder,
Uri,
Emote,
Other = 255,
}
public class PayloadMessagePackFormatter : IMessagePackFormatter<Payload?>
{
public void Serialize(ref MessagePackWriter writer, Payload? value, MessagePackSerializerOptions options)
{
if (value == null)
{
writer.WriteNil();
return;
}
writer.WriteArrayHeader(2);
switch (value)
{
case AchievementPayload achievementPayload:
writer.WriteUInt8((byte)PayloadMessagePackType.Achievement);
writer.WriteUInt32(achievementPayload.Id);
break;
case PartyFinderPayload partyFinderPayload:
writer.WriteUInt8((byte)PayloadMessagePackType.PartyFinder);
writer.WriteUInt32(partyFinderPayload.Id);
break;
case UriPayload uriPayload:
writer.WriteUInt8((byte)PayloadMessagePackType.Uri);
writer.WriteString(Encoding.UTF8.GetBytes(uriPayload.Uri.ToString()));
break;
case EmotePayload emotePayload:
writer.WriteUInt8((byte)PayloadMessagePackType.Emote);
writer.WriteString(Encoding.UTF8.GetBytes(emotePayload.Code));
break;
default:
writer.WriteUInt8((byte)PayloadMessagePackType.Other);
writer.Write(value.Encode());
break;
}
}
public Payload? Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options)
{
if (reader.TryReadNil())
return null;
if (reader.ReadArrayHeader() != 2)
throw new InvalidOperationException("Invalid array count for Payload object");
var type = (PayloadMessagePackType)reader.ReadByte();
switch (type)
{
case PayloadMessagePackType.Achievement:
return new AchievementPayload(reader.ReadUInt32());
case PayloadMessagePackType.PartyFinder:
return new PartyFinderPayload(reader.ReadUInt32());
case PayloadMessagePackType.Uri:
return new UriPayload(new Uri(reader.ReadString() ?? ""));
case PayloadMessagePackType.Emote:
return EmotePayload.ResolveEmote(reader.ReadString() ?? "");
case PayloadMessagePackType.Other:
default:
var bytes = reader.ReadBytes() ?? new ReadOnlySequence<byte>();
var binReader = new BinaryReader(new MemoryStream(bytes.ToArray()));
return Payload.Decode(binReader);
}
}
}
public class SeStringMessagePackFormatter : IMessagePackFormatter<SeString?>
{
public void Serialize(ref MessagePackWriter writer, SeString? value, MessagePackSerializerOptions options)
{
options.Resolver.GetFormatter<List<Payload>>()!.Serialize(ref writer, value?.Payloads ?? [], options);
}
public SeString Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options)
{
return new SeString(options.Resolver.GetFormatter<List<Payload>>()!.Deserialize(ref reader, options));
}
}
internal class MessageStore : IDisposable
{
private const int MessageQueryLimit = 10_000;
private string DbPath { get; }
private SqliteConnection Connection { get; set; }
internal static readonly MessagePackSerializerOptions MsgPackOptions = MessagePackSerializerOptions.Standard
.WithResolver(CompositeResolver.Create([new PayloadMessagePackFormatter(), new SeStringMessagePackFormatter()], [StandardResolver.Instance]));
internal MessageStore(string dbPath)
{
DbPath = dbPath;
Connection = Connect();
Migrate();
}
public void Dispose()
{
// Pooling=false (set in Connect) avoids ClearAllPools, which is
// provider-wide and would touch other plugins' SQLite connections.
// GC.Collect was here as a defensive flush; removed because explicit
// Close already releases everything we hold.
Connection.Close();
Connection.Dispose();
}
private SqliteConnection Connect()
{
var uriBuilder = new SqliteConnectionStringBuilder
{
DataSource = DbPath,
DefaultTimeout = 5,
Pooling = false,
Mode = SqliteOpenMode.ReadWriteCreate,
};
var conn = new SqliteConnection(uriBuilder.ToString());
conn.Open();
conn.Execute(@"PRAGMA journal_mode=WAL;");
conn.Execute(@"PRAGMA synchronous=NORMAL;");
if (DalamudUtil.IsWine())
conn.Execute(@"PRAGMA cache_size = 32768;");
return conn;
}
private void Migrate()
{
// Get current user_version.
using var cmd = Connection.CreateCommand();
cmd.CommandText = "PRAGMA user_version;";
var userVersion = Convert.ToInt32(cmd.ExecuteScalar());
var migrationsToDo = new List<Action>();
switch (userVersion)
{
case <= 0:
migrationsToDo.Add(Migrate0);
// Migration support was only added in version 1. Migrate 0 is
// idempotent.
migrationsToDo.Add(Migrate1);
migrationsToDo.Add(Migrate2);
migrationsToDo.Add(Migrate3);
break;
case 1:
migrationsToDo.Add(Migrate2);
migrationsToDo.Add(Migrate3);
break;
case 2:
migrationsToDo.Add(Migrate3);
break;
}
foreach (var migration in migrationsToDo)
migration();
}
private void Migrate0()
{
Plugin.Log.Information("Running migration 0: Creating tables");
Connection.Execute(@"
CREATE TABLE IF NOT EXISTS messages (
Id BLOB PRIMARY KEY NOT NULL, -- Guid
Receiver INTEGER NOT NULL, -- uint64 (first bits are always 0)
ContentId INTEGER NOT NULL, -- uint64 (first bits are always 0)
Date INTEGER NOT NULL, -- unix timestamp with millisecond precision
Code INTEGER NOT NULL, -- ChatCode encoding
Sender BLOB NOT NULL, -- Chunk[] msgpack
Content BLOB NOT NULL, -- Chunk[] msgpack
SenderSource BLOB NOT NULL, -- SeString
ContentSource BLOB NOT NULL, -- SeString
SortCode INTEGER NOT NULL, -- SortCode encoding
ExtraChatChannel BLOB NOT NULL -- Guid
);
CREATE INDEX IF NOT EXISTS idx_messages_receiver ON messages (Receiver);
CREATE INDEX IF NOT EXISTS idx_messages_date ON messages (Date);
");
SetMigrationVersion(0);
}
private void Migrate1()
{
Plugin.Log.Information("Running migration 1: Adding Deleted column");
Connection.Execute(@"
-- Migration 1: Add Deleted column
ALTER TABLE messages ADD COLUMN Deleted BOOLEAN NOT NULL DEFAULT false;
");
SetMigrationVersion(1);
}
private void Migrate2()
{
Plugin.Log.Information("Running migration 2: Adding Channel generated column");
Connection.Execute(@"
-- Migration 2: Add Channel generated column
ALTER TABLE messages ADD COLUMN Channel INTEGER GENERATED ALWAYS AS (Code & 0x7f) VIRTUAL;
CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages (Channel);
");
SetMigrationVersion(2);
}
private bool ColumnExists(string table, string column)
{
// PRAGMA does not accept SQLite parameter bindings. The table name is
// a compile-time constant fed in from internal call sites, so the
// interpolation cannot be reached from any user-controlled path.
using var cmd = Connection.CreateCommand();
cmd.CommandText = $"PRAGMA table_info({table});";
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
if (reader.GetString(1) == column)
return true;
}
return false;
}
private void Migrate3()
{
Plugin.Log.Information("Running migration 3: Fix log kinds to fit the new format");
// Recovery for partially-applied Migrate3: if the schema is already
// in its target shape (new columns exist, old Code column gone) but
// user_version was never bumped, just record the version and exit.
if (ColumnExists("messages", "ChatType") && !ColumnExists("messages", "Code"))
{
Plugin.Log.Information("Migration 3: schema already migrated, only bumping user_version");
SetMigrationVersion(3);
return;
}
Connection.Execute(@"
-- Migration 3: Fix log kinds to fit the new format
-- Add new ChatType, SourceKind, TargetKind (byte), SortCodeV2
-- Migrate OldChatColumn
-- ChatType = OldChatColumn & 0x7f
-- SourceKind = log2(1 << ((OldChatColumn >> 11) & 0xF))
-- TargetKind = trunc(log2(1 << ((OldChatColumn >> 7) & 0xF)))
-- Virtual SortCodeV2 = ChatType << 16 | SourceKind << 8 | TargetKind
-- Delete OldChatColumn, Virtual Channel
ALTER TABLE messages ADD COLUMN ChatType INTEGER;
CREATE INDEX IF NOT EXISTS idx_messages_chat_type ON messages (ChatType);
ALTER TABLE messages ADD COLUMN SourceKind INTEGER;
ALTER TABLE messages ADD COLUMN TargetKind INTEGER;
UPDATE messages SET
ChatType = Code & 0x7f,
SourceKind = trunc(log2(1 << ((Code >> 11) & 0xF))),
TargetKind = trunc(log2(1 << ((Code >> 7) & 0xF)))
WHERE true;
DROP INDEX idx_messages_channel;
ALTER TABLE messages DROP COLUMN Channel;
ALTER TABLE messages DROP COLUMN Code;
ALTER TABLE messages DROP COLUMN SortCode;
");
SetMigrationVersion(3);
}
private void SetMigrationVersion(int version)
{
Plugin.Log.Information($"Setting version {version}");
using var cmd = Connection.CreateCommand();
// PRAGMA does not accept SQLite parameter bindings, and there is no
// pragma_ function variant that can set the version either. The
// version is a compile-time int from the migration sequence, never
// user input.
cmd.CommandText = $"PRAGMA user_version = {version};";
cmd.ExecuteNonQuery();
}
internal void ClearMessages()
{
Connection.Execute("DELETE FROM messages;");
PerformMaintenance();
}
/// <summary>
/// Returns a (ChatType, count) snapshot over non-deleted messages.
/// Used by the Privacy tab to preview the impact of a retroactive
/// cleanup before the user confirms.
/// </summary>
internal Dictionary<int, long> GetMessageCountsByChatType()
{
var result = new Dictionary<int, long>();
using var cmd = Connection.CreateCommand();
cmd.CommandText = "SELECT ChatType, COUNT(*) FROM messages WHERE deleted = false GROUP BY ChatType;";
cmd.CommandTimeout = 120;
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
var chatType = reader.GetInt32(0);
var count = reader.GetInt64(1);
result[chatType] = count;
}
return result;
}
/// <summary>
/// Deletes messages older than the per-channel retention window, with a
/// global default for channels not listed explicitly. Cutoffs are
/// computed from "now" at call time. Runs VACUUM only if anything was
/// removed. Returns the number of rows deleted.
/// </summary>
internal long DeleteByRetentionPolicy(IReadOnlyDictionary<int, int> chatTypeDaysMap, int defaultDays)
{
if (defaultDays < 0)
throw new ArgumentOutOfRangeException(nameof(defaultDays), "Negative retention is not allowed.");
foreach (var (_, days) in chatTypeDaysMap)
if (days < 0)
throw new ArgumentOutOfRangeException(nameof(chatTypeDaysMap), "Negative retention is not allowed.");
var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (chatTypeDaysMap.Count == 0 && defaultDays <= 0)
return 0;
long deleted;
using (var cmd = Connection.CreateCommand())
{
var clauses = new List<string>();
var index = 0;
foreach (var (type, days) in chatTypeDaysMap)
{
var cutoff = nowMs - days * 86400000L;
var typeParam = $"$type{index}";
var cutoffParam = $"$cutoff{index}";
cmd.Parameters.AddWithValue(typeParam, type);
cmd.Parameters.AddWithValue(cutoffParam, cutoff);
clauses.Add($"(ChatType = {typeParam} AND Date < {cutoffParam})");
index++;
}
// Catch-all for channels without an explicit override. "0" is
// treated as "do not delete by default" — without an explicit
// user override, unmapped channels stay forever instead of
// getting wiped immediately.
if (defaultDays > 0)
{
var defaultCutoff = nowMs - defaultDays * 86400000L;
cmd.Parameters.AddWithValue("$defaultCutoff", defaultCutoff);
var explicitPlaceholders = chatTypeDaysMap.Count > 0
? BindIntList(cmd, "explicit", chatTypeDaysMap.Keys)
: "-1"; // empty list would produce invalid SQL
clauses.Add($"(ChatType NOT IN ({explicitPlaceholders}) AND Date < $defaultCutoff)");
}
if (clauses.Count == 0)
return 0;
cmd.CommandText = $"DELETE FROM messages WHERE {string.Join(" OR ", clauses)};";
cmd.CommandTimeout = 600;
deleted = cmd.ExecuteNonQuery();
}
if (deleted > 0)
PerformMaintenance();
return deleted;
}
/// <summary>
/// Hard-deletes every message whose ChatType is not in the supplied
/// allowlist, then VACUUMs the database to reclaim disk space.
/// Returns the number of rows deleted.
/// </summary>
internal long CleanupRetainOnly(IReadOnlyCollection<int> allowedTypes)
{
if (allowedTypes.Count == 0)
{
// Defensive: refuse a "delete everything" disguised as a filter.
// Use ClearMessages() if a full wipe is actually intended.
throw new InvalidOperationException("CleanupRetainOnly requires at least one allowed ChatType. Use ClearMessages for a full wipe.");
}
long deleted;
using (var cmd = Connection.CreateCommand())
{
var placeholders = BindIntList(cmd, "ct", allowedTypes);
cmd.CommandText = $"DELETE FROM messages WHERE ChatType NOT IN ({placeholders});";
cmd.CommandTimeout = 600;
deleted = cmd.ExecuteNonQuery();
}
PerformMaintenance();
return deleted;
}
internal void PerformMaintenance()
{
Connection.Execute(@"
VACUUM;
REINDEX messages;
ANALYZE;
");
}
private string LogPath => DbPath + "-wal";
internal long DatabaseSize() => !File.Exists(DbPath) ? 0 : new FileInfo(DbPath).Length;
internal long DatabaseLogSize() => !File.Exists(LogPath) ? 0 : new FileInfo(LogPath).Length;
internal int MessageCount()
{
using var cmd = Connection.CreateCommand();
cmd.CommandText = "SELECT COUNT(*) FROM messages;";
return Convert.ToInt32(cmd.ExecuteScalar());
}
internal void UpsertMessage(Message message)
{
// Hellion Chat privacy filter — drop disallowed ChatTypes before
// they reach the storage layer (single source of truth, also
// covers any future write paths e.g. webinterface backfill).
if (!Plugin.Config.IsAllowedForStorage(message.Code.Type))
{
// Verbose-only: this fires for every dropped message, which is
// the common case for users with a tight privacy whitelist. Keep
// it for diagnostics but stay out of the default xllog stream.
Plugin.Log.Verbose($"Privacy filter dropped message: ChatType={message.Code.Type}");
return;
}
using var cmd = Connection.CreateCommand();
cmd.CommandText = @"
INSERT INTO messages (
Id,
Receiver,
ContentId,
Date,
ChatType,
SourceKind,
TargetKind,
Sender,
Content,
SenderSource,
ContentSource,
ExtraChatChannel,
Deleted
) VALUES (
$Id,
$Receiver,
$ContentId,
$Date,
$ChatType,
$SourceKind,
$TargetKind,
$Sender,
$Content,
$SenderSource,
$ContentSource,
$ExtraChatChannel,
false
)
ON CONFLICT (id) DO UPDATE SET
Receiver = excluded.Receiver,
ContentId = excluded.ContentId,
Date = excluded.Date,
ChatType = excluded.ChatType,
SourceKind = excluded.SourceKind,
TargetKind = excluded.TargetKind,
Sender = excluded.Sender,
Content = excluded.Content,
SenderSource = excluded.SenderSource,
ContentSource = excluded.ContentSource,
ExtraChatChannel = excluded.ExtraChatChannel,
Deleted = false;
";
cmd.Parameters.AddWithValue("$Id", message.Id);
cmd.Parameters.AddWithValue("$Receiver", message.Receiver);
cmd.Parameters.AddWithValue("$ContentId", message.ContentId);
cmd.Parameters.AddWithValue("$Date", message.Date.ToUnixTimeMilliseconds());
cmd.Parameters.AddWithValue("$ChatType", message.Code.Type);
cmd.Parameters.AddWithValue("$SourceKind", message.Code.Source);
cmd.Parameters.AddWithValue("$TargetKind", message.Code.Target);
cmd.Parameters.AddWithValue("$Sender", MessagePackSerializer.Serialize(message.Sender, MsgPackOptions));
cmd.Parameters.AddWithValue("$Content", MessagePackSerializer.Serialize(message.Content, MsgPackOptions));
cmd.Parameters.AddWithValue("$SenderSource", MessagePackSerializer.Serialize(message.SenderSource, MsgPackOptions));
cmd.Parameters.AddWithValue("$ContentSource", MessagePackSerializer.Serialize(message.ContentSource, MsgPackOptions));
cmd.Parameters.AddWithValue("$ExtraChatChannel", message.ExtraChatChannel);
cmd.ExecuteNonQuery();
}
/// <summary>
/// Streams messages for export. Optional filters:
/// - <paramref name="chatTypes"/>: limit to these ChatTypes
/// - <paramref name="from"/> / <paramref name="to"/>: inclusive date range
/// Result is sorted ascending by Date and excludes soft-deleted rows.
/// Caller is responsible for disposing the enumerator.
/// </summary>
internal MessageEnumerator StreamForExport(
IReadOnlyCollection<int>? chatTypes,
DateTimeOffset? from,
DateTimeOffset? to)
{
var cmd = Connection.CreateCommand();
var clauses = new List<string> { "deleted = false" };
if (chatTypes is { Count: > 0 })
clauses.Add($"ChatType IN ({BindIntList(cmd, "exct", chatTypes)})");
if (from is not null)
clauses.Add("Date >= $From");
if (to is not null)
clauses.Add("Date <= $To");
cmd.CommandText = @"
SELECT
Id,
Receiver,
ContentId,
Date,
ChatType,
SourceKind,
TargetKind,
Sender,
Content,
SenderSource,
ContentSource,
ExtraChatChannel
FROM messages
WHERE " + string.Join(" AND ", clauses) + @"
ORDER BY Date ASC;";
cmd.CommandTimeout = 600;
if (from is not null)
cmd.Parameters.AddWithValue("$From", from.Value.ToUnixTimeMilliseconds());
if (to is not null)
cmd.Parameters.AddWithValue("$To", to.Value.ToUnixTimeMilliseconds());
return new MessageEnumerator(cmd.ExecuteReader());
}
/// <summary>
/// Get the most recent messages.
/// </summary>
/// <param name="receiver">The receiver content ID to filter by. If null, no filtering is performed.</param>
/// <param name="since">Only show messages since this date. If null, no filtering is performed.</param>
/// <param name="count">The amount to return. Defaults to 10,000.</param>
internal MessageEnumerator GetMostRecentMessages(ulong? receiver = null, DateTimeOffset? since = null, int count = MessageQueryLimit)
{
List<string> whereClauses = ["deleted = false"];
if (receiver != null)
whereClauses.Add("Receiver = $Receiver");
if (since != null)
whereClauses.Add("Date >= $Since");
var whereClause = "WHERE " + string.Join(" AND ", whereClauses);
var cmd = Connection.CreateCommand();
// Select last N messages by date DESC, but reverse the order to get
// them in ascending order.
cmd.CommandText = @"
SELECT *
FROM (
SELECT
Id,
Receiver,
ContentId,
Date,
ChatType,
SourceKind,
TargetKind,
Sender,
Content,
SenderSource,
ContentSource,
ExtraChatChannel
FROM messages
" + whereClause + @"
ORDER BY Date DESC
LIMIT $Count
)
ORDER BY Date ASC;
";
cmd.CommandTimeout = 120; // this could take a while on slow computers
if (receiver != null)
cmd.Parameters.AddWithValue("$Receiver", receiver);
if (since != null)
cmd.Parameters.AddWithValue("$Since", since.Value.ToUnixTimeMilliseconds());
cmd.Parameters.AddWithValue("$Count", count);
return new MessageEnumerator(cmd.ExecuteReader());
}
/// <summary>
/// Hellion Chat — Auto-Tell-Tabs history preload.
///
/// Returns up to <paramref name="limit"/> tells exchanged with the named
/// player, oldest-first, ready to be added to a freshly spawned auto
/// tell tab. The Sender column is a serialized chunk blob, so SQL on its
/// own cannot filter by player identity; we narrow with SQL on Receiver
/// + ChatType (cheap, indexed) and let the client do the final
/// PlayerPayload comparison on the result set.
///
/// <paramref name="sqlScanLimit"/> caps how many recent tells we scan
/// before giving up. 500 covers around 10 days for an active greeter
/// and stays well under the 20 ms budget required to keep the spawn on
/// the message-processing worker thread.
/// </summary>
internal IReadOnlyList<Message> GetTellHistoryWithSender(
ulong receiver,
string senderName,
uint senderWorld,
int limit,
int sqlScanLimit = 500)
{
if (limit <= 0)
{
return [];
}
using var cmd = Connection.CreateCommand();
cmd.CommandText = @"
SELECT
Id,
Receiver,
ContentId,
Date,
ChatType,
SourceKind,
TargetKind,
Sender,
Content,
SenderSource,
ContentSource,
ExtraChatChannel
FROM messages
WHERE deleted = false
AND Receiver = $Receiver
AND ChatType IN ($TellIncoming, $TellOutgoing)
ORDER BY Date DESC
LIMIT $ScanLimit;
";
cmd.CommandTimeout = 60;
cmd.Parameters.AddWithValue("$Receiver", receiver);
cmd.Parameters.AddWithValue("$TellIncoming", (int)ChatType.TellIncoming);
cmd.Parameters.AddWithValue("$TellOutgoing", (int)ChatType.TellOutgoing);
cmd.Parameters.AddWithValue("$ScanLimit", sqlScanLimit);
var collected = new List<Message>();
using var enumerator = new MessageEnumerator(cmd.ExecuteReader());
foreach (var message in enumerator)
{
if (!ChunkUtil.MatchesSender(message, senderName, senderWorld))
{
continue;
}
collected.Add(message);
if (collected.Count >= limit)
{
break;
}
}
// SQL was DESC (newest-first) so we hit the limit on the most
// recent matching tells. Reverse to oldest-first for chronological
// display in the tab.
collected.Reverse();
return collected;
}
/// <summary>
/// Marks a message as deleted so it won't get returned in queries.
/// </summary>
internal void DeleteMessage(Guid id)
{
using var cmd = Connection.CreateCommand();
cmd.CommandText = "UPDATE messages SET Deleted = true WHERE Id = $Id;";
cmd.Parameters.AddWithValue("$Id", id);
cmd.ExecuteNonQuery();
}
internal long CountDateRange(DateTime after, DateTime before, IEnumerable<byte> channels, ulong? receiver = null)
{
using var cmd = Connection.CreateCommand();
List<string> whereClauses = ["deleted = false"];
if (receiver != null)
whereClauses.Add("Receiver = $Receiver");
whereClauses.Add("Date BETWEEN $After AND $Before");
whereClauses.Add($"ChatType IN ({BindIntList(cmd, "cdr", channels.Select(c => (int)c))})");
var whereClause = "WHERE " + string.Join(" AND ", whereClauses);
// Select last N messages by date DESC, but reverse the order to get
// them in ascending order.
cmd.CommandText = @"
SELECT COUNT(*)
FROM messages
" + whereClause;
if (receiver != null)
cmd.Parameters.AddWithValue("$Receiver", receiver);
cmd.Parameters.AddWithValue("$After", ((DateTimeOffset) after).ToUnixTimeMilliseconds());
cmd.Parameters.AddWithValue("$Before", ((DateTimeOffset) before).ToUnixTimeMilliseconds());
cmd.CommandTimeout = 120; // this could take a while on slow computers
return (long) cmd.ExecuteScalar()!;
}
internal MessageEnumerator GetDateRange(DateTime after, DateTime before, IEnumerable<byte> channels, ulong? receiver = null)
{
var cmd = Connection.CreateCommand();
List<string> whereClauses = ["deleted = false"];
if (receiver != null)
whereClauses.Add("Receiver = $Receiver");
whereClauses.Add("Date BETWEEN $After AND $Before");
whereClauses.Add($"ChatType IN ({BindIntList(cmd, "gdr", channels.Select(c => (int)c))})");
var whereClause = $"WHERE {string.Join(" AND ", whereClauses)}";
// Select last N messages by date DESC, but reverse the order to get
// them in ascending order.
cmd.CommandText = @"
SELECT
Id,
Receiver,
ContentId,
Date,
ChatType,
SourceKind,
TargetKind,
Sender,
Content,
SenderSource,
ContentSource,
ExtraChatChannel
FROM messages
" + whereClause;
cmd.CommandTimeout = 120; // this could take a while on slow computers
if (receiver != null)
cmd.Parameters.AddWithValue("$Receiver", receiver);
cmd.Parameters.AddWithValue("$After", ((DateTimeOffset) after).ToUnixTimeMilliseconds());
cmd.Parameters.AddWithValue("$Before", ((DateTimeOffset) before).ToUnixTimeMilliseconds());
return new MessageEnumerator(cmd.ExecuteReader());
}
internal MessageEnumerator GetPagedDateRange(DateTime after, DateTime before, IEnumerable<byte> channels, ulong? receiver = null, int page = 0)
{
var cmd = Connection.CreateCommand();
List<string> whereClauses = ["deleted = false"];
if (receiver != null)
whereClauses.Add("Receiver = $Receiver");
whereClauses.Add("Date BETWEEN $After AND $Before");
whereClauses.Add($"ChatType IN ({BindIntList(cmd, "pdr", channels.Select(c => (int)c))})");
var whereClause = $"WHERE {string.Join(" AND ", whereClauses)}";
// Select last N messages by date DESC, but reverse the order to get
// them in ascending order.
cmd.CommandText = @"
SELECT
Id,
Receiver,
ContentId,
Date,
ChatType,
SourceKind,
TargetKind,
Sender,
Content,
SenderSource,
ContentSource,
ExtraChatChannel
FROM messages
" + whereClause + @"
ORDER BY Date
LIMIT $Offset, $OffsetCount;
";
cmd.CommandTimeout = 120; // this could take a while on slow computers
if (receiver != null)
cmd.Parameters.AddWithValue("$Receiver", receiver);
cmd.Parameters.AddWithValue("$After", ((DateTimeOffset) after).ToUnixTimeMilliseconds());
cmd.Parameters.AddWithValue("$Before", ((DateTimeOffset) before).ToUnixTimeMilliseconds());
cmd.Parameters.AddWithValue("$Offset", DbViewer.RowPerPage * page);
cmd.Parameters.AddWithValue("$OffsetCount", DbViewer.RowPerPage);
return new MessageEnumerator(cmd.ExecuteReader());
}
// Build "$prefix0,$prefix1,..." placeholder list and bind values to
// the command. SQLite has no native array parameter, so we generate
// the list at runtime and bind each entry under its own name. Used
// for IN-clauses and similar dynamic-arity SQL fragments.
private static string BindIntList(SqliteCommand cmd, string prefix, IEnumerable<int> values)
{
var names = new List<string>();
var index = 0;
foreach (var value in values)
{
var name = $"${prefix}{index}";
cmd.Parameters.AddWithValue(name, value);
names.Add(name);
index++;
}
return string.Join(",", names);
}
}
internal class MessageEnumerator(DbDataReader reader) : IEnumerable<Message>, IDisposable, IAsyncDisposable
{
private const int MaxErrorLogs = 10;
// FailedIds and FailedCount are separate, because messages might fail to
// even parse the ID field.
private readonly List<Guid> FailedIds = [];
private int FailedCount;
public bool DidError => FailedCount > 0;
public IEnumerator<Message> GetEnumerator()
{
while (reader.Read())
{
var id = Guid.Empty;
Message msg;
try
{
id = reader.GetGuid(0);
msg = new Message(
id,
(ulong)reader.GetInt64(1),
(ulong)reader.GetInt64(2),
DateTimeOffset.FromUnixTimeMilliseconds(reader.GetInt64(3)),
new ChatCode((byte)reader.GetInt32(4), (byte)reader.GetInt32(5), (byte)reader.GetInt32(6)),
MessagePackSerializer.Deserialize<List<Chunk>>(reader.GetFieldValue<byte[]>(7), MessageStore.MsgPackOptions),
MessagePackSerializer.Deserialize<List<Chunk>>(reader.GetFieldValue<byte[]>(8), MessageStore.MsgPackOptions),
MessagePackSerializer.Deserialize<SeString>(reader.GetFieldValue<byte[]>(9), MessageStore.MsgPackOptions),
MessagePackSerializer.Deserialize<SeString>(reader.GetFieldValue<byte[]>(10), MessageStore.MsgPackOptions),
reader.GetGuid(11)
);
}
catch (Exception e)
{
if (FailedCount < MaxErrorLogs)
Plugin.Log.Error($"Exception while reading message '{id}' from database: {e}");
FailedCount++;
if (FailedCount == MaxErrorLogs)
Plugin.Log.Error("Further parsing errors will not be logged");
if (id != Guid.Empty)
FailedIds.Add(id);
continue;
}
yield return msg;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public IReadOnlyList<Guid> FailedMessageIds()
{
return FailedIds;
}
public void Dispose()
{
reader.Dispose();
}
public async ValueTask DisposeAsync()
{
await reader.DisposeAsync();
}
}