using Confluent.Kafka;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net;
using System.Reflection;
using System.Text;
using System.Text.RegularExpressions;
namespace VOL.Core.KafkaManager
{
///
/// 辅助类
///
public class KafkaHelper
{
///
/// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用)
///
///
public static string GetApplicationName()
{
try
{
return Assembly.GetEntryAssembly().GetName().Name;
}
catch
{
return "Kafka_Test";
}
}
///
/// 获取服务器名称
///
///
public static string GetServerName()
{
return Dns.GetHostName();
}
///
/// 获取服务器IP
///
///
public static string GetServerIp()
{
IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName());
foreach (var ip in ips.AddressList)
{
if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$"))
{
return ip.ToString();
};
}
return "127.0.0.1";
}
///
/// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级)
///
/// long
public static long GetTimeStamp()
{
DateTime time = DateTime.Now;
long t = (time.Ticks - 621356256000000000) / 10000;
return t;
}
}
#region 实现消息序列化和反序列化
public class KafkaConverter : ISerializer
{
///
/// 序列化数据成字节
///
///
///
///
public byte[] Serialize(T data, SerializationContext context)
{
var json = JsonConvert.SerializeObject(data);
return Encoding.UTF8.GetBytes(json);
}
}
public class KafkaDConverter : IDeserializer
{
///
/// 反序列化字节数据成实体数据
///
///
///
///
///
public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context)
{
if (isNull) return default(T);
var json = Encoding.UTF8.GetString(data.ToArray());
try
{
return JsonConvert.DeserializeObject(json);
}
catch
{
return default(T);
}
}
}
#endregion
#region 日志类
///
/// 默认日志类 可自行构造使用
///
public class KafkaLogModel
{
///
/// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion)
///
public KafkaLogModel()
{
ServerIp = KafkaHelper.GetServerIp();
ServerName = KafkaHelper.GetServerName();
TimeStamp = DateTime.Now;
ApplicationName = KafkaHelper.GetApplicationName();
ApplicationVersion = "V1.0.0";
}
///
/// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka)
///
public string ApplicationName { get; set; }
///
/// 程式版本(默认为V1.0.0)
///
public string ApplicationVersion { get; set; }
///
/// 发生时间(默认为当前时间)
///
public DateTime TimeStamp { get; set; }
///
/// 开始时间
///
public DateTime BeginDate { get; set; }
///
/// 结束时间
///
public DateTime EndDate { get; set; }
///
/// 服务器IP(默认抓取当前服务器IP)
///
public string ServerIp { get; set; }
///
/// 服务器名称(默认抓取当前服务器名称)
///
public string ServerName { get; set; }
///
/// 客户端IP
///
public string ClientIp { get; set; }
///
/// 模块(页面路径)
///
public string Module { get; set; }
///
/// 操作人
///
public string Operator { get; set; }
///
/// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义
///
public string OperationType { get; set; }
///
/// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义
///
public string Status { get; set; }
///
/// 其他信息
///
public string Message { get; set; }
}
#endregion
}