using GraphQL.Client.Http;
using GraphQL.Client.Serializer.Newtonsoft;
using LY.App.Common.HttpUtil;
using LY.App.Common.Redis;
using LY.App.Device.Command;
using LY.App.Model;
using LY.App.Service;
using MQTTnet;
using MQTTnet.Client;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using System.Net.Http.Headers;
using System.Net.Sockets;
using System.Text;
namespace LY.App.Device
{
public enum ProtocolType
{
TCP,
UDP,
GraphQL,
MQTT
}
public class Device
{
public long Id { get; set; }
public ProtocolType Protocol { get; set; }
public string IpAddress { get; set; }
public int Port { get; set; }
public string GraphQlEndpoint { get; set; }
public bool IsConnected { get; set; }
public string BrokerAddress { get; set; } // MQTT Broker 地址
public string Topic { get; set; } // MQTT 主题
public string username { get; set; } // MQTT 用户名
public string password { get; set; } // MQTT 密码
///
/// 数据接收事件
///
public Action DataReceived { get; set; }
}
public class DeviceManager
{
private readonly ConcurrentDictionary _devices = new();
private readonly ConcurrentDictionary _mqttClients = new(); // 维护 MQTT 客户端
private static DeviceManager _instance;
private readonly RedisService _redis = ServiceLocator.Instance.GetService();
private static readonly object _lock = new object();
private readonly CancellationTokenSource _monitorCancellationTokenSource = new();
private DeviceManager()
{
Task.Run(() => MonitorDevices(_monitorCancellationTokenSource.Token));
}
public static DeviceManager Instance
{
get
{
lock (_lock)
{
if (_instance == null)
{
_instance = new DeviceManager();
}
return _instance;
}
}
}
public void AddDevice(Device device)
{
if (_devices.TryAdd(device.Id, device))
{
switch (device.Protocol)
{
case ProtocolType.TCP:
Task.Run(() => HandleTcpDevice(device));
break;
case ProtocolType.UDP:
Task.Run(() => HandleUdpDevice(device));
break;
case ProtocolType.GraphQL:
Task.Run(() => HandleGraphQLDevice(device));
break;
case ProtocolType.MQTT:
Task.Run(() => HandleMqttDevice(device));
break;
}
}
}
///
/// 删除
///
///
///
public async Task RemoveDevice(long deviceId)
{
if (_devices.TryRemove(deviceId, out var device))
{
device.IsConnected = false; // 终止接收数据的循环
// 对于 MQTT 协议,断开客户端连接并清理
if (device.Protocol == ProtocolType.MQTT && _mqttClients.TryRemove(deviceId, out var mqttClient))
{
try
{
if (mqttClient.IsConnected)
{
await mqttClient.DisconnectAsync();
}
mqttClient.Dispose();
}
catch (Exception ex)
{
Console.WriteLine($"设备 {deviceId} 断开 MQTT 连接时出错: {ex.Message}");
}
}
}
}
public ConcurrentDictionary GetAllDevices()
{
return _devices;
}
///
/// 下发命令
///
///
///
///
///
public async Task SendCommand(long deviceId, string command)
{
if (_devices.TryGetValue(deviceId, out var device))
{
var data = Encoding.UTF8.GetBytes(command);
switch (device.Protocol)
{
case ProtocolType.TCP:
using (var client = new TcpClient())
{
await client.ConnectAsync(device.IpAddress, device.Port);
var stream = client.GetStream();
await stream.WriteAsync(data, 0, data.Length);
}
break;
case ProtocolType.UDP:
using (var udpClient = new UdpClient())
{
await udpClient.SendAsync(data, data.Length, device.IpAddress, device.Port);
}
break;
case ProtocolType.GraphQL:
using (var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer()))
{
var token = await _redis.GetAsync(RedisKeyList.DeviceTokenById(deviceId));
var response = await Send(device.GraphQlEndpoint, token, command);
}
break;
case ProtocolType.MQTT:
if (_mqttClients.TryGetValue(deviceId, out var mqttClient) && mqttClient.IsConnected)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic(device.Topic)
.WithPayload(data)
.Build();
await mqttClient.PublishAsync(message);
Console.WriteLine($"设备 {deviceId} (MQTT) 发送命令 {command} 成功");
}
else
{
throw new Exception("MQTT 客户端未连接");
}
break;
}
}
}
private async Task HandleTcpDevice(Device device)
{
try
{
using var client = new TcpClient();
await client.ConnectAsync(device.IpAddress, device.Port);
device.IsConnected = true;
var buffer = new byte[1024];
var stream = client.GetStream();
while (device.IsConnected)
{
var bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
if (bytesRead > 0)
{
var data = Encoding.UTF8.GetString(buffer, 0, bytesRead);
device.DataReceived?.Invoke(data);
}
}
}
catch (Exception)
{
device.IsConnected = false;
}
}
private async Task HandleUdpDevice(Device device)
{
using var udpClient = new UdpClient(device.Port);
device.IsConnected = true;
while (device.IsConnected)
{
var result = await udpClient.ReceiveAsync();
var data = Encoding.UTF8.GetString(result.Buffer);
device.DataReceived?.Invoke(data);
}
}
private async Task HandleGraphQLDevice(Device device)
{
try
{
var user = new
{
device.username,
device.password
};
var jsonstr = JsonConvert.SerializeObject(user);
//登陆 历正设备,获取token
var loginUrl = $"https://{device.IpAddress}/login";
var req = await RequestUtil.PostAsync(loginUrl, jsonstr);
if (req != null)
{
var userinfo = JsonConvert.DeserializeObject(req);
if (userinfo != null)
{
Console.WriteLine($"设备 {device.Id} 连接成功");
string token = userinfo.token;
device.IsConnected = true;
await _redis.SetAsync(RedisKeyList.DeviceTokenById(device.Id), token);
device.GraphQlEndpoint = $"https://{device.IpAddress}/rf/graphql";//更新graphql地址
using var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer());
while (device.IsConnected)
{
var response = await Send(device.GraphQlEndpoint, token, GraphCommand.GetFindTarget());
device.DataReceived?.Invoke(response.ToString());
await Task.Delay(1000);
}
}
}
Console.WriteLine("登录失败");
}
catch (Exception)
{
device.IsConnected = false;
}
}
private async Task Send(string url, string token, string cmd)
{
try
{
var httpClientHandle = new HttpClientHandler();
httpClientHandle.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
var client = new HttpClient(httpClientHandle);
var opt = new GraphQLHttpClientOptions() { EndPoint = new Uri(url) };
var graphQLClient = new GraphQLHttpClient(opt, new NewtonsoftJsonSerializer(), client);
var request = new GraphQL.GraphQLRequest
{
Query = cmd
};
graphQLClient.HttpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("Safari", "537.36"));
graphQLClient.HttpClient.DefaultRequestHeaders.Add("Authorization", "Bearer " + token);
graphQLClient.HttpClient.DefaultRequestHeaders.Add("ContentType", "application/json");
var graphQLResponse = await graphQLClient.SendQueryAsync(request);
return graphQLResponse.Data;
}
catch { }
return default;
}
private async Task HandleMqttDevice(Device device)
{
try
{
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer(device.IpAddress, device.Port)
.WithCredentials(device.username, device.password)
.WithClientId(Guid.NewGuid().ToString())
.Build();
mqttClient.ConnectedAsync += async e =>
{
Console.WriteLine($"设备 {device.Id} 连接成功");
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(device.Topic).Build());
};
mqttClient.DisconnectedAsync += async e =>
{
Console.WriteLine($"设备 {device.Id} 掉线,尝试重连...");
device.IsConnected = false;
};
mqttClient.ApplicationMessageReceivedAsync += e =>
{
// 接收到 MQTT 消息后,处理数据
var message = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.Array);
device.DataReceived?.Invoke(message);
return Task.CompletedTask;
};
await mqttClient.ConnectAsync(options);
if (mqttClient.IsConnected)
{
_mqttClients[device.Id] = mqttClient; // 保存 MQTT 客户端实例
device.IsConnected = true;
}
}
catch (Exception ex)
{
Console.WriteLine($"设备 {device.Id} 连接失败: {ex.Message}");
device.IsConnected = false;
}
}
private async Task HandleDeviceConnection(Device device)
{
int retryDelay = 10000; // 初始重连间隔(1秒)
int maxDelay = 30000; // 最大重连间隔(30秒)
var _log = ServiceLocator.Instance.GetService();
await _log?.AddLog(new AddLog { Message = $"设备 {device.Id} 掉线,重新连接中...", Parameters = "", StackTrace = "", url = "" });
while (!device.IsConnected)
{
try
{
switch (device.Protocol)
{
case ProtocolType.TCP:
await HandleTcpDevice(device);
break;
case ProtocolType.UDP:
await HandleUdpDevice(device);
break;
case ProtocolType.GraphQL:
await HandleGraphQLDevice(device);
break;
case ProtocolType.MQTT:
await HandleMqttDevice(device);
break;
}
if (device.IsConnected)
{
retryDelay = 1000; // 连接成功后重置重连间隔
Console.WriteLine($"设备 {device.Id} 重新连接成功");
await _log.AddLog(new AddLog { Message = $"设备 {device.Id} 重新连接成功", Parameters = "", StackTrace = "", url = "" });
}
}
catch (Exception ex)
{
Console.WriteLine($"设备 {device.Id} 连接失败,{retryDelay}ms 后重试... 错误: {ex.Message}");
}
await Task.Delay(retryDelay);
retryDelay = Math.Min(retryDelay * 2, maxDelay); // 指数退避算法,避免频繁重试
}
}
///
/// 监控设备状态
///
///
///
private async Task MonitorDevices(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
foreach (var device in _devices.Values)
{
if (!device.IsConnected)
{
Console.WriteLine($"设备 {device.Id} 掉线,尝试重新连接...");
_ = Task.Run(() => HandleDeviceConnection(device)); // 异步执行设备重连
}
}
await Task.Delay(10000, cancellationToken); // 每 10 秒检查一次设备状态
}
}
///
/// 测试设备连接
///
///
///
public async Task TestConnection(long deviceId)
{
if (_devices.TryGetValue(deviceId, out var device))
{
try
{
switch (device.Protocol)
{
case ProtocolType.TCP:
using (TcpClient client = new TcpClient())
{
await client.ConnectAsync(device.IpAddress, device.Port);
return true;
}
case ProtocolType.UDP:
using (var udpClient = new UdpClient())
{
await udpClient.SendAsync(new byte[] { 0 }, 1, device.IpAddress, device.Port);
return true;
}
case ProtocolType.GraphQL:
using (var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer()))
{
var request = new GraphQL.GraphQLRequest { Query = "{ deviceStatus { id status } }" };
var response = await graphQLClient.SendQueryAsync(request);
return response.Data != null;
}
case ProtocolType.MQTT:
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer(device.BrokerAddress)
.WithClientId(Guid.NewGuid().ToString())
.Build();
await mqttClient.ConnectAsync(options);
await mqttClient.DisconnectAsync();
return true;
default:
return false;
}
}
catch (Exception)
{
return false;
}
}
return false;
}
public Device GetDevice(long deviceId)
{
_devices.TryGetValue(deviceId, out var device);
return device;
}
}
}