//using Confluent.Kafka;
//using System;
//using System.Collections.Generic;
//using System.Text;
//using System.Threading.Tasks;
//using VOL.Core.Configuration;
//using VOL.Core.Enums;
//using VOL.Core.KafkaManager.IService;
//using VOL.Core.Services;
//namespace VOL.Core.KafkaManager.Service
//{
// ///
// /// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string)
// /// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式
// ///
// /// Message.Key 的数据类型
// /// Message.Value 的数据类型
// public class KafkaConsumer : KafkaConfig, IKafkaConsumer
// {
// ///
// /// Kafka地址(包含端口号)
// ///
// public string Servers
// {
// get
// {
// return ConsumerConfig.BootstrapServers;
// }
// set
// {
// ConsumerConfig.BootstrapServers = value;
// }
// }
// ///
// /// 消费者群组
// ///
// public string GroupId
// {
// get
// {
// return ConsumerConfig.GroupId;
// }
// set
// {
// ConsumerConfig.GroupId = value;
// }
// }
// ///
// /// 自动提交 默认为 false
// ///
// public bool EnableAutoCommit
// {
// get
// {
// return ConsumerConfig.EnableAutoCommit ?? false;
// }
// set
// {
// ConsumerConfig.EnableAutoCommit = value;
// }
// }
// ///
// /// 订阅回调模式-消费(持续订阅)
// ///
// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交
// /// 主题
// public void Consume(Func, bool> Func, string Topic)
// {
// Task.Factory.StartNew(() =>
// {
// var builder = new ConsumerBuilder(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter());
// builder.SetErrorHandler((_, e) =>
// {
// Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
// }).SetStatisticsHandler((_, json) =>
// {
// Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
// }).SetPartitionsAssignedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
// }).SetPartitionsRevokedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
// });
// using var consumer = builder.Build();
// consumer.Subscribe(Topic);
// while (AppSetting.Kafka.IsConsumerSubscribe) //true
// {
// ConsumeResult result = null;
// try
// {
// result = consumer.Consume();
// if (result.IsPartitionEOF) continue;
// if (Func(result))
// {
// if (!(bool)ConsumerConfig.EnableAutoCommit)
// {
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
// consumer.Commit(result);
// }
// }
// }
// catch (ConsumeException ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
// }
// }
// });
// }
// ///
// /// 批量订阅回调模式-消费(持续订阅)
// ///
// /// 回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交
// /// 主题
// public void ConsumeBatch(Func, bool> Func, List Topics)
// {
// Task.Factory.StartNew(() =>
// {
// var builder = new ConsumerBuilder(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter());
// builder.SetErrorHandler((_, e) =>
// {
// Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
// }).SetStatisticsHandler((_, json) =>
// {
// Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
// }).SetPartitionsAssignedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
// }).SetPartitionsRevokedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
// });
// using var consumer = builder.Build();
// consumer.Subscribe(Topics);
// while (AppSetting.Kafka.IsConsumerSubscribe) //true
// {
// ConsumeResult result = null;
// try
// {
// result = consumer.Consume();
// if (result.IsPartitionEOF) continue;
// if (Func(result))
// {
// if (!(bool)ConsumerConfig.EnableAutoCommit)
// {
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
// consumer.Commit(result);
// }
// }
// }
// catch (ConsumeException ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
// }
// }
// });
// }
// ///
// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
// ///
// /// 主题
// /// 持续监听时间,单位ms 默认值:300ms
// /// 最多单次消费行数 默认值:100行
// /// 待消费数据
// public List> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
// {
// var builder = new ConsumerBuilder(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter());
// using var consumer = builder.Build();
// consumer.Subscribe(Topic);
// List> Res = new List>();
// while (true)
// {
// try
// {
// var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
// if (result == null) break;
// else
// {
// Res.Add(result);
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
// consumer.Commit();
// }
// if (Res.Count > MaxRow) break;
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
// return null;
// }
// }
// return Res;
// }
// ///
// /// 单笔消费模式-单行消费
// ///
// /// 主题
// /// 持续监听时间,单位ms 默认值:300ms
// /// 待消费数据
// public ConsumeResult ConsumeOneRow(string Topic, int TimeOut = 300)
// {
// var builder = new ConsumerBuilder(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter());
// using var consumer = builder.Build();
// consumer.Subscribe(Topic);
// try
// {
// var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
// if (result != null)
// {
// //手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
// consumer.Commit();
// }
// return result;
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
// return null;
// }
// }
// public void Dispose()
// {
// //if (_cache != null)
// // _cache.Dispose();
// GC.SuppressFinalize(this);
// }
// }
//}