开发者

Bulk insert/Update with Petapoco

I'm using the Save() method to insert or update records, but I would like to m开发者_JAVA技巧ake it perform a bulk insert and bulk update with only one database hit. How do I do this?


In my case, I took advantage of the database.Execute() method.

I created a SQL parameter that had the first part of my insert:

var sql = new Sql("insert into myTable(Name, Age, Gender) values");

for (int i = 0; i < pocos.Count ; ++i)
{
   var p = pocos[i];
   sql.Append("(@0, @1, @2)", p.Name, p.Age , p.Gender);
   if(i != pocos.Count -1)
     sql.Append(",");
}

Database.Execute(sql);


I tried two different methods for inserting a large quantity of rows faster than the default Insert (which is pretty slow when you have a lot of rows).

1) Making up a List<T> with the poco's first and then inserting them at once within a loop (and in a transaction):

using (var tr = PetaPocoDb.GetTransaction())
{
    foreach (var record in listOfRecords)
    {
        PetaPocoDb.Insert(record);
    }
    tr.Complete();
}

2) SqlBulkCopy a DataTable:

var bulkCopy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.TableLock);
bulkCopy.DestinationTableName = "SomeTable";
bulkCopy.WriteToServer(dt);

To get my List <T> to a DataTable I used Marc Gravells Convert generic List/Enumerable to DataTable? function which worked ootb for me (after I rearranged the Poco properties to be in the exact same order as the table fields in the db.)

The SqlBulkCopy was fastest, 50% or so faster than the transactions method in the (quick) perf tests I did with ~1000 rows.

Hth


Insert in one SQL query is much faster.

Here is a customer method for PetaPoco.Database class that adds ability to do a bulk insert of any collection:

public void BulkInsertRecords<T>(IEnumerable<T> collection)
        {
            try
            {
                OpenSharedConnection();
                using (var cmd = CreateCommand(_sharedConnection, ""))
                {
                    var pd = Database.PocoData.ForType(typeof(T));
                    var tableName = EscapeTableName(pd.TableInfo.TableName);
                    string cols = string.Join(", ", (from c in pd.QueryColumns select tableName + "." + EscapeSqlIdentifier(c)).ToArray());
                    var pocoValues = new List<string>();
                    var index = 0;
                    foreach (var poco in collection)
                    {
                        var values = new List<string>();
                        foreach (var i in pd.Columns)
                        {
                            values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                            AddParam(cmd, i.Value.GetValue(poco), _paramPrefix);
                        }
                        pocoValues.Add("(" + string.Join(",", values.ToArray()) + ")");
                    }
                    var sql = string.Format("INSERT INTO {0} ({1}) VALUES {2}", tableName, cols, string.Join(", ", pocoValues));
                    cmd.CommandText = sql;
                    cmd.ExecuteNonQuery();
                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }


Here is the updated verision of Steve Jansen answer that splits in chuncs of maximum 2100 pacos

I commented out the following code as it produces duplicates in the database...

                //using (var reader = cmd.ExecuteReader())
                //{
                //    while (reader.Read())
                //    {
                //        inserted.Add(reader[0]);
                //    }
                //}

Updated Code

    /// <summary>
    /// Performs an SQL Insert against a collection of pocos
    /// </summary>
    /// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
    /// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
    public object BulkInsert(IEnumerable<object> pocos)
    {
        Sql sql;
        IList<PocoColumn> columns = new List<PocoColumn>();
        IList<object> parameters;
        IList<object> inserted;
        PocoData pd;
        Type primaryKeyType;
        object template;
        string commandText;
        string tableName;
        string primaryKeyName;
        bool autoIncrement;

        int maxBulkInsert;

        if (null == pocos)
        {
            return new object[] { };
        }

        template = pocos.First<object>();

        if (null == template)
        {
            return null;
        }

        pd = PocoData.ForType(template.GetType());
        tableName = pd.TableInfo.TableName;
        primaryKeyName = pd.TableInfo.PrimaryKey;
        autoIncrement = pd.TableInfo.AutoIncrement;

        //Calculate the maximum chunk size
        maxBulkInsert = 2100 / pd.Columns.Count;
        IEnumerable<object> pacosToInsert = pocos.Take(maxBulkInsert);
        IEnumerable<object> pacosremaining = pocos.Skip(maxBulkInsert);

        try
        {
            OpenSharedConnection();
            try
            {
                var names = new List<string>();
                var values = new List<string>();
                var index = 0;

                foreach (var i in pd.Columns)
                {
                    // Don't insert result columns
                    if (i.Value.ResultColumn)
                        continue;

                    // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                    if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                    {
                        primaryKeyType = i.Value.PropertyInfo.PropertyType;

                        // Setup auto increment expression
                        string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                        if (autoIncExpression != null)
                        {
                            names.Add(i.Key);
                            values.Add(autoIncExpression);
                        }
                        continue;
                    }

                    names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                    values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                    columns.Add(i.Value);
                }

                string outputClause = String.Empty;
                if (autoIncrement)
                {
                    outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                }

                commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                                _dbType.EscapeTableName(tableName),
                                string.Join(",", names.ToArray()),
                                outputClause
                                );

                sql = new Sql(commandText);
                parameters = new List<object>();
                string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
                bool isFirstPoco = true;
                var parameterCounter = 0;

                foreach (object poco in pacosToInsert)
                {
                    parameterCounter++;
                    parameters.Clear();

                    foreach (PocoColumn column in columns)
                    {
                        parameters.Add(column.GetValue(poco));
                    }

                    sql.Append(valuesText, parameters.ToArray<object>());

                    if (isFirstPoco && pocos.Count() > 1)
                    {
                        valuesText = "," + valuesText;
                        isFirstPoco = false;
                    }
                }

                inserted = new List<object>();

                using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
                {
                    if (!autoIncrement)
                    {
                        DoPreExecute(cmd);
                        cmd.ExecuteNonQuery();
                        OnExecutedCommand(cmd);

                        PocoColumn pkColumn;
                        if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                        {
                            foreach (object poco in pocos)
                            {
                                inserted.Add(pkColumn.GetValue(poco));
                            }
                        }

                        return inserted.ToArray<object>();
                    }

                    object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                    if (pacosremaining.Any())
                    {
                        return BulkInsert(pacosremaining);
                    }

                    return id;

                    //using (var reader = cmd.ExecuteReader())
                    //{
                    //    while (reader.Read())
                    //    {
                    //        inserted.Add(reader[0]);
                    //    }
                    //}

                    //object[] primaryKeys = inserted.ToArray<object>();

                    //// Assign the ID back to the primary key property
                    //if (primaryKeyName != null)
                    //{
                    //    PocoColumn pc;
                    //    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    //    {
                    //        index = 0;
                    //        foreach (object poco in pocos)
                    //        {
                    //            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                    //            index++;
                    //        }
                    //    }
                    //}

                    //return primaryKeys;
                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
        catch (Exception x)
        {
            if (OnException(x))
                throw;
            return null;
        }
    }


Below is a BulkInsert method of PetaPoco that expands on taylonr's very clever idea to use the SQL technique of insert multiple rows via INSERT INTO tab(col1, col2) OUTPUT inserted.[ID] VALUES (@0, @1), (@2, 3), (@4, @5), ..., (@n-1, @n).

It also returns the auto-increment (identity) values of inserted records, which I don't believe happens in IvoTops' implementation.

NOTE: SQL Server 2012 (and below) has a limit of 2,100 parameters per query. (This is likely the source of the stack overflow exception referenced by Zelid's comment). You will need to manually split your batches up based on the number of columns that are not decorated as Ignore or Result. For example, a POCO with 21 columns should be sent in batch sizes of 99, or (2100 - 1) / 21. I may refactor this to dynamically split batches based on this limit for SQL Server; however, you will always see the best results by managing the batch size external to this method.

This method showed an approximate 50% gain in execution time over my previous technique of using a shared connection in a single transaction for all inserts.

This is one area where Massive really shines - Massive has a Save(params object[] things) that builds an array of IDbCommands, and executes each one on a shared connection. It works out of the box, and doesn't run into parameter limits.

/// <summary>
/// Performs an SQL Insert against a collection of pocos
/// </summary>
/// <param name="pocos">A collection of POCO objects that specifies the column values to be inserted.  Assumes that every POCO is of the same type.</param>
/// <returns>An array of the auto allocated primary key of the new record, or null for non-auto-increment tables</returns>
/// <remarks>
///     NOTE: As of SQL Server 2012, there is a limit of 2100 parameters per query.  This limitation does not seem to apply on other platforms, so 
///           this method will allow more than 2100 parameters.  See http://msdn.microsoft.com/en-us/library/ms143432.aspx
///     The name of the table, it's primary key and whether it's an auto-allocated primary key are retrieved from the attributes of the first POCO in the collection
/// </remarks>
public object[] BulkInsert(IEnumerable<object> pocos)
{
    Sql sql;
    IList<PocoColumn> columns = new List<PocoColumn>();
    IList<object> parameters;
    IList<object> inserted;
    PocoData pd;
    Type primaryKeyType;
    object template;
    string commandText;
    string tableName;
    string primaryKeyName;
    bool autoIncrement;


    if (null == pocos)
        return new object[] {};

    template = pocos.First<object>();

    if (null == template)
        return null;

    pd = PocoData.ForType(template.GetType());
    tableName = pd.TableInfo.TableName;
    primaryKeyName = pd.TableInfo.PrimaryKey;
    autoIncrement = pd.TableInfo.AutoIncrement;

    try
    {
        OpenSharedConnection();
        try
        {
            var names = new List<string>();
            var values = new List<string>();
            var index = 0;
            foreach (var i in pd.Columns)
            {
                // Don't insert result columns
                if (i.Value.ResultColumn)
                    continue;

                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                {
                    primaryKeyType = i.Value.PropertyInfo.PropertyType;

                    // Setup auto increment expression
                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                    if (autoIncExpression != null)
                    {
                        names.Add(i.Key);
                        values.Add(autoIncExpression);
                    }
                    continue;
                }

                names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                columns.Add(i.Value);
            }

            string outputClause = String.Empty;
            if (autoIncrement)
            {
                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
            }

            commandText = string.Format("INSERT INTO {0} ({1}){2} VALUES",
                            _dbType.EscapeTableName(tableName),
                            string.Join(",", names.ToArray()),
                            outputClause
                            );

            sql = new Sql(commandText);
            parameters = new List<object>();
            string valuesText = string.Concat("(", string.Join(",", values.ToArray()), ")");
            bool isFirstPoco = true;

            foreach (object poco in pocos)
            {
                parameters.Clear();
                foreach (PocoColumn column in columns)
                {
                    parameters.Add(column.GetValue(poco));
                }

                sql.Append(valuesText, parameters.ToArray<object>());

                if (isFirstPoco)
                {
                    valuesText = "," + valuesText;
                    isFirstPoco = false;
                }
            }

            inserted = new List<object>();

            using (var cmd = CreateCommand(_sharedConnection, sql.SQL, sql.Arguments))
            {
                if (!autoIncrement)
                {
                    DoPreExecute(cmd);
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                    PocoColumn pkColumn;
                    if (primaryKeyName != null && pd.Columns.TryGetValue(primaryKeyName, out pkColumn))
                    {
                        foreach (object poco in pocos)
                        {
                            inserted.Add(pkColumn.GetValue(poco));
                        }
                    }

                    return inserted.ToArray<object>();
                }

                // BUG: the following line reportedly causes duplicate inserts; need to confirm
                //object id = _dbType.ExecuteInsert(this, cmd, primaryKeyName);

                using(var reader = cmd.ExecuteReader())
                {
                    while (reader.Read())
                    {
                        inserted.Add(reader[0]);
                    }
                }

                object[] primaryKeys = inserted.ToArray<object>();

                // Assign the ID back to the primary key property
                if (primaryKeyName != null)
                {
                    PocoColumn pc;
                    if (pd.Columns.TryGetValue(primaryKeyName, out pc))
                    {
                        index = 0;
                        foreach(object poco in pocos)
                        {
                            pc.SetValue(poco, pc.ChangeType(primaryKeys[index]));
                            index++;
                        }
                    }
                }

                return primaryKeys;
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
        return null;
    }
}


Here is the code for BulkInsert that you can add to v5.01 PetaPoco.cs

You can paste it somewhere close the regular insert at line 1098

You give it an IEnumerable of Pocos and it will send it to the database

in batches of x together. The code is 90% from the regular insert.

I do not have performance comparison, let me know :)

    /// <summary>
    /// Bulk inserts multiple rows to SQL
    /// </summary>
    /// <param name="tableName">The name of the table to insert into</param>
    /// <param name="primaryKeyName">The name of the primary key column of the table</param>
    /// <param name="autoIncrement">True if the primary key is automatically allocated by the DB</param>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>
    /// <param name="batchSize">The number of POCOS to be grouped together for each database rounddtrip</param>        
    public void BulkInsert(string tableName, string primaryKeyName, bool autoIncrement, IEnumerable<object> pocos, int batchSize = 25)
    {
        try
        {
            OpenSharedConnection();
            try
            {
                using (var cmd = CreateCommand(_sharedConnection, ""))
                {
                    var pd = PocoData.ForObject(pocos.First(), primaryKeyName);
                    // Create list of columnnames only once
                    var names = new List<string>();
                    foreach (var i in pd.Columns)
                    {
                        // Don't insert result columns
                        if (i.Value.ResultColumn)
                            continue;

                        // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                        if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                        {
                            // Setup auto increment expression
                            string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                            if (autoIncExpression != null)
                            {
                                names.Add(i.Key);
                            }
                            continue;
                        }
                        names.Add(_dbType.EscapeSqlIdentifier(i.Key));
                    }
                    var namesArray = names.ToArray();

                    var values = new List<string>();
                    int count = 0;
                    do
                    {
                        cmd.CommandText = "";
                        cmd.Parameters.Clear();
                        var index = 0;
                        foreach (var poco in pocos.Skip(count).Take(batchSize))
                        {
                            values.Clear();
                            foreach (var i in pd.Columns)
                            {
                                // Don't insert result columns
                                if (i.Value.ResultColumn) continue;

                                // Don't insert the primary key (except under oracle where we need bring in the next sequence value)
                                if (autoIncrement && primaryKeyName != null && string.Compare(i.Key, primaryKeyName, true) == 0)
                                {
                                    // Setup auto increment expression
                                    string autoIncExpression = _dbType.GetAutoIncrementExpression(pd.TableInfo);
                                    if (autoIncExpression != null)
                                    {
                                        values.Add(autoIncExpression);
                                    }
                                    continue;
                                }

                                values.Add(string.Format("{0}{1}", _paramPrefix, index++));
                                AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            }

                            string outputClause = String.Empty;
                            if (autoIncrement)
                            {
                                outputClause = _dbType.GetInsertOutputClause(primaryKeyName);
                            }

                            cmd.CommandText += string.Format("INSERT INTO {0} ({1}){2} VALUES ({3})", _dbType.EscapeTableName(tableName),
                                                             string.Join(",", namesArray), outputClause, string.Join(",", values.ToArray()));
                        }
                        // Are we done?
                        if (cmd.CommandText == "") break;
                        count += batchSize;
                        DoPreExecute(cmd);
                        cmd.ExecuteNonQuery();
                        OnExecutedCommand(cmd);
                    }
                    while (true);

                }
            }
            finally
            {
                CloseSharedConnection();
            }
        }
        catch (Exception x)
        {
            if (OnException(x))
                throw;
        }
    }


    /// <summary>
    /// Performs a SQL Bulk Insert
    /// </summary>
    /// <param name="pocos">The POCO objects that specifies the column values to be inserted</param>        
    /// <param name="batchSize">The number of POCOS to be grouped together for each database rounddtrip</param>        
    public void BulkInsert(IEnumerable<object> pocos, int batchSize = 25)
    {
        if (!pocos.Any()) return;
        var pd = PocoData.ForType(pocos.First().GetType());
        BulkInsert(pd.TableInfo.TableName, pd.TableInfo.PrimaryKey, pd.TableInfo.AutoIncrement, pocos);
    }


And in the same lines if you want BulkUpdate:

public void BulkUpdate<T>(string tableName, string primaryKeyName, IEnumerable<T> pocos, int batchSize = 25)
{
    try
    {
        object primaryKeyValue = null;

        OpenSharedConnection();
        try
        {
            using (var cmd = CreateCommand(_sharedConnection, ""))
            {
                var pd = PocoData.ForObject(pocos.First(), primaryKeyName);

                int count = 0;
                do
                {
                    cmd.CommandText = "";
                    cmd.Parameters.Clear();
                    var index = 0;

                    var cmdText = new StringBuilder();

                    foreach (var poco in pocos.Skip(count).Take(batchSize))
                    {
                        var sb = new StringBuilder();
                        var colIdx = 0;
                        foreach (var i in pd.Columns)
                        {
                            // Don't update the primary key, but grab the value if we don't have it
                            if (string.Compare(i.Key, primaryKeyName, true) == 0)
                            {
                                primaryKeyValue = i.Value.GetValue(poco);
                                continue;
                            }

                            // Dont update result only columns
                            if (i.Value.ResultColumn)
                                continue;

                            // Build the sql
                            if (colIdx > 0)
                                sb.Append(", ");
                            sb.AppendFormat("{0} = {1}{2}", _dbType.EscapeSqlIdentifier(i.Key), _paramPrefix,
                                            index++);

                            // Store the parameter in the command
                            AddParam(cmd, i.Value.GetValue(poco), i.Value.PropertyInfo);
                            colIdx++;
                        }

                        // Find the property info for the primary key
                        PropertyInfo pkpi = null;
                        if (primaryKeyName != null)
                        {
                            pkpi = pd.Columns[primaryKeyName].PropertyInfo;
                        }


                        cmdText.Append(string.Format("UPDATE {0} SET {1} WHERE {2} = {3}{4};\n",
                                                     _dbType.EscapeTableName(tableName), sb.ToString(),
                                                     _dbType.EscapeSqlIdentifier(primaryKeyName), _paramPrefix,
                                                     index++));
                        AddParam(cmd, primaryKeyValue, pkpi);
                    }

                    if (cmdText.Length == 0) break;

                    if (_providerName.IndexOf("oracle", StringComparison.OrdinalIgnoreCase) >= 0)
                    {
                        cmdText.Insert(0, "BEGIN\n");
                        cmdText.Append("\n END;");
                    }

                    DoPreExecute(cmd);

                    cmd.CommandText = cmdText.ToString();
                    count += batchSize;
                    cmd.ExecuteNonQuery();
                    OnExecutedCommand(cmd);

                } while (true);
            }
        }
        finally
        {
            CloseSharedConnection();
        }
    }
    catch (Exception x)
    {
        if (OnException(x))
            throw;
    }
}


Here's a nice 2018 update using FastMember from NuGet:

    private static async Task SqlBulkCopyPocoAsync<T>(PetaPoco.Database db, IEnumerable<T> data)
    {
        var pd = PocoData.ForType(typeof(T), db.DefaultMapper);
        using (var bcp = new SqlBulkCopy(db.ConnectionString))
        using (var reader = ObjectReader.Create(data)) 
        {
            // set up a mapping from the property names to the column names
            var propNames = typeof(T).GetProperties().Where(p => Attribute.IsDefined(p, typeof(ResultColumnAttribute)) == false).Select(propertyInfo => propertyInfo.Name).ToArray();
            foreach (var propName in propNames)
            {
                bcp.ColumnMappings.Add(propName, "[" + pd.GetColumnName(propName) + "]");
            }
            bcp.DestinationTableName = pd.TableInfo.TableName;
            await bcp.WriteToServerAsync(reader).ConfigureAwait(false);
        }
    }


You can just do a foreach on your records.

foreach (var record in records) {
    db.Save(record);
}
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜