середа, 20 червня 2012 р.

Синхронізація даних в MSSql і MySql на основі NNibernate

На етапі послідовного переходу з MS SQL Server на MySql постала задача синхронізації даних. Необхідно було узгоджувати окремі таблиці, вибірки і визначені записи. Найзручнішим продуктом для вирішення задачі на початку був продукт Cross-Database Studio (trial) від DBBalance, але згодом необхідно було автоматично узгоджувати записи.

Проект СУНП "Універсум" від звичайних stored procedure повільно переходив на NHibernate. Ця ORM виявилася необхідним і достатнім інструментом для посталеної задачі.

Всі об`єкти, які представляють таблиці даних, реалізовують інтерфейс IItem:

/// <summary>
/// Інтерфейс найпростішого запису
/// </summary>
public interface IItem
{
    /// <summary>
    /// Ідентифікатор
    /// </summary>
    int Id { get; set; }
    /// <summary>
    /// Назва
    /// </summary>
    string Name { get; set; }
    /// <summary>
    /// Стан запису
    /// </summary>
    ItemState State { get; set; }
}
/// <summary>
/// Види бази даних
/// </summary>
public enum DbType
{
    MySql,
    MsSql
}
/// <summary>
/// Стан запису
/// </summary>
public enum ItemState : byte
{
    /// <summary>
    /// Невідомо
    /// </summary>
    Unknown,
    /// <summary>
    /// Для видалення
    /// </summary>
    ForDelete,
    /// <summary>
    /// Для вставки або оновлення
    /// </summary>
    ForUpdate,
    /// <summary>
    /// Видалений
    /// </summary>
    Deleted,
    /// <summary>
    /// Відмічений
    /// </summary>
    Checked,
    /// <summary>
    /// Знайдений, існуючий
    /// </summary>
    Existed,
    /// <summary>
    /// Збережений
    /// </summary>
    Saved,
    /// <summary>
    /// Новий запис
    /// </summary>
    New,
    /// <summary>
    /// Доданий
    /// </summary>
    Added
}
/// <summary>
/// Синхронізує об`єкт типу T
/// </summary>
/// <typeparam name="T">Тип об`єкта (назва таблиці)</typeparam>
/// <param name="type">База даних, яка є призначенням (в яку відбувається синхронізація)</param>
public static void Sync<T>(DbType type) where T : IItem
{
    //
    IDictionary<int, T> source = new Dictionary<int, T>();
    IDictionary<int, T> destination = new Dictionary<int, T>();

    //вибираємо всі властивості об`єкта (записа таблиці даних)
    var allProps = typeof(T).GetProperties().ToList();
    //вибираємо лише властивості, які змінюються (властивості,  які відповідають полям у базі даних)
   var props = allProps.Where(i => i.PropertyType.Attributes.HasFlag(TypeAttributes.Sealed) && !i.PropertyType.IsEnum).ToList();

    IList<T> data = new List<T>();
    //сесія бази, яка є джерелом
    NHibernate.ISession sourceDb = null;
    //сесія бази, яка є призначенням
    NHibernate.ISession destinDb = null;

    if (type == DbType.MySql)
    {
        sourceDb = DbServer.GetMsSqlSession();
        destinDb = DbServer.GetMySqlSession();
    }
    else
    {
        sourceDb = DbServer.GetMySqlSession();
        destinDb = DbServer.GetMsSqlSession();
    }
    //завантажуємо дані в словник джерела
    source = sourceDb.Query<T>().ToDictionary(i => i.Id, i => i);

    if (source.Count == 0)
        return;

    destination = destinDb.Query<T>().ToDictionary(i => i.Id, i => i);

    try
    {
        DateTime start = DateTime.Now;
        source.Values.AsParallel().ForAll(item =>
        {
            if (!destination.ContainsKey(item.Id))
            {
                //якщо запис відсутній, позначаємо, що він буде новий
                item.State = ItemState.New;
                data.Add(item);
            }
            else
            {
                //знаходимо записи, що відрізняються
                foreach (var p in props)
                {
                    var svalue = p.GetValue(item, null);
                    var dvalue = p.GetValue(destination[item.Id], null);
                    //порівнюємо властивості (поля) джерела і призначення
                    if (!Equals(svalue, dvalue))
                    {
                        if (svalue != null && dvalue != null && svalue.Equals(0) && dvalue.Equals(0))
                        {

                        }
                        else
                        {
                            //позначаємо запис
                            item.State = ItemState.ForUpdate;
                            data.Add(item);
                            break;
                        }
                    }
                }
            }
        });

        //пошук відсутніх записів та записів, що відрізняються
        destination.Values.AsParallel().ForAll(item =>
        {
            if (!source.ContainsKey(item.Id))
            {
                //зайві записи
                item.State = ItemState.ForDelete;
                data.Add(item);
            }
        });
        DateTime end = DateTime.Now;
        var duration = (end - start);
    }
    catch (Exception ex)
    {
        ex.Save();
    }
    destinDb.Dispose();
    sourceDb.Dispose();

    if (data.Count == 0)
        return;

    //відкриваємо сесію для синхронізації
    if (type == DbType.MySql)
        destinDb = DbServer.GetMySqlSession();
    else
        destinDb = DbServer.GetMsSqlSession();

    data = data.OrderBy(i => i.Id).ToList();
    using (var trn = destinDb.BeginTransaction())
    {
        try
        {
            foreach (var item in data)
            {
                if (item.State == ItemState.ForDelete)
                    destinDb.Delete(item);
                else if (item.State == ItemState.ForUpdate)
                    destinDb.Update(item);
                else if (item.State == ItemState.New)
                    destinDb.Save(item, item.Id);
            }
            trn.Commit();
        }
        catch (Exception ex)
        {
            trn.Rollback();
            ex.Save();
        }
    }
    destinDb.Dispose();
}

Для синхронізації вибірок, необхідно створити список даних IList<T> data, додати в нього записи, які відрізняються і передати його методу Sync<T>:

/// <summary>
/// Синхронізовує список даних у двох базах
/// </summary>
/// <typeparam name="T">Тип даних</typeparam>
/// <param name="data">Список записів, які відрізняються у базах даних</param>
/// <param name="type">База даних</param>
public static bool Sync<T>(IList<T> data, DbType type = DbType.MySql) where T : IItem
{
    bool result = false;
    if (data == null || data.Count == 0)
        return true;

    ISession db = null;
    if (type == DbType.MySql)
        db = DbServer.GetMySqlSession();
    else
        db = DbServer.GetMsSqlSession();

    using (var trn = db.BeginTransaction())
    {
        try
        {
           foreach (var item in data)
           {
                if (item.State == ItemState.ForDelete)
                {
                    var t = item.GetType();
                    var row = db.Get(t, item.Id);
                    if (row != null)
                       db.Delete(row);
                }
                else if (item.State == ItemState.ForUpdate)
                    db.Update(item);
                else if (item.State == ItemState.New)
                    db.Save(item, item.Id);
                else if (item.State == ItemState.Saved)
                    db.SaveOrUpdate(item);
            }
            trn.Commit();
            result = true;
        }
        catch (Exception ex)
        {
            trn.Rollback();
            ex.Save();
        }
    }
    db.Dispose();
    return result;
}

Наприклад, синхронізувати таблицю спеціальностей:
Sync<ProfessionsRow>(DbType.MySql);

Розпаралелення процесу порівняння трохи додає швидкодії. Порівняння близько 400 тис. записів оцінок студентів відбувалося орієнтовно 6 секунд на ноутбуці Intel Core i3. Це у 1,5 рази швидше послідовного перебору даних.

СУНП "Універсум" повністю переведено на MySql і потреби в синхронізаторі вже немає. Наведений текст програм достаній для локального вирішення задачі синхронізації даних в двох базах.