using Dapper;
using MySqlConnector;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using VOL.Core.Const;
using VOL.Core.DBManager;
using VOL.Core.Enums;
using VOL.Core.Extensions;

namespace VOL.Core.Dapper
{
    /// <summary>
    /// Dapper-based implementation of <see cref="ISqlDapper"/>: query/execute helpers,
    /// entity insert/update/delete, bulk insert and simple transaction management.
    /// </summary>
    /// <remarks>
    /// The instance keeps the ambient transaction in fields (<c>dbTransaction</c>,
    /// <c>_transactionConnection</c>); while one is active every call made through the
    /// same instance runs on that connection and transaction.
    /// </remarks>
    public class SqlDapper : ISqlDapper
    {
        private string _connectionString;
        private int? commandTimeout = null;
        private DbCurrentType _dbCurrentType;

        // True while an explicit transaction (BeginTrans / BeginTransaction) is active.
        private bool _transaction { get; set; }

        // Connection owning the ambient transaction; null when none is active.
        private IDbConnection _transactionConnection = null;

        // Ambient transaction; null when none is active.
        IDbTransaction dbTransaction = null;

        /// <summary>Creates an instance bound to the default connection string.</summary>
        public SqlDapper()
        {
            _connectionString = DBServerProvider.GetConnectionString();
        }

        /// <summary>Creates an instance bound to the named connection and the given database type.</summary>
        public SqlDapper(string connKeyName, DbCurrentType dbCurrentType)
        {
            _dbCurrentType = dbCurrentType;
            _connectionString = DBServerProvider.GetConnectionString(connKeyName);
        }

        /// <summary>Creates an instance bound to the named connection.</summary>
        public SqlDapper(string connKeyName)
        {
            _connectionString = DBServerProvider.GetConnectionString(connKeyName);
        }

        /// <summary>
        /// Sets the command timeout (in seconds) used by subsequent commands.
        /// </summary>
        /// <param name="timeout">Timeout in seconds.</param>
        /// <returns>This instance, for chaining.</returns>
        public ISqlDapper SetTimout(int timeout)
        {
            this.commandTimeout = timeout;
            return this;
        }

        /// <summary>
        /// Clears the ambient transaction state so later calls do not reuse a disposed
        /// transaction or connection.
        /// </summary>
        /// <param name="disposeConnection">
        /// True when this instance owns the connection (BeginTrans flow); false when an
        /// enclosing <c>using</c> disposes it.
        /// </param>
        private void ReleaseTransaction(bool disposeConnection)
        {
            _transaction = false;
            try
            {
                dbTransaction?.Dispose();
            }
            finally
            {
                dbTransaction = null;
                if (disposeConnection)
                {
                    _transactionConnection?.Dispose();
                }
                _transactionConnection = null;
            }
        }

        /// <summary>
        /// Runs <paramref name="func"/> on the ambient transaction if one is active,
        /// otherwise in a new transaction (when <paramref name="beginTransaction"/> is true)
        /// or on a fresh, short-lived connection.
        /// </summary>
        private T Execute<T>(Func<IDbConnection, IDbTransaction, T> func, bool beginTransaction = false)
        {
            if (_transaction || dbTransaction != null)
            {
                return func(_transactionConnection, dbTransaction);
            }
            if (beginTransaction)
            {
                return ExecuteTransaction(func);
            }
            using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
            {
                // No ambient transaction exists on this path.
                return func(connection, null);
            }
        }

        /// <summary>
        /// Runs <paramref name="func"/> inside a new transaction: commits on success,
        /// rolls back and rethrows (wrapped) on failure.
        /// </summary>
        private T ExecuteTransaction<T>(Func<IDbConnection, IDbTransaction, T> func)
        {
            using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
            {
                try
                {
                    // Published in fields so nested calls made by func join this transaction.
                    _transactionConnection = connection;
                    connection.Open();
                    dbTransaction = connection.BeginTransaction();
                    T result = func(connection, dbTransaction);
                    dbTransaction.Commit();
                    return result;
                }
                catch (Exception ex)
                {
                    dbTransaction?.Rollback();
                    throw new Exception(ex.Message, ex);
                }
                finally
                {
                    // The enclosing using disposes the connection.
                    ReleaseTransaction(disposeConnection: false);
                }
            }
        }

        /// <summary>Asynchronous counterpart of <see cref="Execute{T}"/>.</summary>
        private async Task<T> ExecuteAsync<T>(Func<IDbConnection, IDbTransaction, Task<T>> funcAsync, bool beginTransaction = false)
        {
            if (_transaction || dbTransaction != null)
            {
                return await funcAsync(_transactionConnection, dbTransaction);
            }
            if (beginTransaction)
            {
                return await ExecuteTransactionAsync(funcAsync);
            }
            using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
            {
                return await funcAsync(connection, null);
            }
        }

        /// <summary>Asynchronous counterpart of <see cref="ExecuteTransaction{T}"/>.</summary>
        private async Task<T> ExecuteTransactionAsync<T>(Func<IDbConnection, IDbTransaction, Task<T>> funcAsync)
        {
            using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
            {
                try
                {
                    _transactionConnection = connection;
                    connection.Open();
                    dbTransaction = connection.BeginTransaction();
                    T result = await funcAsync(connection, dbTransaction);
                    dbTransaction.Commit();
                    return result;
                }
                catch (Exception ex)
                {
                    dbTransaction?.Rollback();
                    throw new Exception(ex.Message, ex);
                }
                finally
                {
                    ReleaseTransaction(disposeConnection: false);
                }
            }
        }

        /// <summary>
        /// Runs <paramref name="action"/> inside a transaction (added 2020.06.15).
        /// Commits when the action returns true, rolls back when it returns false or throws.
        /// </summary>
        /// <param name="action">Work to perform; receives this instance.</param>
        /// <param name="error">Invoked with the exception when the action throws.</param>
        public void BeginTransaction(Func<ISqlDapper, bool> action, Action<Exception> error)
        {
            using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
            {
                try
                {
                    _transaction = true;
                    _transactionConnection = connection;
                    connection.Open();
                    dbTransaction = connection.BeginTransaction();
                    if (action(this))
                    {
                        dbTransaction.Commit();
                    }
                    else
                    {
                        dbTransaction.Rollback();
                    }
                }
                catch (Exception ex)
                {
                    dbTransaction?.Rollback();
                    error(ex);
                }
                finally
                {
                    ReleaseTransaction(disposeConnection: false);
                }
            }
        }

        /// <summary>Opens a Dapper multi-result reader using the configured timeout.</summary>
        private SqlMapper.GridReader OpenGrid(IDbConnection conn, IDbTransaction tran, string cmd, object param, CommandType? commandType)
        {
            return conn.QueryMultiple(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout);
        }

        /// <summary>Asynchronous counterpart of <see cref="OpenGrid"/>.</summary>
        private Task<SqlMapper.GridReader> OpenGridAsync(IDbConnection conn, IDbTransaction tran, string cmd, object param, CommandType? commandType)
        {
            return conn.QueryMultipleAsync(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout);
        }

        /// <summary>
        /// Executes a query and returns all rows mapped to <typeparamref name="T"/>.
        /// </summary>
        /// <remarks>
        /// Output/return parameters can be supplied through a Dapper DynamicParameters object, e.g.
        /// <c>p.Add("@b", dbType: DbType.Int32, direction: ParameterDirection.Output)</c>.
        /// </remarks>
        /// <param name="cmd">SQL text or stored procedure name.</param>
        /// <param name="param">Command parameters (may be null).</param>
        /// <param name="commandType">Command type; defaults to <see cref="CommandType.Text"/>.</param>
        /// <param name="beginTransaction">Whether to wrap the call in a new transaction.</param>
        public List<T> QueryList<T>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
                conn.Query<T>(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout).ToList(),
                beginTransaction);
        }

        /// <summary>Asynchronously executes a query and returns all rows mapped to <typeparamref name="T"/>.</summary>
        public async Task<IEnumerable<T>> QueryListAsync<T>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
                await conn.QueryAsync<T>(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Asynchronously returns the first row mapped to <typeparamref name="T"/>, or null when there is none.</summary>
        public async Task<T> QueryFirstAsync<T>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false) where T : class
        {
            return await ExecuteAsync(async (conn, tran) =>
                await conn.QueryFirstOrDefaultAsync<T>(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Returns the first row mapped to <typeparamref name="T"/>, or null when there is none.</summary>
        public T QueryFirst<T>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false) where T : class
        {
            return Execute((conn, tran) =>
                conn.QueryFirstOrDefault<T>(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Asynchronously returns the first row as a dynamic object, or null when there is none.</summary>
        public async Task<dynamic> QueryDynamicFirstAsync(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
                await conn.QueryFirstOrDefaultAsync(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Returns the first row as a dynamic object, or null when there is none.</summary>
        public dynamic QueryDynamicFirst(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
                conn.QueryFirstOrDefault(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Asynchronously returns all rows as dynamic objects.</summary>
        public async Task<dynamic> QueryDynamicListAsync(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
                await conn.QueryAsync(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Returns all rows as dynamic objects.</summary>
        public List<dynamic> QueryDynamicList(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
                conn.Query(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout).ToList(),
                beginTransaction);
        }

        /// <summary>Asynchronously executes the command and returns the first column of the first row.</summary>
        public async Task<object> ExecuteScalarAsync(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
                await conn.ExecuteScalarAsync(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Executes the command and returns the first column of the first row.</summary>
        public object ExecuteScalar(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
                conn.ExecuteScalar(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Asynchronously executes a non-query command and returns the affected row count.</summary>
        public async Task<int> ExcuteNonQueryAsync(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
                await conn.ExecuteAsync(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>Executes a non-query command and returns the affected row count.</summary>
        public int ExcuteNonQuery(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
                conn.Execute(cmd, param, tran, commandType: commandType ?? CommandType.Text, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>
        /// Executes a multi-result query and returns the raw Dapper grid reader.
        /// </summary>
        /// <remarks>
        /// NOTE(review): outside an ambient transaction the connection is disposed when this
        /// method returns, before the caller reads the grid; the returned reader is probably
        /// only usable inside BeginTrans/BeginTransaction. Prefer the typed overloads.
        /// </remarks>
        public SqlMapper.GridReader QueryMultiple(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) => OpenGrid(conn, tran, cmd, param, commandType), beginTransaction);
        }

        /// <summary>Asynchronously reads two result sets mapped to the given types.</summary>
        public async Task<(IEnumerable<T1>, IEnumerable<T2>)> QueryMultipleAsync<T1, T2>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync<T1>(), await reader.ReadAsync<T2>());
                }
            }, beginTransaction);
        }

        /// <summary>
        /// Reads two result sets mapped to the given types.
        /// Output parameter values can be read afterwards from the parameter object, e.g. <c>param.Get("@b")</c>.
        /// </summary>
        public (List<T1>, List<T2>) QueryMultiple<T1, T2>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read<T1>().ToList(), reader.Read<T2>().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads two result sets as dynamic rows.</summary>
        public async Task<(IEnumerable<dynamic>, IEnumerable<dynamic>)> QueryDynamicMultipleAsync(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync(), await reader.ReadAsync());
                }
            }, beginTransaction);
        }

        /// <summary>Reads two result sets as dynamic rows.</summary>
        public (List<dynamic>, List<dynamic>) QueryDynamicMultiple(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read().ToList(), reader.Read().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads three result sets mapped to the given types.</summary>
        public async Task<(IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>)> QueryMultipleAsync<T1, T2, T3>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync<T1>(), await reader.ReadAsync<T2>(), await reader.ReadAsync<T3>());
                }
            }, beginTransaction);
        }

        /// <summary>Reads three result sets mapped to the given types.</summary>
        public (List<T1>, List<T2>, List<T3>) QueryMultiple<T1, T2, T3>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read<T1>().ToList(), reader.Read<T2>().ToList(), reader.Read<T3>().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads two result sets as dynamic rows.</summary>
        public async Task<(IEnumerable<dynamic>, IEnumerable<dynamic>)> QueryDynamicMultipleAsync2(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync(), await reader.ReadAsync());
                }
            }, beginTransaction);
        }

        /// <summary>Reads two result sets as dynamic rows.</summary>
        public (List<dynamic>, List<dynamic>) QueryDynamicMultiple2(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read().ToList(), reader.Read().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads three result sets as dynamic rows.</summary>
        public async Task<(IEnumerable<dynamic>, IEnumerable<dynamic>, IEnumerable<dynamic>)> QueryDynamicMultipleAsync3(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync(), await reader.ReadAsync(), await reader.ReadAsync());
                }
            }, beginTransaction);
        }

        /// <summary>Reads three result sets as dynamic rows.</summary>
        public (List<dynamic>, List<dynamic>, List<dynamic>) QueryDynamicMultiple3(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read().ToList(), reader.Read().ToList(), reader.Read().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads five result sets as dynamic rows.</summary>
        public async Task<(IEnumerable<dynamic>, IEnumerable<dynamic>, IEnumerable<dynamic>, IEnumerable<dynamic>, IEnumerable<dynamic>)> QueryDynamicMultipleAsync5(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync(), await reader.ReadAsync(), await reader.ReadAsync(), await reader.ReadAsync(), await reader.ReadAsync());
                }
            }, beginTransaction);
        }

        /// <summary>Reads five result sets as dynamic rows.</summary>
        public (List<dynamic>, List<dynamic>, List<dynamic>, List<dynamic>, List<dynamic>) QueryDynamicMultiple5(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read().ToList(), reader.Read().ToList(), reader.Read().ToList(), reader.Read().ToList(), reader.Read().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads four result sets mapped to the given types.</summary>
        public async Task<(IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>, IEnumerable<T4>)> QueryMultipleAsync<T1, T2, T3, T4>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync<T1>(), await reader.ReadAsync<T2>(), await reader.ReadAsync<T3>(), await reader.ReadAsync<T4>());
                }
            }, beginTransaction);
        }

        /// <summary>Reads four result sets mapped to the given types.</summary>
        public (List<T1>, List<T2>, List<T3>, List<T4>) QueryMultiple<T1, T2, T3, T4>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read<T1>().ToList(), reader.Read<T2>().ToList(), reader.Read<T3>().ToList(), reader.Read<T4>().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>Asynchronously reads five result sets mapped to the given types.</summary>
        public async Task<(IEnumerable<T1>, IEnumerable<T2>, IEnumerable<T3>, IEnumerable<T4>, IEnumerable<T5>)> QueryMultipleAsync<T1, T2, T3, T4, T5>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return await ExecuteAsync(async (conn, tran) =>
            {
                using (SqlMapper.GridReader reader = await OpenGridAsync(conn, tran, cmd, param, commandType))
                {
                    return (await reader.ReadAsync<T1>(), await reader.ReadAsync<T2>(), await reader.ReadAsync<T3>(), await reader.ReadAsync<T4>(), await reader.ReadAsync<T5>());
                }
            }, beginTransaction);
        }

        /// <summary>Reads five result sets mapped to the given types.</summary>
        public (List<T1>, List<T2>, List<T3>, List<T4>, List<T5>) QueryMultiple<T1, T2, T3, T4, T5>(string cmd, object param, CommandType? commandType = null, bool beginTransaction = false)
        {
            return Execute((conn, tran) =>
            {
                using (SqlMapper.GridReader reader = OpenGrid(conn, tran, cmd, param, commandType))
                {
                    return (reader.Read<T1>().ToList(), reader.Read<T2>().ToList(), reader.Read<T3>().ToList(), reader.Read<T4>().ToList(), reader.Read<T5>().ToList());
                }
            }, beginTransaction);
        }

        /// <summary>
        /// Inserts a single entity.
        /// </summary>
        /// <param name="entity">Entity to insert.</param>
        /// <param name="addFileds">Columns to insert; null inserts all mapped columns.</param>
        /// <param name="beginTransaction">Whether to wrap the insert in a new transaction.</param>
        /// <returns>Value returned by the underlying Dapper Execute call.</returns>
        public int Add<T>(T entity, Expression<Func<T, object>> addFileds = null, bool beginTransaction = false)
        {
            return AddRange(new T[] { entity }, addFileds, beginTransaction);
        }

        /// <summary>
        /// Inserts a batch of entities.
        /// </summary>
        /// <param name="entities">Entities to insert.</param>
        /// <param name="addFileds">Columns to insert; null inserts all mapped columns.</param>
        /// <param name="beginTransaction">Whether to wrap the insert in a new transaction (default true).</param>
        /// <returns>Value returned by the underlying Dapper Execute call.</returns>
        /// <exception cref="Exception">The entity type has no key property.</exception>
        public int AddRange<T>(IEnumerable<T> entities, Expression<Func<T, object>> addFileds = null, bool beginTransaction = true)
        {
            Type entityType = typeof(T);
            var key = entityType.GetKeyProperty();
            if (key == null)
            {
                // NOTE(review): the message says "batch update" although this is the insert path.
                throw new Exception("实体必须包括主键才能批量更新");
            }

            string[] columns;
            if (addFileds != null)
            {
                // Caller restricted the inserted columns.
                columns = addFileds.GetExpressionToArray();
            }
            else
            {
                var properties = entityType.GetGenericProperties();
                // Non-Guid keys are left out of the column list; Guid keys are inserted explicitly.
                if (key.PropertyType != typeof(Guid))
                {
                    properties = properties.Where(x => x.Name != key.Name).ToArray();
                }
                columns = properties.Select(x => x.Name).ToArray();
            }

            bool isMySql = DBType.Name == DbCurrentType.MySql.ToString();
            bool isPgSql = DBType.Name == DbCurrentType.PgSql.ToString();
            string tableName = entityType.GetEntityTableName();
            string valueParameters = "@" + string.Join(",@", columns);

            string sql;
            if (isMySql)
            {
                // MySQL: one parameterized insert executed per entity (batch write still to be optimized).
                sql = $"insert into {tableName}({string.Join(",", columns)})" +
                      $"values({valueParameters});";
            }
            else if (isPgSql)
            {
                // PostgreSQL: same approach with quoted identifiers (TODO: verify batch behaviour).
                string quotedColumns = "\"" + string.Join("\",\"", columns) + "\"";
                sql = $"insert into {tableName}({quotedColumns})" +
                      $"values({valueParameters});";
            }
            else
            {
                // SQL Server: batch insert through a temp table; GetEntitySql expands the script.
                sql = $"insert into {tableName}({string.Join(",", columns)})" +
                      $"select {string.Join(",", columns)} from {EntityToSqlTempName.TempInsert};";
                // 2020.11.21: fixed wrong key-type detection for SQL Server batch insert.
                sql = entities.GetEntitySql(key.PropertyType == typeof(Guid), sql, null, addFileds, null);
            }

            // MySQL/PostgreSQL bind the entities as parameters; the SQL Server script takes none.
            object parameters = (isMySql || isPgSql) ? entities.ToList() : null;
            return Execute((conn, tran) =>
                conn.Execute(sql, parameters, tran, commandTimeout: commandTimeout),
                beginTransaction);
        }

        /// <summary>
        /// Updates a single entity by its key.
        /// </summary>
        /// <param name="entity">Entity to update; its type must have a key property.</param>
        /// <param name="updateFileds">Columns to update, e.g. <c>x => new { x.a, x.b }</c>; null updates all non-key columns.</param>
        /// <param name="beginTransaction">Whether to wrap the update in a new transaction.</param>
        public int Update<T>(T entity, Expression<Func<T, object>> updateFileds = null, bool beginTransaction = false)
        {
            return UpdateRange(new T[] { entity }, updateFileds, beginTransaction);
        }

        /// <summary>
        /// Updates a batch of entities by key. SQL Server uses a temp-table based script;
        /// MySQL runs one parameterized UPDATE per entity.
        /// </summary>
        /// <remarks>
        /// NOTE(review): any database type other than MySQL (including PostgreSQL) takes the
        /// SQL Server path — confirm that is intended.
        /// </remarks>
        /// <param name="entities">Entities to update; their type must have a key property.</param>
        /// <param name="updateFileds">Columns to update; null updates all non-key columns.</param>
        /// <param name="beginTransaction">Whether to wrap the update in a new transaction.</param>
        /// <exception cref="Exception">The entity type has no key property.</exception>
        public int UpdateRange<T>(IEnumerable<T> entities, Expression<Func<T, object>> updateFileds = null, bool beginTransaction = false)
        {
            Type entityType = typeof(T);
            var key = entityType.GetKeyProperty();
            if (key == null)
            {
                throw new Exception("实体必须包括主键才能批量更新");
            }

            var properties = entityType.GetGenericProperties()
                .Where(x => x.Name != key.Name);
            if (updateFileds != null)
            {
                // Resolve the expression once rather than once per property.
                var updateColumns = updateFileds.GetExpressionToArray();
                properties = properties.Where(x => updateColumns.Contains(x.Name));
            }

            if (DBType.Name == DbCurrentType.MySql.ToString())
            {
                string assignments = string.Join(",", properties.Select(x => x.Name + "=@" + x.Name));
                string keyName = entityType.GetKeyName();
                string sqltext = $"UPDATE {entityType.GetEntityTableName()} SET {assignments} WHERE {keyName} = @{keyName} ;";
                return ExcuteNonQuery(sqltext, entities, CommandType.Text, beginTransaction);
            }

            string fileds = string.Join(",", properties.Select(x => $" a.{x.Name}=b.{x.Name}").ToArray());
            string sql = $"update  a  set {fileds} from  {entityType.GetEntityTableName()} as a inner join {EntityToSqlTempName.TempInsert.ToString()} as b on a.{key.Name}=b.{key.Name}";
            sql = entities.ToList().GetEntitySql(true, sql, null, updateFileds, null);
            return ExcuteNonQuery(sql, null, CommandType.Text, beginTransaction);
        }

        /// <summary>
        /// Deletes rows whose key is in <paramref name="keys"/>.
        /// Usage: <c>DBServerProvider.SqlDapper.DelWithKey&lt;Entity, int&gt;(keys);</c>
        /// </summary>
        /// <param name="keys">Key values of the rows to delete.</param>
        /// <returns>Affected row count.</returns>
        /// <exception cref="Exception">The entity type has no key property.</exception>
        public int DelWithKey<T, KeyType>(IEnumerable<KeyType> keys)
        {
            Type entityType = typeof(T);
            var keyProperty = entityType.GetKeyProperty();
            if (keyProperty == null)
            {
                throw new Exception($"Entity {entityType.Name} must declare a key property to delete by key");
            }
            string sql = $"DELETE FROM {entityType.GetEntityTableName()} where {keyProperty.Name} in @keys ";
            return ExcuteNonQuery(sql, new { keys }).GetInt();
        }

        /// <summary>
        /// Bulk-inserts a table into SQL Server using <see cref="SqlBulkCopy"/>.
        /// </summary>
        /// <param name="table">Rows to insert; column names must match the destination columns.</param>
        /// <param name="tableName">Destination table name.</param>
        /// <param name="sqlBulkCopyOptions">Bulk copy options.</param>
        /// <param name="dbKeyName">Optional connection key overriding this instance's connection string.</param>
        /// <returns>Number of rows in <paramref name="table"/>.</returns>
        private int MSSqlBulkInsert(DataTable table, string tableName, SqlBulkCopyOptions sqlBulkCopyOptions = SqlBulkCopyOptions.UseInternalTransaction, string dbKeyName = null)
        {
            using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
            {
                if (!string.IsNullOrEmpty(dbKeyName))
                {
                    connection.ConnectionString = DBServerProvider.GetConnectionString(dbKeyName);
                }
                // SqlBulkCopy opens its own connection from the connection string.
                using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(connection.ConnectionString, sqlBulkCopyOptions))
                {
                    sqlBulkCopy.DestinationTableName = tableName;
                    sqlBulkCopy.BatchSize = table.Rows.Count;
                    foreach (DataColumn column in table.Columns)
                    {
                        sqlBulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                    }
                    sqlBulkCopy.WriteToServer(table);
                    return table.Rows.Count;
                }
            }
        }

        /// <summary>
        /// Bulk-inserts entities by converting them to a <see cref="DataTable"/>.
        /// </summary>
        /// <param name="entities">Entities to insert.</param>
        /// <param name="tableName">Destination table; defaults to the entity's table name.</param>
        /// <param name="columns">Columns to include; null includes all.</param>
        /// <param name="sqlBulkCopyOptions">SQL Server bulk copy options (ignored by other providers).</param>
        public int BulkInsert<T>(List<T> entities, string tableName = null, Expression<Func<T, object>> columns = null, SqlBulkCopyOptions? sqlBulkCopyOptions = null)
        {
            DataTable table = entities.ToDataTable(columns, false);
            return BulkInsert(table, tableName ?? typeof(T).GetEntityTableName(), sqlBulkCopyOptions);
        }

        /// <summary>
        /// Bulk-inserts a table using the provider selected by <c>DBType.Name</c>
        /// (MySQL bulk loader, PostgreSQL binary COPY, otherwise SQL Server bulk copy).
        /// </summary>
        /// <returns>Number of inserted rows as reported by the provider-specific path.</returns>
        public int BulkInsert(DataTable table, string tableName, SqlBulkCopyOptions? sqlBulkCopyOptions = null, string fileName = null, string tmpPath = null)
        {
            if (!string.IsNullOrEmpty(tmpPath))
            {
                tmpPath = tmpPath.ReplacePath();
            }
            if (DBType.Name == "MySql")
            {
                return MySqlBulkInsert(table, tableName, fileName, tmpPath);
            }
            if (DBType.Name == "PgSql")
            {
                PGSqlBulkInsert(table, tableName);
                return table.Rows.Count;
            }
            return MSSqlBulkInsert(table, tableName, sqlBulkCopyOptions ?? SqlBulkCopyOptions.KeepIdentity);
        }

        /// <summary>
        /// Bulk-inserts a table into MySQL through <see cref="MySqlBulkLoader"/>.
        /// </summary>
        /// <param name="table">Rows to insert.</param>
        /// <param name="tableName">Destination table name.</param>
        /// <param name="fileName">Unused; kept for signature compatibility.</param>
        /// <param name="tmpPath">Unused; kept for signature compatibility.</param>
        /// <returns>Number of rows loaded.</returns>
        private int MySqlBulkInsert(DataTable table, string tableName, string fileName = null, string tmpPath = null)
        {
            if (table.Rows.Count == 0)
            {
                return 0;
            }

            int insertCount = 0;
            string csv = DataTableToCsv(table);
            try
            {
                using (var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType))
                {
                    if (connection.State == ConnectionState.Closed)
                    {
                        connection.Open();
                    }
                    using (IDbTransaction tran = connection.BeginTransaction())
                    {
                        MySqlBulkLoader bulk = new MySqlBulkLoader(connection as MySqlConnection)
                        {
                            LineTerminator = "\n",
                            TableName = tableName,
                            CharacterSet = "UTF8",
                            FieldQuotationCharacter = '"',
                            FieldQuotationOptional = true
                        };
                        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(csv)))
                        {
                            bulk.SourceStream = stream;
                            bulk.Columns.AddRange(table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList());
                            insertCount = bulk.Load();
                            tran.Commit();
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                if (ex.Message.Contains("local data is disabled"))
                {
                    // Best effort: enable local_infile so a later attempt can succeed.
                    try
                    {
                        DBServerProvider.SqlDapper.ExcuteNonQuery("set global local_infile = 'ON';", null);
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine($"开启mysql日志写入异常:{e.Message}");
                    }
                }
                // Keep the original exception (and its stack trace) as the inner exception.
                throw new Exception(ex.Message, ex);
            }
            return insertCount;
        }

        /// <summary>
        /// Serializes a table to tab-separated text for the MySQL bulk loader.
        /// </summary>
        /// <remarks>
        /// String values are wrapped in double quotes with embedded quotes doubled; dates are
        /// written as <c>yyyy-MM-dd HH:mm:ss</c>; other values use invariant-culture formatting
        /// so the output does not depend on the server locale.
        /// </remarks>
        /// <param name="table">Table to serialize.</param>
        /// <returns>One line per row, fields separated by tabs, lines terminated by <c>\n</c>.</returns>
        private string DataTableToCsv(DataTable table)
        {
            StringBuilder sb = new StringBuilder();
            foreach (DataRow row in table.Rows)
            {
                for (int i = 0; i < table.Columns.Count; i++)
                {
                    DataColumn column = table.Columns[i];
                    if (i != 0)
                    {
                        sb.Append("\t");
                    }

                    object value = row[column];
                    if (column.DataType == typeof(string))
                    {
                        sb.Append('"').Append(value.ToString().Replace("\"", "\"\"")).Append('"');
                    }
                    else if (column.DataType == typeof(DateTime))
                    {
                        // Read the DateTime directly; a culture-dependent ToString/TryParse round trip
                        // produced formats such as "10/18/18 3:26:15 PM" on CentOS.
                        sb.Append(value is DateTime dt
                            ? dt.ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture)
                            : "");
                    }
                    else
                    {
                        sb.Append(Convert.ToString(value, CultureInfo.InvariantCulture));
                    }
                }
                sb.Append("\n");
            }
            return sb.ToString();
        }

        /// <summary>
        /// Bulk-inserts a table into PostgreSQL using binary COPY (added 2020.08.07).
        /// </summary>
        /// <param name="table">Rows to insert; column names must match the destination columns.</param>
        /// <param name="tableName">Destination table in the <c>public</c> schema.</param>
        private void PGSqlBulkInsert(DataTable table, string tableName)
        {
            var columns = table.Columns.Cast<DataColumn>()
                .Select(column => "\"" + column.ColumnName + "\"")
                .ToList();
            string copySql = $"copy \"public\".\"{tableName}\"({string.Join(',', columns)}) FROM STDIN (FORMAT BINARY)";
            using (var conn = new Npgsql.NpgsqlConnection(_connectionString))
            {
                conn.Open();
                using (var writer = conn.BeginBinaryImport(copySql))
                {
                    foreach (DataRow row in table.Rows)
                    {
                        writer.StartRow();
                        for (int i = 0; i < table.Columns.Count; i++)
                        {
                            writer.Write(row[i]);
                        }
                    }
                    writer.Complete();
                }
            }
        }

        /// <summary>
        /// Executes a query and returns the result as a <see cref="DataTable"/> whose cells
        /// hold the string form of each value.
        /// </summary>
        /// <param name="sql">SQL text or stored procedure name.</param>
        /// <param name="dbParameter">Command parameters (may be null).</param>
        /// <param name="commandType">Command type.</param>
        public DataTable QueryDataTable(string sql, object dbParameter, CommandType commandType = CommandType.Text)
        {
            return Execute((conn, tran) =>
            {
                using var dataReader = conn.ExecuteReader(sql, dbParameter, tran, commandType: commandType, commandTimeout: commandTimeout);
                DataTable datatable = new DataTable();
                for (int i = 0; i < dataReader.FieldCount; i++)
                {
                    datatable.Columns.Add(new DataColumn { ColumnName = dataReader.GetName(i) });
                }
                while (dataReader.Read())
                {
                    DataRow dataRow = datatable.NewRow();
                    for (int i = 0; i < dataReader.FieldCount; i++)
                    {
                        // Best effort: a cell that cannot be read is logged and left empty.
                        try
                        {
                            dataRow[i] = dataReader[i].ToString();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                        }
                    }
                    datatable.Rows.Add(dataRow);
                }
                return datatable;
            }, false);
        }

        /// <summary>
        /// Starts an explicit transaction; finish it with <see cref="Commit"/> or <see cref="Rollback"/>.
        /// </summary>
        /// <returns>This instance, for chaining.</returns>
        public ISqlDapper BeginTrans()
        {
            var connection = DBServerProvider.GetDbConnection(_connectionString, _dbCurrentType);
            try
            {
                connection.Open();
                dbTransaction = connection.BeginTransaction();
            }
            catch
            {
                // Do not leak the connection or leave the instance flagged as "in transaction".
                connection.Dispose();
                throw;
            }
            _transactionConnection = connection;
            _transaction = true;
            return this;
        }

        /// <summary>
        /// Commits the transaction started by <see cref="BeginTrans"/> and releases its connection.
        /// </summary>
        /// <exception cref="Exception">No transaction is active, or the commit failed.</exception>
        public void Commit()
        {
            try
            {
                _transaction = false;
                if (dbTransaction == null)
                {
                    throw new InvalidOperationException("No active transaction to commit; call BeginTrans first.");
                }
                dbTransaction.Commit();
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message, ex);
            }
            finally
            {
                ReleaseTransaction(disposeConnection: true);
            }
        }

        /// <summary>
        /// Rolls back the transaction started by <see cref="BeginTrans"/> (if any) and releases its connection.
        /// </summary>
        /// <exception cref="Exception">The rollback failed.</exception>
        public void Rollback()
        {
            try
            {
                _transaction = false;
                dbTransaction?.Rollback();
            }
            catch (Exception ex)
            {
                throw new Exception(ex.Message, ex);
            }
            finally
            {
                ReleaseTransaction(disposeConnection: true);
            }
        }
    }
}