Reading Siemens PLC Data with the Observer Design Pattern


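When I heard this requirement I nearly swore out loud (I may actually have, but I've forgotten).

The requirement started out as:

Connect to a PLC over Modbus from C# and read values.

I wrote it in C#, thought it was too easy, and even built a small form UI for it.

Then came:

Just reading values is too simple, right? That's a one-liner. We need to watch for value changes and fire a callback.

Fine. I spent an afternoon getting it working and integrated it into the .NET project.

And then again:

Watch for value changes; read values on a timer and call back; compare values and call back when a value is greater or less than a threshold; allow dynamic subscription; support multiple PLC devices; and configure it all through a JSON file.

FUCK!

What can you do? Build it.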

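Design approach

The first things that came to mind were a thread pool and the subscription (observer) pattern.

Provide one core class, say ModBusMonitor. With multiple hosts:

ModBusMonitor plc1 = new ModBusMonitor("PLC1");

ModBusMonitor plc2 = new ModBusMonitor("PLC2");

Each host gets its own instance, which keeps the hosts isolated from one another.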
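The requirement also asked for the PLCs to be described in a JSON config file, but the post shows neither the file nor Global.GetMachine. The following is only a sketch of how a per-host lookup might work, assuming System.Text.Json and a made-up file layout. The field names ip, port, sid and name are taken from how machine is used in the monitor further down; MachineConfig, MachineRegistry and the sample addresses are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical shape of one PLC entry. The property names mirror the
// fields the monitor reads from its machine object (ip, port, sid, name).
public class MachineConfig
{
    public string name { get; set; }
    public string ip { get; set; }
    public int port { get; set; }
    public byte sid { get; set; }
}

public static class MachineRegistry
{
    // Parses a JSON array of machines and indexes them by name,
    // so each monitor can look up its own host.
    public static Dictionary<string, MachineConfig> Load(string json)
    {
        var list = JsonSerializer.Deserialize<List<MachineConfig>>(json)
                   ?? new List<MachineConfig>();
        var byName = new Dictionary<string, MachineConfig>();
        foreach (var m in list)
        {
            byName[m.name] = m;
        }
        return byName;
    }
}

public static class ConfigDemo
{
    public static void Main()
    {
        // Assumed file contents; IPs and ports are placeholders.
        string json = @"[
          { ""name"": ""PLC1"", ""ip"": ""192.168.0.10"", ""port"": 502, ""sid"": 1 },
          { ""name"": ""PLC2"", ""ip"": ""192.168.0.11"", ""port"": 502, ""sid"": 1 }
        ]";
        var machines = MachineRegistry.Load(json);
        Console.WriteLine(machines["PLC2"].ip);
    }
}
```

Indexing by name matches the way the monitor is constructed from a PLC name, so adding a device would only mean adding an entry to the file.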

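Next comes the subscription model.

ModBusMonitor exposes a subscription API. Define an observer interface (IObserver) and create several subscribers that implement it:

For example, the subscriber for value changes: ValueChangeSubscriber

The subscriber for value comparisons: ValueComparSubscriber

Each subscriber invokes a callback internally.

My reasoning was that a single host may carry N different subscriptions on N addresses, so this is the more flexible way to build it.

The callbacks are therefore kept separate: one Subscriber, one Callback.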

ValueChangeSubscriber:

    // Subscriber that fires a callback when the value at an address changes.
    public class ValueChangeSubscriber : IObserver
    {
        public ushort Address;

        public IValueChangeCallBack callBack;

        public ValueChangeSubscriber(IValueChangeCallBack _callBack, ushort _address)
        {
            this.callBack = _callBack;
            this.Address = _address;
        }

        // Comparison overload: not used by this subscriber.
        public void Reviced(string Name, ushort Address, ushort Value, bool ComparResult)
        {
        }

        // Called by ModBusMonitor when the polled value differs from the previous one.
        public void Reviced(string Name, ushort Address, ushort NowValue)
        {
            callBack.OnChanged(Name, Address, NowValue);
        }
    }
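
Only ValueChangeSubscriber is listed here. IObserver and ValueComparSubscriber are not, so below is a guess at their shape, reconstructed from how ValueChangeSubscriber implements the interface and how ModBusMonitor reads subscriber.Address and subscriber.Value. IValueComparCallBack, its OnCompared method and the PrintCompar class are hypothetical names; the real project may differ.

```csharp
using System;

// Reconstructed from ValueChangeSubscriber: one overload per kind of event.
public interface IObserver
{
    void Reviced(string Name, ushort Address, ushort Value, bool ComparResult);
    void Reviced(string Name, ushort Address, ushort NowValue);
}

// Hypothetical callback for comparison results.
public interface IValueComparCallBack
{
    void OnCompared(string name, int address, ushort val, bool isGreater);
}

// Subscriber that reports whether the polled value exceeds a threshold.
public class ValueComparSubscriber : IObserver
{
    public ushort Address;
    public ushort Value; // threshold the polled value is compared against
    public IValueComparCallBack callBack;

    public ValueComparSubscriber(IValueComparCallBack _callBack, ushort _address, ushort _value)
    {
        this.callBack = _callBack;
        this.Address = _address;
        this.Value = _value;
    }

    // Forwards the comparison result to this subscriber's own callback.
    public void Reviced(string Name, ushort Address, ushort Value, bool ComparResult)
    {
        callBack.OnCompared(Name, Address, Value, ComparResult);
    }

    // Plain-value overload: not used by this subscriber.
    public void Reviced(string Name, ushort Address, ushort NowValue)
    {
    }
}

public class PrintCompar : IValueComparCallBack
{
    public void OnCompared(string name, int address, ushort val, bool isGreater)
    {
        Console.WriteLine(name + " @" + address + " = " + val + ", above threshold: " + isGreater);
    }
}

public static class SubscriberDemo
{
    public static void Main()
    {
        var subscriber = new ValueComparSubscriber(new PrintCompar(), 1, 100);
        // Simulate what the monitor does after reading 150 from address 1.
        ushort polled = 150;
        subscriber.Reviced("PLC1", subscriber.Address, polled, polled > subscriber.Value);
    }
}
```

Each subscriber leaves the overload it does not need empty, which is the price of sharing a single observer interface across all subscription kinds.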

The ModBusMonitor implementation:

using HslCommunication;
using HslCommunication.ModBus;
using Microsoft.Extensions.Configuration;
using PLC_TEST.PLC.Exception;
using PLC_TEST.PLC.Model;
using SD.PLC.Service.Model;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace PLC_TEST.PLC
{
    public class ModBusMonitor
    {

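        // Active subscriptions; a polling loop stops once its subscriber is removed.
        private HashSet<IObserver> observers = new HashSet<IObserver>();

        // Last value seen per subscription key (see GetKey), used for change detection.
        private Dictionary<string, ushort> valueBuff = new Dictionary<string, ushort>();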


        private string PLC_NAME;

        private Machine machine;

        private int READ_INTERVAL;

        // Written by Free() and read by every polling thread, hence volatile.
        private volatile int _IS_RUN = 1;

        private  ModbusTcpNet TCP_CLIENT = null;


        public ModBusMonitor(string pLC_NAME, int rEAD_INTERVAL)
        {
            PLC_NAME = pLC_NAME;
            READ_INTERVAL = rEAD_INTERVAL;
            // TODO: check whether a connection already exists; run the polling on a thread pool
            machine = Global.GetMachine(PLC_NAME);

            try
            {
                TCP_CLIENT = new ModbusTcpNet(machine.ip, machine.port);
                OperateResult result = TCP_CLIENT.ConnectServer();
                if (result.IsSuccess)
                {
                  // TODO
                }
                else
                {
                    throw new System.Exception(result.Message);
                }
            }
            catch (SocketException)
            {
                // Rethrow without resetting the stack trace.
                throw;
            }
        }

        public string GetKey(int address, object o) {
            string str = "{0}|{1}";
            if (o.GetType() == typeof(ValueChangeSubscriber))
            {
                return String.Format(str, address, 0);
            }
            else if (o.GetType() == typeof(ValueComparSubscriber))
            {
                return String.Format(str, address, 1);
            }
            else if (o.GetType() == typeof(ValueMonitorSubscriber))
            {
                return String.Format(str, address, 2);
            }
            else {
                throw new UnknownTypeException("Unknown subscriber type");
            }
        }

        public void CancelSubscribe(IObserver observer) {
            observers.Remove(observer);
        }


        public void Subscribe(IObserver observer) {
            // Pick the polling loop that matches the subscriber type.
            ParameterizedThreadStart task;
            if (observer.GetType() == typeof(ValueChangeSubscriber))
            {
                task = ValueChangeTask;
            }
            else if (observer.GetType() == typeof(ValueComparSubscriber))
            {
                task = ValueComparTask;
            }
            else if (observer.GetType() == typeof(ValueMonitorSubscriber))
            {
                task = ValueMonitorTask;
            }
            else {
                throw new UnknownTypeException("Unknown subscriber type");
            }

            // Register before starting the thread: each loop exits as soon as
            // its subscriber is missing from the set.
            observers.Add(observer);
            new Thread(task).Start(observer);
        }

        public void ValueChangeTask(object obj) {
            ValueChangeSubscriber subscriber = obj as ValueChangeSubscriber;
            string key = GetKey(subscriber.Address, subscriber);
            do
            {
                Thread.Sleep(READ_INTERVAL);
                var read = TCP_CLIENT.ReadUInt16(String.Format("s={0};{1}", machine.sid, subscriber.Address));
                if (!read.IsSuccess)
                {
                    // Skip failed reads; Content is not meaningful in that case.
                    continue;
                }
                ushort newValue = read.Content;

                bool changed;
                // valueBuff is shared by all change-watching threads.
                lock (valueBuff)
                {
                    // The first successful read only seeds the buffer.
                    changed = valueBuff.ContainsKey(key) && valueBuff[key] != newValue;
                    valueBuff[key] = newValue;
                }
                if (changed)
                {
                    subscriber.Reviced(machine.name, subscriber.Address, newValue);
                }
            } while (_IS_RUN == 1 && observers.Contains(subscriber));
        }

        public void ValueComparTask(object obj) {
            ValueComparSubscriber subscriber = obj as ValueComparSubscriber;
            do
            {
                Thread.Sleep(READ_INTERVAL);
                var read = TCP_CLIENT.ReadUInt16(String.Format("s={0};{1}", machine.sid, subscriber.Address));
                if (!read.IsSuccess)
                {
                    // Skip failed reads; Content is not meaningful in that case.
                    continue;
                }
                ushort newValue = read.Content;
                // Reports on every poll: true if the value is above the
                // subscriber's threshold, false otherwise.
                subscriber.Reviced(machine.name, subscriber.Address, newValue, newValue > subscriber.Value);
            } while (_IS_RUN == 1 && observers.Contains(subscriber));
        }

        public void ValueMonitorTask(object obj)
        {
            ValueMonitorSubscriber subscriber = obj as ValueMonitorSubscriber;
            do
            {
                // Timed read: uses the subscriber's own interval, not READ_INTERVAL.
                Thread.Sleep(subscriber.Interval);
                var read = TCP_CLIENT.ReadUInt16(String.Format("s={0};{1}", machine.sid, subscriber.Address));
                if (!read.IsSuccess)
                {
                    // Skip failed reads; Content is not meaningful in that case.
                    continue;
                }
                subscriber.Reviced(machine.name, subscriber.Address, read.Content);
            } while (_IS_RUN == 1 && observers.Contains(subscriber));
        }



        public void Free() {
            _IS_RUN = 0;
            observers.Clear();
            TCP_CLIENT.Dispose();
        }

    }
}

Each Subscriber runs on its own thread, and the monitor supports cancelling a subscription and releasing the host.
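
One thread per subscriber is simple, but a loop can only notice that it was cancelled when it next wakes from Thread.Sleep. As an alternative to the code above (not what it does), the same change-detection loop can run as a Task that stops through a CancellationToken. This is a minimal sketch: ChangePoller and the readValue delegate are hypothetical stand-ins for the Modbus read, so the example runs without a PLC.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ChangePoller
{
    // Polls readValue once per interval and invokes onChanged when the value
    // differs from the previous reading. The first reading only seeds the
    // baseline, as in ValueChangeTask.
    public static async Task RunAsync(
        Func<ushort> readValue,
        Action<ushort> onChanged,
        TimeSpan interval,
        CancellationToken token)
    {
        ushort? last = null;
        try
        {
            while (!token.IsCancellationRequested)
            {
                await Task.Delay(interval, token);
                ushort current = readValue();
                if (last.HasValue && last.Value != current)
                {
                    onChanged(current);
                }
                last = current;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the delay is the normal way to stop.
        }
    }
}

public static class PollerDemo
{
    public static async Task Main()
    {
        int reads = 0;
        // Fake register whose value changes on every read.
        Func<ushort> fakeRead = () => (ushort)Interlocked.Increment(ref reads);

        using (var cts = new CancellationTokenSource())
        {
            Task loop = ChangePoller.RunAsync(
                fakeRead,
                v => Console.WriteLine("changed to " + v),
                TimeSpan.FromMilliseconds(50),
                cts.Token);

            await Task.Delay(300);
            cts.Cancel(); // plays the role of CancelSubscribe / Free
            await loop;   // completes promptly once cancelled
        }
    }
}
```

Because Task.Delay observes the token, cancelling interrupts the wait itself rather than waiting out the rest of the interval, and no dedicated thread sits idle between reads.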

Usage:

    class Program
    {
        static void Main(string[] args)
        {
            // Connect to the host configured as "PLC1", polling every 1000 ms by default.
            ModBusMonitor modbusMonitor = new ModBusMonitor("PLC1", 1000);
            // Read address 1 every 1000 ms and report each value.
            ValueMonitorSubscriber v1 = new ValueMonitorSubscriber(new CallBack(), 1, 1000);
            modbusMonitor.Subscribe(v1);

            Thread.Sleep(2000);
            //modbusMonitor.CancelSubscribe(v1);
            // Stop all polling loops and release the connection.
            modbusMonitor.Free();
        }
    }

    public class CallBack : IValueMonitorCallBack
    {
        public void OnBack(string name, int address, ushort val)
        {
            Console.WriteLine(val.ToString());
        }
    }

    public class CallBack1 : IValueChangeCallBack
    {
        public void OnChanged(string name, int address, ushort val)
        {
            Console.WriteLine("CALL BACK 1");
        }
    }

Perfect. Unremarkable, yet a little bit clever.

:)