You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

256 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

//using Confluent.Kafka;
//using System;
//using System.Collections.Generic;
//using System.Text;
//using System.Threading.Tasks;
//using VOL.Core.Configuration;
//using VOL.Core.Enums;
//using VOL.Core.KafkaManager.IService;
//using VOL.Core.Services;
//namespace VOL.Core.KafkaManager.Service
//{
// /// <summary>
// /// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string
// /// 消费者实现三种消费方式1.订阅回调模式 2.批量消费模式 3.单笔消费模式
// /// </summary>
// /// <typeparam name="TKey">Message.Key 的数据类型</typeparam>
// /// <typeparam name="TValue">Message.Value 的数据类型</typeparam>
// public class KafkaConsumer<TKey, TValue> : KafkaConfig, IKafkaConsumer<TKey, TValue>
// {
// /// <summary>
// /// Kafka地址(包含端口号)
// /// </summary>
// public string Servers
// {
// get
// {
// return ConsumerConfig.BootstrapServers;
// }
// set
// {
// ConsumerConfig.BootstrapServers = value;
// }
// }
// /// <summary>
// /// 消费者群组
// /// </summary>
// public string GroupId
// {
// get
// {
// return ConsumerConfig.GroupId;
// }
// set
// {
// ConsumerConfig.GroupId = value;
// }
// }
// /// <summary>
// /// 自动提交 默认为 false
// /// </summary>
// public bool EnableAutoCommit
// {
// get
// {
// return ConsumerConfig.EnableAutoCommit ?? false;
// }
// set
// {
// ConsumerConfig.EnableAutoCommit = value;
// }
// }
// /// <summary>
// /// 订阅回调模式-消费(持续订阅)
// /// </summary>
// /// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
// /// <param name="Topic">主题</param>
// public void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic)
// {
// Task.Factory.StartNew(() =>
// {
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
// builder.SetErrorHandler((_, e) =>
// {
// Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
// }).SetStatisticsHandler((_, json) =>
// {
// Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
// }).SetPartitionsAssignedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
// }).SetPartitionsRevokedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
// });
// using var consumer = builder.Build();
// consumer.Subscribe(Topic);
// while (AppSetting.Kafka.IsConsumerSubscribe) //true
// {
// ConsumeResult<TKey, TValue> result = null;
// try
// {
// result = consumer.Consume();
// if (result.IsPartitionEOF) continue;
// if (Func(result))
// {
// if (!(bool)ConsumerConfig.EnableAutoCommit)
// {
// //手动提交如果上面的EnableAutoCommit=true表示自动提交则无需调用Commit方法
// consumer.Commit(result);
// }
// }
// }
// catch (ConsumeException ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
// }
// }
// });
// }
// /// <summary>
// /// 批量订阅回调模式-消费(持续订阅)
// /// </summary>
// /// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>
// /// <param name="Topic">主题</param>
// public void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics)
// {
// Task.Factory.StartNew(() =>
// {
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
// builder.SetErrorHandler((_, e) =>
// {
// Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
// }).SetStatisticsHandler((_, json) =>
// {
// Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
// }).SetPartitionsAssignedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-分配的kafka分区:{partitionsStr}");
// }).SetPartitionsRevokedHandler((c, partitions) =>
// {
// string partitionsStr = string.Join(", ", partitions);
// Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");
// });
// using var consumer = builder.Build();
// consumer.Subscribe(Topics);
// while (AppSetting.Kafka.IsConsumerSubscribe) //true
// {
// ConsumeResult<TKey, TValue> result = null;
// try
// {
// result = consumer.Consume();
// if (result.IsPartitionEOF) continue;
// if (Func(result))
// {
// if (!(bool)ConsumerConfig.EnableAutoCommit)
// {
// //手动提交如果上面的EnableAutoCommit=true表示自动提交则无需调用Commit方法
// consumer.Commit(result);
// }
// }
// }
// catch (ConsumeException ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topics.ToArray()},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{result.Topic}", null, ex.Message + ex.StackTrace);
// }
// }
// });
// }
// /// <summary>
// /// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)
// /// </summary>
// /// <param name="Topic">主题</param>
// /// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
// /// <param name="MaxRow">最多单次消费行数 默认值:100行</param>
// /// <returns>待消费数据</returns>
// public List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
// {
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
// using var consumer = builder.Build();
// consumer.Subscribe(Topic);
// List<ConsumeResult<TKey, TValue>> Res = new List<ConsumeResult<TKey, TValue>>();
// while (true)
// {
// try
// {
// var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
// if (result == null) break;
// else
// {
// Res.Add(result);
// //手动提交如果上面的EnableAutoCommit=true表示自动提交则无需调用Commit方法
// consumer.Commit();
// }
// if (Res.Count > MaxRow) break;
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
// return null;
// }
// }
// return Res;
// }
// /// <summary>
// /// 单笔消费模式-单行消费
// /// </summary>
// /// <param name="Topic">主题</param>
// /// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>
// /// <returns>待消费数据</returns>
// public ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300)
// {
// var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
// //设置反序列化方式
// builder.SetValueDeserializer(new KafkaDConverter<TValue>());
// using var consumer = builder.Build();
// consumer.Subscribe(Topic);
// try
// {
// var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
// if (result != null)
// {
// //手动提交如果上面的EnableAutoCommit=true表示自动提交则无需调用Commit方法
// consumer.Commit();
// }
// return result;
// }
// catch (Exception ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
// return null;
// }
// }
// public void Dispose()
// {
// //if (_cache != null)
// // _cache.Dispose();
// GC.SuppressFinalize(this);
// }
// }
//}