//using Confluent.Kafka;
//using System;
//using System.Collections.Generic;
//using System.Text;
//using System.Threading.Tasks;
//using VOL.Core.Enums;
//using VOL.Core.KafkaManager.IService;
//using VOL.Core.Services;
//namespace VOL.Core.KafkaManager.Service
//{
// ///
// /// 生产者 控制器或Service里面构造函数注入即可调用
// /// Message.Key的数据类型为string、Message.Value的数据类型为string
// ///
// /// Message.Key 的数据类型
// /// Message.Value 的数据类型
// public class KafkaProducer : KafkaConfig, IKafkaProducer
// {
// ///
// /// 构造生产者
// ///
// public KafkaProducer()
// {
// }
// ///
// /// Kafka地址(包含端口号)
// ///
// public string Servers
// {
// get
// {
// return ProducerConfig.BootstrapServers;
// }
// set
// {
// ProducerConfig.BootstrapServers = value;
// }
// }
// ///
// /// 生产
// ///
// /// Message.Key 做消息指定分区投放有用的
// /// Message.Value
// /// 主题
// public void Produce(TKey Key, TValue Value, string Topic)
// {
// var producerBuilder = new ProducerBuilder(ProducerConfig);
// producerBuilder.SetValueSerializer(new KafkaConverter());//设置序列化方式
// using var producer = producerBuilder.Build();
// try
// {
// producer.Produce(Topic, new Message
// {
// Key = Key,
// Value = Value
// }, (result) =>
// {
// if (result.Error.IsError)
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{ KafkaHelper.GetServerName()}", null, $"Delivery Error:{result.Error.Reason}");
// });//Value = JsonConvert.SerializeObject(value)
// }
// catch (ProduceException ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},Delivery failed: { ex.Error.Reason}", null, ex.Message + ex.StackTrace);
// }
// }
// ///
// /// 生产异步
// ///
// /// Message.Key
// /// Message.Value
// /// 主题
// ///
// public async Task ProduceAsync(TKey Key, TValue Value, string Topic)
// {
// var producerBuilder = new ProducerBuilder(ProducerConfig);
// producerBuilder.SetValueSerializer(new KafkaConverter());
// using var producer = producerBuilder.Build();
// try
// {
// var dr = await producer.ProduceAsync(Topic, new Message
// {
// Key = Key,
// Value = Value
// });
// //Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
// }
// catch (ProduceException ex)
// {
// Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{ KafkaHelper.GetServerName()},Delivery failed: { ex.Error.Reason}", null, ex.Message + ex.StackTrace);
// }
// }
// }
//}