430 lines
17 KiB
C#
430 lines
17 KiB
C#
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Net.Sockets;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
using GraphQL.Client.Http;
|
||
using GraphQL.Transport;
|
||
using GraphQL.Client.Serializer.Newtonsoft;
|
||
using GraphQL.Client.Abstractions;
|
||
using MQTTnet;
|
||
using MQTTnet.Client;
|
||
using System.Net.Http.Headers;
|
||
using LY.App.Common.Redis;
|
||
using LY.App.Model;
|
||
using Newtonsoft.Json;
|
||
using StackExchange.Redis;
|
||
using LY.App.Common.HttpUtil;
|
||
using LY.App.Device.Command;
|
||
|
||
namespace LY.App.Device
|
||
{
|
||
public enum ProtocolType
|
||
{
|
||
TCP,
|
||
UDP,
|
||
GraphQL,
|
||
MQTT
|
||
}
|
||
|
||
public class Device
|
||
{
|
||
public long Id { get; set; }
|
||
public ProtocolType Protocol { get; set; }
|
||
public string IpAddress { get; set; }
|
||
public int Port { get; set; }
|
||
public string GraphQlEndpoint { get; set; }
|
||
public bool IsConnected { get; set; }
|
||
public string BrokerAddress { get; set; } // MQTT Broker 地址
|
||
public string Topic { get; set; } // MQTT 主题
|
||
public string username { get; set; } // MQTT 用户名
|
||
public string password { get; set; } // MQTT 密码
|
||
/// <summary>
|
||
/// 数据接收事件
|
||
/// </summary>
|
||
public Action<string> DataReceived { get; set; }
|
||
}
|
||
|
||
public class DeviceManager
|
||
{
|
||
private readonly ConcurrentDictionary<long, Device> _devices = new();
|
||
private readonly ConcurrentDictionary<long, IMqttClient> _mqttClients = new(); // 维护 MQTT 客户端
|
||
private static DeviceManager _instance;
|
||
private readonly RedisService _redis = ServiceLocator.Instance.GetService<RedisService>();
|
||
private static readonly object _lock = new object();
|
||
|
||
private DeviceManager() { }
|
||
public static DeviceManager Instance
|
||
{
|
||
get
|
||
{
|
||
lock (_lock)
|
||
{
|
||
if (_instance == null)
|
||
{
|
||
_instance = new DeviceManager();
|
||
}
|
||
return _instance;
|
||
}
|
||
}
|
||
}
|
||
|
||
public void AddDevice(Device device)
|
||
{
|
||
if (_devices.TryAdd(device.Id, device))
|
||
{
|
||
switch (device.Protocol)
|
||
{
|
||
case ProtocolType.TCP:
|
||
Task.Run(() => HandleTcpDevice(device));
|
||
break;
|
||
case ProtocolType.UDP:
|
||
Task.Run(() => HandleUdpDevice(device));
|
||
break;
|
||
case ProtocolType.GraphQL:
|
||
Task.Run(() => HandleGraphQLDevice(device));
|
||
break;
|
||
case ProtocolType.MQTT:
|
||
Task.Run(() => HandleMqttDevice(device));
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
public async void RemoveDevice(long deviceId)
|
||
{
|
||
if (_devices.TryRemove(deviceId, out var device))
|
||
{
|
||
device.IsConnected = false; // 终止接收数据的循环
|
||
|
||
// 对于 MQTT 协议,断开客户端连接并清理
|
||
if (device.Protocol == ProtocolType.MQTT && _mqttClients.TryRemove(deviceId, out var mqttClient))
|
||
{
|
||
try
|
||
{
|
||
if (mqttClient.IsConnected)
|
||
{
|
||
await mqttClient.DisconnectAsync();
|
||
}
|
||
mqttClient.Dispose();
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine($"设备 {deviceId} 断开 MQTT 连接时出错: {ex.Message}");
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
public ConcurrentDictionary<long, Device> GetAllDevices()
|
||
{
|
||
return _devices;
|
||
}
|
||
|
||
public async Task SendCommand(long deviceId, string command)
|
||
{
|
||
if (_devices.TryGetValue(deviceId, out var device))
|
||
{
|
||
var data = Encoding.UTF8.GetBytes(command);
|
||
switch (device.Protocol)
|
||
{
|
||
case ProtocolType.TCP:
|
||
using (var client = new TcpClient())
|
||
{
|
||
await client.ConnectAsync(device.IpAddress, device.Port);
|
||
var stream = client.GetStream();
|
||
await stream.WriteAsync(data, 0, data.Length);
|
||
}
|
||
break;
|
||
case ProtocolType.UDP:
|
||
using (var udpClient = new UdpClient())
|
||
{
|
||
await udpClient.SendAsync(data, data.Length, device.IpAddress, device.Port);
|
||
}
|
||
break;
|
||
case ProtocolType.GraphQL:
|
||
using (var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer()))
|
||
{
|
||
var token = await _redis.GetAsync<string>(RedisKeyList.DeviceTokenById(deviceId));
|
||
var response = await Send<dynamic>(device.GraphQlEndpoint, token, command);
|
||
}
|
||
break;
|
||
case ProtocolType.MQTT:
|
||
if (_mqttClients.TryGetValue(deviceId, out var mqttClient) && mqttClient.IsConnected)
|
||
{
|
||
var message = new MqttApplicationMessageBuilder()
|
||
.WithTopic(device.Topic)
|
||
.WithPayload(data)
|
||
.Build();
|
||
await mqttClient.PublishAsync(message);
|
||
Console.WriteLine($"设备 {deviceId} (MQTT) 发送命令 {command} 成功");
|
||
}
|
||
else
|
||
{
|
||
throw new Exception("MQTT 客户端未连接");
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
private async Task HandleTcpDevice(Device device)
|
||
{
|
||
try
|
||
{
|
||
using var client = new TcpClient();
|
||
await client.ConnectAsync(device.IpAddress, device.Port);
|
||
device.IsConnected = true;
|
||
var buffer = new byte[1024];
|
||
var stream = client.GetStream();
|
||
|
||
while (device.IsConnected)
|
||
{
|
||
var bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
|
||
if (bytesRead > 0)
|
||
{
|
||
var data = Encoding.UTF8.GetString(buffer, 0, bytesRead);
|
||
device.DataReceived?.Invoke(data);
|
||
}
|
||
}
|
||
}
|
||
catch (Exception)
|
||
{
|
||
device.IsConnected = false;
|
||
}
|
||
}
|
||
|
||
private async Task HandleUdpDevice(Device device)
|
||
{
|
||
using var udpClient = new UdpClient(device.Port);
|
||
device.IsConnected = true;
|
||
|
||
while (device.IsConnected)
|
||
{
|
||
var result = await udpClient.ReceiveAsync();
|
||
var data = Encoding.UTF8.GetString(result.Buffer);
|
||
device.DataReceived?.Invoke(data);
|
||
}
|
||
}
|
||
|
||
private async Task HandleGraphQLDevice(Device device)
|
||
{
|
||
try
|
||
{
|
||
var user = new
|
||
{
|
||
device.username,
|
||
device.password
|
||
};
|
||
var jsonstr = JsonConvert.SerializeObject(user);
|
||
//登陆 历正设备,获取token
|
||
var loginUrl = $"https://{device.IpAddress}/login";
|
||
var req = await RequestUtil.PostAsync(loginUrl, jsonstr);
|
||
if (req != null)
|
||
{
|
||
var userinfo = JsonConvert.DeserializeObject<Dto.LoginModel>(req);
|
||
if (userinfo != null)
|
||
{
|
||
Console.WriteLine($"设备 {device.Id} 连接成功");
|
||
string token = userinfo.token;
|
||
device.IsConnected = true;
|
||
await _redis.SetAsync<string>(RedisKeyList.DeviceTokenById(device.Id), token);
|
||
device.GraphQlEndpoint = $"https://{device.IpAddress}/rf/graphql";//更新graphql地址
|
||
using var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer());
|
||
while (device.IsConnected)
|
||
{
|
||
var response = await Send<dynamic>(device.GraphQlEndpoint, token, GraphCommand.GetFindTarget());
|
||
device.DataReceived?.Invoke(response.ToString());
|
||
await Task.Delay(1000);
|
||
}
|
||
}
|
||
}
|
||
Console.WriteLine("登录失败");
|
||
}
|
||
catch (Exception)
|
||
{
|
||
device.IsConnected = false;
|
||
}
|
||
}
|
||
private async Task<T> Send<T>(string url, string token, string cmd)
|
||
{
|
||
try
|
||
{
|
||
var httpClientHandle = new HttpClientHandler();
|
||
httpClientHandle.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
|
||
var client = new HttpClient(httpClientHandle);
|
||
var opt = new GraphQLHttpClientOptions() { EndPoint = new Uri(url) };
|
||
var graphQLClient = new GraphQLHttpClient(opt, new NewtonsoftJsonSerializer(), client);
|
||
var request = new GraphQL.GraphQLRequest
|
||
{
|
||
Query = cmd
|
||
};
|
||
|
||
graphQLClient.HttpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("Safari", "537.36"));
|
||
graphQLClient.HttpClient.DefaultRequestHeaders.Add("Authorization", "Bearer " + token);
|
||
graphQLClient.HttpClient.DefaultRequestHeaders.Add("ContentType", "application/json");
|
||
var graphQLResponse = await graphQLClient.SendQueryAsync<T>(request);
|
||
return graphQLResponse.Data;
|
||
}
|
||
catch { }
|
||
return default;
|
||
}
|
||
private async Task HandleMqttDevice(Device device)
|
||
{
|
||
try
|
||
{
|
||
var factory = new MqttFactory();
|
||
var mqttClient = factory.CreateMqttClient();
|
||
|
||
var options = new MqttClientOptionsBuilder()
|
||
.WithTcpServer(device.IpAddress, device.Port)
|
||
.WithCredentials(device.username, device.password)
|
||
.WithClientId(Guid.NewGuid().ToString())
|
||
.Build();
|
||
|
||
mqttClient.ConnectedAsync += async e =>
|
||
{
|
||
Console.WriteLine($"设备 {device.Id} 连接成功");
|
||
await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(device.Topic).Build());
|
||
};
|
||
|
||
mqttClient.DisconnectedAsync += async e =>
|
||
{
|
||
Console.WriteLine($"设备 {device.Id} 掉线,尝试重连...");
|
||
device.IsConnected = false;
|
||
await HandleDeviceConnection(device);
|
||
};
|
||
|
||
mqttClient.ApplicationMessageReceivedAsync += e =>
|
||
{
|
||
// 接收到 MQTT 消息后,处理数据
|
||
var message = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.Array);
|
||
device.DataReceived?.Invoke(message);
|
||
return Task.CompletedTask;
|
||
};
|
||
|
||
await mqttClient.ConnectAsync(options);
|
||
if (mqttClient.IsConnected)
|
||
{
|
||
_mqttClients[device.Id] = mqttClient; // 保存 MQTT 客户端实例
|
||
device.IsConnected = true;
|
||
}
|
||
|
||
|
||
//while (device.isconnected)
|
||
//{
|
||
// await task.delay(1000);
|
||
//}
|
||
|
||
// 循环结束后断开连接
|
||
//if (!mqttClient.IsConnected)
|
||
//{
|
||
// await mqttClient.DisconnectAsync();
|
||
// mqttClient.Dispose();
|
||
//}
|
||
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine($"设备 {device.Id} 连接失败: {ex.Message}");
|
||
device.IsConnected = false;
|
||
}
|
||
}
|
||
private async Task HandleDeviceConnection(Device device)
|
||
{
|
||
int retryDelay = 1000; // 初始重连间隔(毫秒)
|
||
int maxDelay = 30000; // 最大重连间隔(30秒)
|
||
|
||
while (!device.IsConnected)
|
||
{
|
||
try
|
||
{
|
||
switch (device.Protocol)
|
||
{
|
||
case ProtocolType.TCP:
|
||
await HandleTcpDevice(device);
|
||
break;
|
||
case ProtocolType.UDP:
|
||
await HandleUdpDevice(device);
|
||
break;
|
||
case ProtocolType.GraphQL:
|
||
await HandleGraphQLDevice(device);
|
||
break;
|
||
case ProtocolType.MQTT:
|
||
await HandleMqttDevice(device);
|
||
break;
|
||
}
|
||
if (device.IsConnected)
|
||
{
|
||
// 连接成功,重置重连间隔
|
||
retryDelay = 1000;
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
Console.WriteLine($"设备 {device.Id} 连接失败,{ex.Message},{retryDelay}ms 后重试...");
|
||
}
|
||
|
||
Console.WriteLine($"设备 {device.Id} 连接失败,{retryDelay}ms 后重试...");
|
||
await Task.Delay(retryDelay);
|
||
// 指数退避算法,避免频繁重试
|
||
retryDelay = Math.Min(retryDelay * 2, maxDelay);
|
||
}
|
||
}
|
||
public async Task<bool> TestConnection(long deviceId)
|
||
{
|
||
if (_devices.TryGetValue(deviceId, out var device))
|
||
{
|
||
try
|
||
{
|
||
switch (device.Protocol)
|
||
{
|
||
case ProtocolType.TCP:
|
||
using (TcpClient client = new TcpClient())
|
||
{
|
||
await client.ConnectAsync(device.IpAddress, device.Port);
|
||
return true;
|
||
}
|
||
case ProtocolType.UDP:
|
||
using (var udpClient = new UdpClient())
|
||
{
|
||
await udpClient.SendAsync(new byte[] { 0 }, 1, device.IpAddress, device.Port);
|
||
return true;
|
||
}
|
||
case ProtocolType.GraphQL:
|
||
using (var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer()))
|
||
{
|
||
var request = new GraphQL.GraphQLRequest { Query = "{ deviceStatus { id status } }" };
|
||
var response = await graphQLClient.SendQueryAsync<dynamic>(request);
|
||
return response.Data != null;
|
||
}
|
||
case ProtocolType.MQTT:
|
||
var factory = new MqttFactory();
|
||
var mqttClient = factory.CreateMqttClient();
|
||
var options = new MqttClientOptionsBuilder()
|
||
.WithTcpServer(device.BrokerAddress)
|
||
.WithClientId(Guid.NewGuid().ToString())
|
||
.Build();
|
||
await mqttClient.ConnectAsync(options);
|
||
await mqttClient.DisconnectAsync();
|
||
return true;
|
||
default:
|
||
return false;
|
||
}
|
||
}
|
||
catch (Exception)
|
||
{
|
||
return false;
|
||
}
|
||
}
|
||
return false;
|
||
}
|
||
|
||
public Device GetDevice(long deviceId)
|
||
{
|
||
_devices.TryGetValue(deviceId, out var device);
|
||
return device;
|
||
}
|
||
}
|
||
}
|