You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
198 lines
5.5 KiB
198 lines
5.5 KiB
using Confluent.Kafka;
|
|
using Newtonsoft.Json;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Net;
|
|
using System.Reflection;
|
|
using System.Text;
|
|
using System.Text.RegularExpressions;
|
|
|
|
namespace VOL.Core.KafkaManager
|
|
{
|
|
/// <summary>
|
|
/// 辅助类
|
|
/// </summary>
|
|
public class KafkaHelper
|
|
{
|
|
/// <summary>
|
|
/// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用)
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public static string GetApplicationName()
|
|
{
|
|
try
|
|
{
|
|
return Assembly.GetEntryAssembly().GetName().Name;
|
|
}
|
|
catch
|
|
{
|
|
return "Kafka_Test";
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 获取服务器名称
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public static string GetServerName()
|
|
{
|
|
return Dns.GetHostName();
|
|
}
|
|
|
|
/// <summary>
|
|
/// 获取服务器IP
|
|
/// </summary>
|
|
/// <returns></returns>
|
|
public static string GetServerIp()
|
|
{
|
|
IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName());
|
|
foreach (var ip in ips.AddressList)
|
|
{
|
|
if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$"))
|
|
{
|
|
return ip.ToString();
|
|
};
|
|
}
|
|
return "127.0.0.1";
|
|
}
|
|
|
|
/// <summary>
|
|
/// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级)
|
|
/// </summary>
|
|
/// <returns>long</returns>
|
|
public static long GetTimeStamp()
|
|
{
|
|
DateTime time = DateTime.Now;
|
|
long t = (time.Ticks - 621356256000000000) / 10000;
|
|
return t;
|
|
}
|
|
}
|
|
|
|
#region 实现消息序列化和反序列化
|
|
public class KafkaConverter<T> : ISerializer<T>
|
|
{
|
|
/// <summary>
|
|
/// 序列化数据成字节
|
|
/// </summary>
|
|
/// <param name="data"></param>
|
|
/// <param name="context"></param>
|
|
/// <returns></returns>
|
|
public byte[] Serialize(T data, SerializationContext context)
|
|
{
|
|
var json = JsonConvert.SerializeObject(data);
|
|
return Encoding.UTF8.GetBytes(json);
|
|
}
|
|
}
|
|
|
|
public class KafkaDConverter<T> : IDeserializer<T>
|
|
{
|
|
/// <summary>
|
|
/// 反序列化字节数据成实体数据
|
|
/// </summary>
|
|
/// <param name="data"></param>
|
|
/// <param name="isNull"></param>
|
|
/// <param name="context"></param>
|
|
/// <returns></returns>
|
|
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
|
|
{
|
|
if (isNull) return default(T);
|
|
|
|
var json = Encoding.UTF8.GetString(data.ToArray());
|
|
try
|
|
{
|
|
return JsonConvert.DeserializeObject<T>(json);
|
|
}
|
|
catch
|
|
{
|
|
return default(T);
|
|
}
|
|
}
|
|
}
|
|
#endregion
|
|
|
|
#region 日志类
|
|
/// <summary>
|
|
/// 默认日志类 可自行构造使用
|
|
/// </summary>
|
|
public class KafkaLogModel
|
|
{
|
|
/// <summary>
|
|
/// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion)
|
|
/// </summary>
|
|
public KafkaLogModel()
|
|
{
|
|
ServerIp = KafkaHelper.GetServerIp();
|
|
ServerName = KafkaHelper.GetServerName();
|
|
TimeStamp = DateTime.Now;
|
|
ApplicationName = KafkaHelper.GetApplicationName();
|
|
ApplicationVersion = "V1.0.0";
|
|
}
|
|
|
|
/// <summary>
|
|
/// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka)
|
|
/// </summary>
|
|
public string ApplicationName { get; set; }
|
|
|
|
/// <summary>
|
|
/// 程式版本(默认为V1.0.0)
|
|
/// </summary>
|
|
public string ApplicationVersion { get; set; }
|
|
|
|
/// <summary>
|
|
/// 发生时间(默认为当前时间)
|
|
/// </summary>
|
|
public DateTime TimeStamp { get; set; }
|
|
|
|
/// <summary>
|
|
/// 开始时间
|
|
/// </summary>
|
|
public DateTime BeginDate { get; set; }
|
|
|
|
/// <summary>
|
|
/// 结束时间
|
|
/// </summary>
|
|
public DateTime EndDate { get; set; }
|
|
|
|
/// <summary>
|
|
/// 服务器IP(默认抓取当前服务器IP)
|
|
/// </summary>
|
|
public string ServerIp { get; set; }
|
|
|
|
/// <summary>
|
|
/// 服务器名称(默认抓取当前服务器名称)
|
|
/// </summary>
|
|
public string ServerName { get; set; }
|
|
|
|
/// <summary>
|
|
/// 客户端IP
|
|
/// </summary>
|
|
public string ClientIp { get; set; }
|
|
|
|
/// <summary>
|
|
/// 模块(页面路径)
|
|
/// </summary>
|
|
public string Module { get; set; }
|
|
|
|
/// <summary>
|
|
/// 操作人
|
|
/// </summary>
|
|
public string Operator { get; set; }
|
|
|
|
/// <summary>
|
|
/// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义
|
|
/// </summary>
|
|
public string OperationType { get; set; }
|
|
|
|
/// <summary>
|
|
/// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义
|
|
/// </summary>
|
|
public string Status { get; set; }
|
|
|
|
/// <summary>
|
|
/// 其他信息
|
|
/// </summary>
|
|
public string Message { get; set; }
|
|
}
|
|
#endregion
|
|
}
|