Implement some todos from #27
This commit is contained in:
@@ -84,27 +84,41 @@ internal class LegacyMessageImporterEligibility
|
||||
}
|
||||
}
|
||||
|
||||
internal LegacyMessageImporter StartImport(MessageStore targetStore, bool noLog = false)
|
||||
internal LegacyMessageImporter StartImport(MessageStore targetStore, bool noLog = false, Plugin? plugin = null)
|
||||
{
|
||||
if (Status != LegacyMessageImporterEligibilityStatus.Eligible)
|
||||
throw new InvalidOperationException($"Migration not eligible: status is {Status}");
|
||||
|
||||
return new LegacyMessageImporter(targetStore, originalDbPath: OriginalDbPath, migrationDbPath: MigrationDbPath, noLog: noLog);
|
||||
return new LegacyMessageImporter(targetStore, originalDbPath: OriginalDbPath, migrationDbPath: MigrationDbPath, noLog: noLog, plugin);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Makes the migration ineligible so the user won't be asked again.
|
||||
/// </summary>
|
||||
internal void RenameOldDatabase()
|
||||
internal bool RenameOldDatabase()
|
||||
{
|
||||
File.Move(OriginalDbPath, MigrationDbPath);
|
||||
Status = LegacyMessageImporterEligibilityStatus.IneligibleMigrationDbExists;
|
||||
AdditionalIneligibilityInfo = "User chose to rename the old database file";
|
||||
try
|
||||
{
|
||||
File.Move(OriginalDbPath, MigrationDbPath);
|
||||
Status = LegacyMessageImporterEligibilityStatus.IneligibleMigrationDbExists;
|
||||
AdditionalIneligibilityInfo = "User chose to rename the old database file";
|
||||
return true;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Plugin.Log.Error(ex, "Unable to move the old database");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal class LegacyMessageImporter : IDisposable
|
||||
internal class LegacyMessageImporter : IAsyncDisposable
|
||||
{
|
||||
private readonly Plugin? Plugin;
|
||||
|
||||
private readonly CancellationTokenSource CancellationToken = new();
|
||||
private Thread? WorkingThread = null;
|
||||
|
||||
internal const string MessagesCollection = "messages";
|
||||
private const int MaxFailedMessageLogs = 10;
|
||||
|
||||
@@ -130,16 +144,17 @@ internal class LegacyMessageImporter : IDisposable
|
||||
// This can be set by the user to limit the rate at which messages are
|
||||
// imported. If the rate exceeds this value, the importer will sleep for the
|
||||
// remainder of the second.
|
||||
internal int MaxMessageRate { get; set; } = 250; // start low
|
||||
internal int MaxMessageRate = 250; // start low
|
||||
|
||||
// Do not call this directly, use
|
||||
// LegacyMessageImporterEligibility.StartImport instead.
|
||||
internal LegacyMessageImporter(MessageStore targetStore, string? originalDbPath = null, string? migrationDbPath = null, bool noLog = false)
|
||||
internal LegacyMessageImporter(MessageStore targetStore, string? originalDbPath = null, string? migrationDbPath = null, bool noLog = false, Plugin? plugin = null)
|
||||
{
|
||||
_targetStore = targetStore;
|
||||
originalDbPath ??= Path.Join(Plugin.Interface.ConfigDirectory.FullName, "chat.db");
|
||||
migrationDbPath ??= migrationDbPath ?? Path.Join(Plugin.Interface.ConfigDirectory.FullName, "chat-litedb.db");
|
||||
_log = noLog ? null : Plugin.Log;
|
||||
Plugin = plugin;
|
||||
|
||||
_log?.Info($"[Migration] Moving '{originalDbPath}' to '{migrationDbPath}'");
|
||||
File.Move(originalDbPath, migrationDbPath);
|
||||
@@ -147,12 +162,30 @@ internal class LegacyMessageImporter : IDisposable
|
||||
_database = Connect(migrationDbPath);
|
||||
|
||||
ImportStart = Environment.TickCount64;
|
||||
new Thread(DoImport).Start();
|
||||
WorkingThread = new Thread(() => DoImport(CancellationToken.Token));
|
||||
WorkingThread.Start();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
// TODO: cancel thread and wait for it to close
|
||||
_database?.Dispose();
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await CancellationToken.CancelAsync();
|
||||
|
||||
var timeout = 10_000; // 10s
|
||||
while (WorkingThread != null && timeout > 0)
|
||||
{
|
||||
if (!WorkingThread.IsAlive)
|
||||
break;
|
||||
|
||||
timeout -= 100;
|
||||
await Task.Delay(100);
|
||||
Plugin.Log.Information("Sleeping because thread still alive");
|
||||
}
|
||||
|
||||
_database?.Dispose();
|
||||
}
|
||||
|
||||
@@ -248,7 +281,7 @@ internal class LegacyMessageImporter : IDisposable
|
||||
return conn;
|
||||
}
|
||||
|
||||
private void DoImport()
|
||||
private void DoImport(CancellationToken token)
|
||||
{
|
||||
var importRateTimer = Stopwatch.StartNew();
|
||||
var messagesInLastSecond = 0;
|
||||
@@ -261,6 +294,9 @@ internal class LegacyMessageImporter : IDisposable
|
||||
var messages = messagesCollection.Query().OrderBy(msg => msg.Date).ToDocuments();
|
||||
foreach (var messageDoc in messages)
|
||||
{
|
||||
if (token.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
var message = BsonDocumentToMessage(messageDoc);
|
||||
@@ -271,8 +307,7 @@ internal class LegacyMessageImporter : IDisposable
|
||||
{
|
||||
FailedMessages++;
|
||||
if (FailedMessages <= MaxFailedMessageLogs)
|
||||
_log?.Error(
|
||||
$"[Migration] Failed to import message '{messageDoc["_id"].AsObjectId}' (usually due to corruption): {e}");
|
||||
_log?.Error($"[Migration] Failed to import message '{messageDoc["_id"].AsObjectId}' (usually due to corruption): {e}");
|
||||
if (FailedMessages == MaxFailedMessageLogs)
|
||||
_log?.Error("[Migration] Further failed message logs will be suppressed");
|
||||
}
|
||||
@@ -293,19 +328,19 @@ internal class LegacyMessageImporter : IDisposable
|
||||
|
||||
// Log every 1,000 messages
|
||||
if ((SuccessfulMessages + FailedMessages) % 1000 == 0)
|
||||
_log?.Information(
|
||||
$"[Migration] Progress: successfully imported {SuccessfulMessages}/{totalMessages} messages ({FailedMessages} failures)");
|
||||
_log?.Information($"[Migration] Progress: successfully imported {SuccessfulMessages}/{totalMessages} messages ({FailedMessages} failures)");
|
||||
}
|
||||
|
||||
_log?.Information($"[Migration] Imported {SuccessfulMessages}/{FailedMessages} messages, {FailedMessages} failed");
|
||||
|
||||
if (ProcessedMessages != totalMessages)
|
||||
_log?.Warning(
|
||||
$"[Migration] Total message count mismatch: expected {totalMessages}, got {SuccessfulMessages + FailedMessages}");
|
||||
_log?.Warning($"[Migration] Total message count mismatch: expected {totalMessages}, got {SuccessfulMessages + FailedMessages}");
|
||||
|
||||
ImportComplete = Environment.TickCount64;
|
||||
_database.Dispose();
|
||||
_database = null;
|
||||
|
||||
if (Plugin != null)
|
||||
Plugin.Framework.Run(() => Plugin.MessageManager.FilterAllTabs(false), token);
|
||||
}
|
||||
|
||||
private static Message BsonDocumentToMessage(BsonDocument doc)
|
||||
|
||||
Reference in New Issue
Block a user