IObserver 与 DiagnosticSource


观察者模式 IObserver

  观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式。

适用于基于推送通知的方案。其类图如下:

在 .NET 中,通过实现泛型 System.IObservable 和 System.IObserver 接口来应用观察者设计模式。具体的代码实现见MSDN。

DiagnosticSource

  DiagnosticSource 是观察者模式的一个实现,类库提过了一组API发出程序运行时的日志诊断记录,程序可以订阅这些日志记录。这些日志可以包含丰富的额外信息,帮助诊断程序内部发生的事情。

.net 类库 如 AspNetCore, EntityFrameworkCore, HttpClient, and SqlClient, 等第三方库cap,MassTransit,Elasticsearch 的.NET客户端都有DiagnosticSource 实现。

  DiagnosticSource是一个抽象类,所以真正使用的是DiagnosticListener类。

如何定义自己的DiagnosticSource

 1.创建DiagnosticListener。

 2.编写DiagnosticListener 事件的扩展方法。

   public static class LockNessDiagNosticListenerExtension
    {
        public const string DiagnosticListenerName = "NessClientDiagnosticListener";
        private const string NessClientPrefix = "Custmer.Ness.";
        public const string NessBeforeExecuteCommand = NessClientPrefix + nameof(WriteBefore);
        public const string NessAfterExecuteCommand = NessClientPrefix + nameof(WriteAfter);

        public static Guid WriteBefore(this DiagnosticListener @this, [CallerMemberName] string operation = "")
        {
            
            if (@this.IsEnabled(NessBeforeExecuteCommand))
            {
                Guid operationId = Guid.NewGuid();

                @this.Write(
                    NessBeforeExecuteCommand,
                    new
                    {
                        OperationId = operationId,
                        Operation = operation,
                        Command = "WriteCommandBefore"
                    });

                return operationId;
            }
            else
                return Guid.Empty;
        }

        public static void WriteAfter(this DiagnosticListener @this, Guid operationId,long bgtime, [CallerMemberName] string operation = "")
        {
            if (@this.IsEnabled(NessAfterExecuteCommand))
            {
                @this.Write(
                    NessAfterExecuteCommand,
                    new
                    {
                        OperationId = operationId,
                        Operation = operation,
                        Command = "WriteCommandAfter",
                        Timestamp = Stopwatch.GetTimestamp(),
                        excuttime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - bgtime
                    }) ;;
            }
        }
    }

    public class NessExcutes
    {
        private static readonly DiagnosticListener _diagnosticListener = new DiagnosticListener(LockNessDiagNosticListenerExtension.DiagnosticListenerName);

        public async Task Excue()
        {
            var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            var gid= _diagnosticListener.WriteBefore();
            await Task.Delay(5000);
            _diagnosticListener.WriteAfter(gid, now);  
        }      
    }

 3.实现事件的订阅

    public class NessDiagnosticObserver : IObserver,
          IObserverstring, object>>
    {
        private readonly List _subscriptions = new List();

        public void OnCompleted()
        {
        }

        public void OnError(Exception error)
        {
        }

        public void OnNext(KeyValuePair<string, object> value)
        {
            Console.WriteLine($"data is initializing {value} ");
            Write(value.Key, value.Value);
        }

        public void OnNext(DiagnosticListener value)
        {
            if (value.Name == "NessClientDiagnosticListener")
            {
                var subscription = value.Subscribe(this);
                _subscriptions.Add(subscription);
            }
        }

        private readonly AsyncLocal _stopwatch = new AsyncLocal();

        private void Write(string name, object value)
        {
            switch (name)
            {
                case "Custmer.Ness.WriteBefore":
                    {
                        _stopwatch.Value = Stopwatch.StartNew();
                        break;
                    }
                case "Custmer.Ness.WriteAfter":
                    {
                        var stopwatch = _stopwatch.Value;
                        stopwatch.Stop();
                        Console.WriteLine($"Elapsed: {stopwatch.Elapsed}");
                        Console.WriteLine();
                        break;
                    }
            }
        }
    }
public static async Task Main()
        {
            var observer = new NessDiagnosticObserver();
            IDisposable subscription = DiagnosticListener.AllListeners.Subscribe(observer);
            OwnLibDiagNostic.NessExcutes nessExcutes = new OwnLibDiagNostic.NessExcutes();
            await nessExcutes.Excue();
        }