430 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C#
		
	
	
	
			
		
		
	
	
			430 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C#
		
	
	
	
using System;
 | 
						||
using System.Collections.Concurrent;
 | 
						||
using System.Net.Sockets;
 | 
						||
using System.Text;
 | 
						||
using System.Threading.Tasks;
 | 
						||
using GraphQL.Client.Http;
 | 
						||
using GraphQL.Transport;
 | 
						||
using GraphQL.Client.Serializer.Newtonsoft;
 | 
						||
using GraphQL.Client.Abstractions;
 | 
						||
using MQTTnet;
 | 
						||
using MQTTnet.Client;
 | 
						||
using System.Net.Http.Headers;
 | 
						||
using LY.App.Common.Redis;
 | 
						||
using LY.App.Model;
 | 
						||
using Newtonsoft.Json;
 | 
						||
using StackExchange.Redis;
 | 
						||
using LY.App.Common.HttpUtil;
 | 
						||
using LY.App.Device.Command;
 | 
						||
 | 
						||
namespace LY.App.Device
 | 
						||
{
 | 
						||
    public enum ProtocolType
 | 
						||
    {
 | 
						||
        TCP,
 | 
						||
        UDP,
 | 
						||
        GraphQL,
 | 
						||
        MQTT
 | 
						||
    }
 | 
						||
 | 
						||
    public class Device
 | 
						||
    {
 | 
						||
        public long Id { get; set; }
 | 
						||
        public ProtocolType Protocol { get; set; }
 | 
						||
        public string IpAddress { get; set; }
 | 
						||
        public int Port { get; set; }
 | 
						||
        public string GraphQlEndpoint { get; set; }
 | 
						||
        public bool IsConnected { get; set; }
 | 
						||
        public string BrokerAddress { get; set; }  // MQTT Broker 地址
 | 
						||
        public string Topic { get; set; }          // MQTT 主题
 | 
						||
        public string username { get; set; }       // MQTT 用户名
 | 
						||
        public string password { get; set; }       // MQTT 密码
 | 
						||
        /// <summary>
 | 
						||
        /// 数据接收事件
 | 
						||
        /// </summary>
 | 
						||
        public Action<string> DataReceived { get; set; }
 | 
						||
    }
 | 
						||
 | 
						||
    public class DeviceManager
 | 
						||
    {
 | 
						||
        private readonly ConcurrentDictionary<long, Device> _devices = new();
 | 
						||
        private readonly ConcurrentDictionary<long, IMqttClient> _mqttClients = new(); // 维护 MQTT 客户端
 | 
						||
        private static DeviceManager _instance;
 | 
						||
        private readonly RedisService _redis = ServiceLocator.Instance.GetService<RedisService>();
 | 
						||
        private static readonly object _lock = new object();
 | 
						||
 | 
						||
        private DeviceManager() { }
 | 
						||
        public static DeviceManager Instance
 | 
						||
        {
 | 
						||
            get
 | 
						||
            {
 | 
						||
                lock (_lock)
 | 
						||
                {
 | 
						||
                    if (_instance == null)
 | 
						||
                    {
 | 
						||
                        _instance = new DeviceManager();
 | 
						||
                    }
 | 
						||
                    return _instance;
 | 
						||
                }
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        public void AddDevice(Device device)
 | 
						||
        {
 | 
						||
            if (_devices.TryAdd(device.Id, device))
 | 
						||
            {
 | 
						||
                switch (device.Protocol)
 | 
						||
                {
 | 
						||
                    case ProtocolType.TCP:
 | 
						||
                        Task.Run(() => HandleTcpDevice(device));
 | 
						||
                        break;
 | 
						||
                    case ProtocolType.UDP:
 | 
						||
                        Task.Run(() => HandleUdpDevice(device));
 | 
						||
                        break;
 | 
						||
                    case ProtocolType.GraphQL:
 | 
						||
                        Task.Run(() => HandleGraphQLDevice(device));
 | 
						||
                        break;
 | 
						||
                    case ProtocolType.MQTT:
 | 
						||
                        Task.Run(() => HandleMqttDevice(device));
 | 
						||
                        break;
 | 
						||
                }
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        public async void RemoveDevice(long deviceId)
 | 
						||
        {
 | 
						||
            if (_devices.TryRemove(deviceId, out var device))
 | 
						||
            {
 | 
						||
                device.IsConnected = false; // 终止接收数据的循环
 | 
						||
 | 
						||
                // 对于 MQTT 协议,断开客户端连接并清理
 | 
						||
                if (device.Protocol == ProtocolType.MQTT && _mqttClients.TryRemove(deviceId, out var mqttClient))
 | 
						||
                {
 | 
						||
                    try
 | 
						||
                    {
 | 
						||
                        if (mqttClient.IsConnected)
 | 
						||
                        {
 | 
						||
                            await mqttClient.DisconnectAsync();
 | 
						||
                        }
 | 
						||
                        mqttClient.Dispose();
 | 
						||
                    }
 | 
						||
                    catch (Exception ex)
 | 
						||
                    {
 | 
						||
                        Console.WriteLine($"设备 {deviceId} 断开 MQTT 连接时出错: {ex.Message}");
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        public ConcurrentDictionary<long, Device> GetAllDevices()
 | 
						||
        {
 | 
						||
            return _devices;
 | 
						||
        }
 | 
						||
 | 
						||
        public async Task SendCommand(long deviceId, string command)
 | 
						||
        {
 | 
						||
            if (_devices.TryGetValue(deviceId, out var device))
 | 
						||
            {
 | 
						||
                var data = Encoding.UTF8.GetBytes(command);
 | 
						||
                switch (device.Protocol)
 | 
						||
                {
 | 
						||
                    case ProtocolType.TCP:
 | 
						||
                        using (var client = new TcpClient())
 | 
						||
                        {
 | 
						||
                            await client.ConnectAsync(device.IpAddress, device.Port);
 | 
						||
                            var stream = client.GetStream();
 | 
						||
                            await stream.WriteAsync(data, 0, data.Length);
 | 
						||
                        }
 | 
						||
                        break;
 | 
						||
                    case ProtocolType.UDP:
 | 
						||
                        using (var udpClient = new UdpClient())
 | 
						||
                        {
 | 
						||
                            await udpClient.SendAsync(data, data.Length, device.IpAddress, device.Port);
 | 
						||
                        }
 | 
						||
                        break;
 | 
						||
                    case ProtocolType.GraphQL:
 | 
						||
                        using (var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer()))
 | 
						||
                        {
 | 
						||
                            var token = await _redis.GetAsync<string>(RedisKeyList.DeviceTokenById(deviceId));
 | 
						||
                            var response = await Send<dynamic>(device.GraphQlEndpoint, token, command);
 | 
						||
                        }
 | 
						||
                        break;
 | 
						||
                    case ProtocolType.MQTT:
 | 
						||
                        if (_mqttClients.TryGetValue(deviceId, out var mqttClient) && mqttClient.IsConnected)
 | 
						||
                        {
 | 
						||
                            var message = new MqttApplicationMessageBuilder()
 | 
						||
                                .WithTopic(device.Topic)
 | 
						||
                                .WithPayload(data)
 | 
						||
                                .Build();
 | 
						||
                            await mqttClient.PublishAsync(message);
 | 
						||
                            Console.WriteLine($"设备 {deviceId} (MQTT) 发送命令 {command} 成功");
 | 
						||
                        }
 | 
						||
                        else
 | 
						||
                        {
 | 
						||
                            throw new Exception("MQTT 客户端未连接");
 | 
						||
                        }
 | 
						||
                        break;
 | 
						||
                }
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        private async Task HandleTcpDevice(Device device)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                using var client = new TcpClient();
 | 
						||
                await client.ConnectAsync(device.IpAddress, device.Port);
 | 
						||
                device.IsConnected = true;
 | 
						||
                var buffer = new byte[1024];
 | 
						||
                var stream = client.GetStream();
 | 
						||
 | 
						||
                while (device.IsConnected)
 | 
						||
                {
 | 
						||
                    var bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
 | 
						||
                    if (bytesRead > 0)
 | 
						||
                    {
 | 
						||
                        var data = Encoding.UTF8.GetString(buffer, 0, bytesRead);
 | 
						||
                        device.DataReceived?.Invoke(data);
 | 
						||
                    }
 | 
						||
                }
 | 
						||
            }
 | 
						||
            catch (Exception)
 | 
						||
            {
 | 
						||
                device.IsConnected = false;
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        private async Task HandleUdpDevice(Device device)
 | 
						||
        {
 | 
						||
            using var udpClient = new UdpClient(device.Port);
 | 
						||
            device.IsConnected = true;
 | 
						||
 | 
						||
            while (device.IsConnected)
 | 
						||
            {
 | 
						||
                var result = await udpClient.ReceiveAsync();
 | 
						||
                var data = Encoding.UTF8.GetString(result.Buffer);
 | 
						||
                device.DataReceived?.Invoke(data);
 | 
						||
            }
 | 
						||
        }
 | 
						||
 | 
						||
        private async Task HandleGraphQLDevice(Device device)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var user = new
 | 
						||
                {
 | 
						||
                    device.username,
 | 
						||
                    device.password
 | 
						||
                };
 | 
						||
                var jsonstr = JsonConvert.SerializeObject(user);
 | 
						||
                //登陆 历正设备,获取token
 | 
						||
                var loginUrl = $"https://{device.IpAddress}/login";
 | 
						||
                var req = await RequestUtil.PostAsync(loginUrl, jsonstr);
 | 
						||
                if (req != null)
 | 
						||
                {
 | 
						||
                    var userinfo = JsonConvert.DeserializeObject<Dto.LoginModel>(req);
 | 
						||
                    if (userinfo != null)
 | 
						||
                    {
 | 
						||
                        Console.WriteLine($"设备 {device.Id} 连接成功");
 | 
						||
                        string token = userinfo.token;
 | 
						||
                        device.IsConnected = true;
 | 
						||
                        await _redis.SetAsync<string>(RedisKeyList.DeviceTokenById(device.Id), token);
 | 
						||
                        device.GraphQlEndpoint = $"https://{device.IpAddress}/rf/graphql";//更新graphql地址
 | 
						||
                        using var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer());
 | 
						||
                        while (device.IsConnected)
 | 
						||
                        {
 | 
						||
                            var response = await Send<dynamic>(device.GraphQlEndpoint, token, GraphCommand.GetFindTarget());
 | 
						||
                            device.DataReceived?.Invoke(response.ToString());
 | 
						||
                            await Task.Delay(1000);
 | 
						||
                        }
 | 
						||
                    }
 | 
						||
                }
 | 
						||
                Console.WriteLine("登录失败");
 | 
						||
            }
 | 
						||
            catch (Exception)
 | 
						||
            {
 | 
						||
                device.IsConnected = false;
 | 
						||
            }
 | 
						||
        }
 | 
						||
        private async Task<T> Send<T>(string url, string token, string cmd)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var httpClientHandle = new HttpClientHandler();
 | 
						||
                httpClientHandle.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
 | 
						||
                var client = new HttpClient(httpClientHandle);
 | 
						||
                var opt = new GraphQLHttpClientOptions() { EndPoint = new Uri(url) };
 | 
						||
                var graphQLClient = new GraphQLHttpClient(opt, new NewtonsoftJsonSerializer(), client);
 | 
						||
                var request = new GraphQL.GraphQLRequest
 | 
						||
                {
 | 
						||
                    Query = cmd
 | 
						||
                };
 | 
						||
 | 
						||
                graphQLClient.HttpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("Safari", "537.36"));
 | 
						||
                graphQLClient.HttpClient.DefaultRequestHeaders.Add("Authorization", "Bearer " + token);
 | 
						||
                graphQLClient.HttpClient.DefaultRequestHeaders.Add("ContentType", "application/json");
 | 
						||
                var graphQLResponse = await graphQLClient.SendQueryAsync<T>(request);
 | 
						||
                return graphQLResponse.Data;
 | 
						||
            }
 | 
						||
            catch { }
 | 
						||
            return default;
 | 
						||
        }
 | 
						||
        private async Task HandleMqttDevice(Device device)
 | 
						||
        {
 | 
						||
            try
 | 
						||
            {
 | 
						||
                var factory = new MqttFactory();
 | 
						||
                var mqttClient = factory.CreateMqttClient();
 | 
						||
 | 
						||
                var options = new MqttClientOptionsBuilder()
 | 
						||
                    .WithTcpServer(device.IpAddress, device.Port)
 | 
						||
                    .WithCredentials(device.username, device.password)
 | 
						||
                    .WithClientId(Guid.NewGuid().ToString())
 | 
						||
                    .Build();
 | 
						||
 | 
						||
                mqttClient.ConnectedAsync += async e =>
 | 
						||
                {
 | 
						||
                    Console.WriteLine($"设备 {device.Id} 连接成功");
 | 
						||
                    await mqttClient.SubscribeAsync(new MqttTopicFilterBuilder().WithTopic(device.Topic).Build());
 | 
						||
                };
 | 
						||
 | 
						||
                mqttClient.DisconnectedAsync += async e =>
 | 
						||
                {
 | 
						||
                    Console.WriteLine($"设备 {device.Id} 掉线,尝试重连...");
 | 
						||
                    device.IsConnected = false;
 | 
						||
                    await HandleDeviceConnection(device);
 | 
						||
                };
 | 
						||
 | 
						||
                mqttClient.ApplicationMessageReceivedAsync += e =>
 | 
						||
                {
 | 
						||
                    // 接收到 MQTT 消息后,处理数据
 | 
						||
                    var message = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.Array);
 | 
						||
                    device.DataReceived?.Invoke(message);
 | 
						||
                    return Task.CompletedTask;
 | 
						||
                };
 | 
						||
 | 
						||
                await mqttClient.ConnectAsync(options);
 | 
						||
                if (mqttClient.IsConnected)
 | 
						||
                {
 | 
						||
                    _mqttClients[device.Id] = mqttClient; // 保存 MQTT 客户端实例
 | 
						||
                    device.IsConnected = true;
 | 
						||
                }
 | 
						||
 | 
						||
 | 
						||
                //while (device.isconnected)
 | 
						||
                //{
 | 
						||
                //    await task.delay(1000);
 | 
						||
                //}
 | 
						||
 | 
						||
                // 循环结束后断开连接
 | 
						||
                //if (!mqttClient.IsConnected)
 | 
						||
                //{
 | 
						||
                //    await mqttClient.DisconnectAsync();
 | 
						||
                //    mqttClient.Dispose();
 | 
						||
                //}
 | 
						||
 | 
						||
            }
 | 
						||
            catch (Exception ex)
 | 
						||
            {
 | 
						||
                Console.WriteLine($"设备 {device.Id} 连接失败: {ex.Message}");
 | 
						||
                device.IsConnected = false;
 | 
						||
            }
 | 
						||
        }
 | 
						||
        private async Task HandleDeviceConnection(Device device)
 | 
						||
        {
 | 
						||
            int retryDelay = 1000; // 初始重连间隔(毫秒)
 | 
						||
            int maxDelay = 30000;  // 最大重连间隔(30秒)
 | 
						||
 | 
						||
            while (!device.IsConnected)
 | 
						||
            {
 | 
						||
                try
 | 
						||
                {
 | 
						||
                    switch (device.Protocol)
 | 
						||
                    {
 | 
						||
                        case ProtocolType.TCP:
 | 
						||
                            await HandleTcpDevice(device);
 | 
						||
                            break;
 | 
						||
                        case ProtocolType.UDP:
 | 
						||
                            await HandleUdpDevice(device);
 | 
						||
                            break;
 | 
						||
                        case ProtocolType.GraphQL:
 | 
						||
                            await HandleGraphQLDevice(device);
 | 
						||
                            break;
 | 
						||
                        case ProtocolType.MQTT:
 | 
						||
                            await HandleMqttDevice(device);
 | 
						||
                            break;
 | 
						||
                    }
 | 
						||
                    if (device.IsConnected)
 | 
						||
                    {
 | 
						||
                        // 连接成功,重置重连间隔
 | 
						||
                        retryDelay = 1000;
 | 
						||
                    }
 | 
						||
                }
 | 
						||
                catch (Exception ex)
 | 
						||
                {
 | 
						||
                    Console.WriteLine($"设备 {device.Id} 连接失败,{ex.Message},{retryDelay}ms 后重试...");
 | 
						||
                }
 | 
						||
 | 
						||
                Console.WriteLine($"设备 {device.Id} 连接失败,{retryDelay}ms 后重试...");
 | 
						||
                await Task.Delay(retryDelay);
 | 
						||
                // 指数退避算法,避免频繁重试
 | 
						||
                retryDelay = Math.Min(retryDelay * 2, maxDelay);
 | 
						||
            }
 | 
						||
        }
 | 
						||
        public async Task<bool> TestConnection(long deviceId)
 | 
						||
        {
 | 
						||
            if (_devices.TryGetValue(deviceId, out var device))
 | 
						||
            {
 | 
						||
                try
 | 
						||
                {
 | 
						||
                    switch (device.Protocol)
 | 
						||
                    {
 | 
						||
                        case ProtocolType.TCP:
 | 
						||
                            using (TcpClient client = new TcpClient())
 | 
						||
                            {
 | 
						||
                                await client.ConnectAsync(device.IpAddress, device.Port);
 | 
						||
                                return true;
 | 
						||
                            }
 | 
						||
                        case ProtocolType.UDP:
 | 
						||
                            using (var udpClient = new UdpClient())
 | 
						||
                            {
 | 
						||
                                await udpClient.SendAsync(new byte[] { 0 }, 1, device.IpAddress, device.Port);
 | 
						||
                                return true;
 | 
						||
                            }
 | 
						||
                        case ProtocolType.GraphQL:
 | 
						||
                            using (var graphQLClient = new GraphQLHttpClient(device.GraphQlEndpoint, new NewtonsoftJsonSerializer()))
 | 
						||
                            {
 | 
						||
                                var request = new GraphQL.GraphQLRequest { Query = "{ deviceStatus { id status } }" };
 | 
						||
                                var response = await graphQLClient.SendQueryAsync<dynamic>(request);
 | 
						||
                                return response.Data != null;
 | 
						||
                            }
 | 
						||
                        case ProtocolType.MQTT:
 | 
						||
                            var factory = new MqttFactory();
 | 
						||
                            var mqttClient = factory.CreateMqttClient();
 | 
						||
                            var options = new MqttClientOptionsBuilder()
 | 
						||
                                .WithTcpServer(device.BrokerAddress)
 | 
						||
                                .WithClientId(Guid.NewGuid().ToString())
 | 
						||
                                .Build();
 | 
						||
                            await mqttClient.ConnectAsync(options);
 | 
						||
                            await mqttClient.DisconnectAsync();
 | 
						||
                            return true;
 | 
						||
                        default:
 | 
						||
                            return false;
 | 
						||
                    }
 | 
						||
                }
 | 
						||
                catch (Exception)
 | 
						||
                {
 | 
						||
                    return false;
 | 
						||
                }
 | 
						||
            }
 | 
						||
            return false;
 | 
						||
        }
 | 
						||
 | 
						||
        public Device GetDevice(long deviceId)
 | 
						||
        {
 | 
						||
            _devices.TryGetValue(deviceId, out var device);
 | 
						||
            return device;
 | 
						||
        }
 | 
						||
    }
 | 
						||
}
 |