[C#] Blazor练习 依赖注入2

2022-12-01 10:54:30 浏览数 (2)

[C#] Blazor练习 依赖注入

注册可注入依赖项

注入依赖

代码语言:csharp
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Server;
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Client.Receiving;
using System.Collections.Generic;


namespace MqttBrokerWithDashboard.MqttBroker
{
    /// <summary>
    /// Receives the MQTT server's client-connected, client-disconnected,
    /// message-received and queue-interceptor callbacks, keeps an in-memory
    /// history of connected clients and received messages, and re-raises the
    /// callbacks as plain .NET events.
    /// </summary>
    /// <remarks>
    /// All internal collections are guarded by a single private lock. The public
    /// collection properties return snapshots, so callers can enumerate them
    /// without racing the handler methods that mutate the underlying lists.
    /// </remarks>
    public class MqttBrokerService : IMqttServerClientConnectedHandler, IMqttServerClientDisconnectedHandler, IMqttApplicationMessageReceivedHandler, IMqttServerClientMessageQueueInterceptor
    {
        /// <summary>Maximum number of messages kept in the global history and in each per-topic history.</summary>
        const int MaxStoredMessages = 100;

        readonly ILogger _log;

        /// <summary>The MQTT server used by the <c>Publish</c> overloads; publishing is skipped while this is null.</summary>
        public IMqttServer Server { get; set; }

        readonly object _thisLock = new();

        // Newest message first.
        readonly List<MqttMessage> _messages = new();

        /// <summary>Snapshot of the received messages, newest first.</summary>
        public IReadOnlyList<MqttMessage> Messages
        {
            get
            {
                lock (_thisLock)
                {
                    // Copy under the lock: a live read-only wrapper would still be
                    // mutated underneath the caller while it is being enumerated.
                    return _messages.ToArray();
                }
            }
        }

        // Per-topic history, each list newest first.
        readonly Dictionary<string, List<MqttMessage>> _messagesByTopic = new();

        /// <summary>Snapshot of the received messages grouped by topic, each list newest first.</summary>
        public IReadOnlyDictionary<string, List<MqttMessage>> MessagesByTopic
        {
            get
            {
                lock (_thisLock)
                {
                    // Copy both the dictionary and the inner lists so the caller never
                    // observes (or alters) the collections the handlers are mutating.
                    var snapshot = new Dictionary<string, List<MqttMessage>>(_messagesByTopic.Count);
                    foreach (var entry in _messagesByTopic)
                    {
                        snapshot[entry.Key] = new List<MqttMessage>(entry.Value);
                    }

                    return snapshot;
                }
            }
        }

        readonly List<MqttClient> _connectedClients = new();

        /// <summary>Snapshot of the currently tracked connected clients, in connection order.</summary>
        public IReadOnlyList<MqttClient> ConnectedClients
        {
            get
            {
                lock (_thisLock)
                {
                    return _connectedClients.ToArray();
                }
            }
        }


        /// <summary>Raised after a connecting client has been added to <see cref="ConnectedClients"/>.</summary>
        public event Action<MqttServerClientConnectedEventArgs> OnClientConnected;

        /// <summary>Raised after a known client has been removed from <see cref="ConnectedClients"/>.</summary>
        public event Action<MqttServerClientDisconnectedEventArgs> OnClientDisconnected;

        /// <summary>Raised after a received message has been stored in the histories.</summary>
        public event Action<MqttApplicationMessageReceivedEventArgs> OnMessageReceived;


        /// <summary>Creates the service with the logger used for all diagnostics.</summary>
        /// <param name="log">Logger supplied by dependency injection.</param>
        public MqttBrokerService(ILogger<MqttBrokerService> log) =>
            _log = log;


        /// <summary>Records the newly connected client, logs it and raises <see cref="OnClientConnected"/>.</summary>
        Task IMqttServerClientConnectedHandler.HandleClientConnectedAsync(MqttServerClientConnectedEventArgs e)
        {
            var client = new MqttClient
            {
                // Invariant culture keeps the timestamp format stable regardless of the host's calendar/separators.
                TimeOfConnection = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", System.Globalization.CultureInfo.InvariantCulture),
                ClientId = e.ClientId,
                AllowSend = true,
                AllowReceive = true,
            };

            lock (_thisLock)
            {
                _connectedClients.Add(client);
            }

            _log.LogInformation("Client connected: {ClientId}", e.ClientId);

            // Raised outside the lock so subscribers cannot block or re-enter the locked section.
            OnClientConnected?.Invoke(e);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Removes the disconnected client, logs it and raises <see cref="OnClientDisconnected"/>.
        /// A disconnect for a client that was never tracked is logged as an error and does not raise the event.
        /// </summary>
        Task IMqttServerClientDisconnectedHandler.HandleClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs e)
        {
            bool removed;
            lock (_thisLock)
            {
                var client = _connectedClients.Find(x => x.ClientId == e.ClientId);
                removed = client != null && _connectedClients.Remove(client);
            }

            if (!removed)
            {
                _log.LogError("Unknown client disconnected: {ClientId}", e.ClientId);
                return Task.CompletedTask;
            }

            _log.LogInformation("Client disconnected: {ClientId}", e.ClientId);

            OnClientDisconnected?.Invoke(e);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Stores the received message in the global and per-topic histories (newest first, capped at
        /// <see cref="MaxStoredMessages"/> entries each), logs it and raises <see cref="OnMessageReceived"/>.
        /// </summary>
        Task IMqttApplicationMessageReceivedHandler.HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
        {
            var topic = e.ApplicationMessage.Topic;

            // A message may carry no payload at all; decoding null would throw.
            var rawPayload = e.ApplicationMessage.Payload;
            var payload = rawPayload == null ? string.Empty : Encoding.UTF8.GetString(rawPayload);

            lock (_thisLock)
            {
                var message = new MqttMessage
                {
                    Timestamp = DateTime.Now,
                    // May be null when the sender is not in the tracked client list.
                    Client = _connectedClients.Find(x => x.ClientId == e.ClientId),
                    Topic = topic,
                    Payload = payload,
                    Original = e.ApplicationMessage,
                };

                PushNewest(_messages, message);

                if (!_messagesByTopic.TryGetValue(topic, out var topicHistory))
                {
                    topicHistory = new List<MqttMessage>();
                    _messagesByTopic[topic] = topicHistory;
                }

                PushNewest(topicHistory, message);
            }

            _log.LogInformation("OnMessageReceived: {Topic} {Payload}", topic, payload);

            OnMessageReceived?.Invoke(e);
            return Task.CompletedTask;
        }

        /// <summary>No-op interceptor: the context is left untouched.</summary>
        Task IMqttServerClientMessageQueueInterceptor.InterceptClientMessageQueueEnqueueAsync(MqttClientMessageQueueInterceptorContext context)
        {
            return Task.CompletedTask;
        }


        /// <summary>
        /// Publishes <paramref name="message"/> through <see cref="Server"/> without waiting for completion.
        /// Does nothing when <see cref="Server"/> is null; a failed publish is logged instead of being lost.
        /// </summary>
        /// <param name="message">The application message to publish.</param>
        public void Publish(MqttApplicationMessage message) =>
            _ = Server?.PublishAsync(message).ContinueWith(
                t => _log.LogError(t.Exception, "Failed to publish message to topic {Topic}", message.Topic),
                System.Threading.CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);

        /// <summary>Builds a message from a binary payload and publishes it.</summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="payload">Raw payload bytes.</param>
        /// <param name="retain">Value of the message's retain flag.</param>
        public void Publish(string topic, byte[] payload, bool retain) =>
            Publish(new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithRetainFlag(retain)
                .Build());

        /// <summary>Builds a message from a text payload and publishes it.</summary>
        /// <param name="topic">Topic to publish to.</param>
        /// <param name="payload">Payload text.</param>
        /// <param name="retain">Value of the message's retain flag.</param>
        public void Publish(string topic, string payload, bool retain) =>
            Publish(new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithRetainFlag(retain)
                .Build());

        /// <summary>
        /// Inserts <paramref name="message"/> at the front of <paramref name="history"/>, first dropping the
        /// oldest entries so the list never exceeds <see cref="MaxStoredMessages"/>. Caller must hold the lock.
        /// </summary>
        static void PushNewest(List<MqttMessage> history, MqttMessage message)
        {
            // Trim before inserting (>=, not >) so the cap is exact rather than cap + 1.
            while (history.Count >= MaxStoredMessages)
            {
                history.RemoveAt(history.Count - 1);
            }

            history.Insert(0, message);
        }
    }
}

0 人点赞