using Confluent.Kafka; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Net; using System.Reflection; using System.Text; using System.Text.RegularExpressions; namespace VOL.Core.KafkaManager { /// /// 辅助类 /// public class KafkaHelper { /// /// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用) /// /// public static string GetApplicationName() { try { return Assembly.GetEntryAssembly().GetName().Name; } catch { return "Kafka_Test"; } } /// /// 获取服务器名称 /// /// public static string GetServerName() { return Dns.GetHostName(); } /// /// 获取服务器IP /// /// public static string GetServerIp() { IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName()); foreach (var ip in ips.AddressList) { if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$")) { return ip.ToString(); }; } return "127.0.0.1"; } /// /// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级) /// /// long public static long GetTimeStamp() { DateTime time = DateTime.Now; long t = (time.Ticks - 621356256000000000) / 10000; return t; } } #region 实现消息序列化和反序列化 public class KafkaConverter : ISerializer { /// /// 序列化数据成字节 /// /// /// /// public byte[] Serialize(T data, SerializationContext context) { var json = JsonConvert.SerializeObject(data); return Encoding.UTF8.GetBytes(json); } } public class KafkaDConverter : IDeserializer { /// /// 反序列化字节数据成实体数据 /// /// /// /// /// public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) { if (isNull) return default(T); var json = Encoding.UTF8.GetString(data.ToArray()); try { return JsonConvert.DeserializeObject(json); } catch { return default(T); } } } #endregion #region 日志类 /// /// 默认日志类 可自行构造使用 /// public class KafkaLogModel { /// /// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion) /// public KafkaLogModel() { ServerIp = KafkaHelper.GetServerIp(); ServerName = KafkaHelper.GetServerName(); TimeStamp = DateTime.Now; ApplicationName = KafkaHelper.GetApplicationName(); ApplicationVersion = "V1.0.0"; } /// /// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka) /// public string ApplicationName { get; set; } /// /// 程式版本(默认为V1.0.0) /// public string ApplicationVersion { get; set; } /// /// 发生时间(默认为当前时间) /// public DateTime TimeStamp { get; set; } /// /// 开始时间 /// public DateTime BeginDate { get; set; } /// /// 结束时间 /// public DateTime EndDate { get; set; } /// /// 服务器IP(默认抓取当前服务器IP) /// public string ServerIp { get; set; } /// /// 服务器名称(默认抓取当前服务器名称) /// public string ServerName { get; set; } /// /// 客户端IP /// public string ClientIp { get; set; } /// /// 模块(页面路径) /// public string Module { get; set; } /// /// 操作人 /// public string Operator { get; set; } /// /// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义 /// public string OperationType { get; set; } /// /// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义 /// public string Status { get; set; } /// /// 其他信息 /// public string Message { get; set; } } #endregion }