西门子PLC数据读取 Observer设计模式
当我听到这个需求的时候,我差点爆粗口(实际上可能已经爆了,不过我忘了)。
需求刚开始是:
C#连接PLC Modbus读取值。
我用C#写完了,觉得太简单了,还弄了个窗体。
接着是:
只读值那太简单了吧?这个不就一行代码的事情嘛,要监听值的变化并回调。
好的,我花一下午弄了出来,并整合到.NET项目里面。
又来了:
要监听值的变化,要定时读取值并回调,要比较值的大小,大于或小于并回调,动态订阅,并支持多台PLC设备,用JSON配置文件配置。
FUCK!
那能咋办呢,做呗。
设计思路
首先想到的就是线程池和订阅模式。
提供一个核心类,比如这个类叫做ModBusMonitor,在多台主机的情况下:
ModBusMonitor plc1 = new ModBusMonitor(“PLC1”)
ModBusMonitor plc2 = new ModBusMonitor(“PLC2”)
这种方式可以保证多台主机的隔离。
接着是订阅模式。
ModBusMonitor对外提供订阅接口,定义一个Observer接口,创建多个Subscriber实现Observer:
比如当值改变时订阅接口:ValueChangeSubscriber
值比较时的订阅接口:ValueComparSubscriber
同时每个订阅接口内部使用回调。
因为我考虑的是,一台主机可以对N个地址进行N个不同的订阅,所以这是比较灵活的实现方式。
所以回调隔离开,一个Subscriber一个Callback。
ValueChangeSubscriber:
public class ValueChangeSubscriber : IObserver
{
public ushort Address;
public IValueChangeCallBack callBack;
public ValueChangeSubscriber(IValueChangeCallBack _callBack, ushort _address)
{
this.callBack = _callBack;
this.Address = _address;
}
public void Reviced(string Name, ushort Address, ushort Value, bool ComparResult)
{
}
public void Reviced(string Name, ushort Address, ushort NowValue)
{
callBack.OnChanged(Name, Address, NowValue);
}
}
ModBusMonitor实现:
using HslCommunication;
using HslCommunication.ModBus;
using Microsoft.Extensions.Configuration;
using PLC_TEST.PLC.Exception;
using PLC_TEST.PLC.Model;
using SD.PLC.Service.Model;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace PLC_TEST.PLC
{
public class ModBusMonitor
{
private HashSet observers = new HashSet();
private Dictionary valueBuff = new Dictionary();
private string PLC_NAME;
private Machine machine;
private int READ_INTERVAL;
private int _IS_RUN = 1;
private ModbusTcpNet TCP_CLIENT = null;
public ModBusMonitor(string pLC_NAME, int rEAD_INTERVAL)
{
PLC_NAME = pLC_NAME;
READ_INTERVAL = rEAD_INTERVAL;
//TODO check conn is already exists start thread to pool
//
machine = Global.GetMachine(PLC_NAME);
try
{
TCP_CLIENT = new ModbusTcpNet(machine.ip, machine.port);
OperateResult result = TCP_CLIENT.ConnectServer();
if (result.IsSuccess)
{
// TODO
}
else
{
throw new System.Exception(result.Message);
}
}
catch (SocketException SE)
{
throw SE;
}
}
public string GetKey(int address, object o) {
string str = "{0}|{1}";
if (o.GetType() == typeof(ValueChangeSubscriber))
{
return String.Format(str, address, 0);
}
else if (o.GetType() == typeof(ValueComparSubscriber))
{
return String.Format(str, address, 1);
}
else if (o.GetType() == typeof(ValueMonitorSubscriber))
{
return String.Format(str, address, 2);
}
else {
throw new UnknownTypeException("未知类型");
}
}
public void CancelSubscribe(IObserver observer) {
observers.Remove(observer);
}
public void Subscribe(IObserver observer) {
if (observer.GetType() == typeof(ValueChangeSubscriber))
{
ValueChangeSubscriber subscriber = (ValueChangeSubscriber)observer;
Thread thread = new Thread(new ParameterizedThreadStart(ValueChangeTask));
thread.Start(subscriber);
observers.Add(subscriber);
}
else if (observer.GetType() == typeof(ValueComparSubscriber))
{
ValueComparSubscriber subscriber = (ValueComparSubscriber)observer;
Thread thread = new Thread(new ParameterizedThreadStart(ValueComparTask));
thread.Start(subscriber);
observers.Add(subscriber);
}
else if (observer.GetType() == typeof(ValueMonitorSubscriber))
{
ValueMonitorSubscriber subscriber = (ValueMonitorSubscriber)observer;
Thread thread = new Thread(new ParameterizedThreadStart(ValueMonitorTask));
thread.Start(subscriber);
observers.Add(subscriber);
}
else {
throw new UnknownTypeException("未知类型");
}
}
public void ValueChangeTask(object obj) {
ValueChangeSubscriber subscriber = obj as ValueChangeSubscriber;
do
{
Thread.Sleep(READ_INTERVAL);
ushort newValue = TCP_CLIENT.ReadUInt16(String.Format("s={0};{1}", machine.sid, subscriber.Address)).Content;
string key = GetKey(subscriber.Address, subscriber);
bool isExist = valueBuff.ContainsKey(key);
if (isExist)
{
ushort val = valueBuff[key];
if (val != newValue)
{
valueBuff[key] = newValue;
subscriber.Reviced(machine.name, subscriber.Address, newValue);
}
}
else {
valueBuff[key] = newValue;
}
} while (_IS_RUN == 1&&observers.Contains(subscriber));
}
public void ValueComparTask(object obj) {
ValueComparSubscriber subscriber = obj as ValueComparSubscriber;
do
{
Thread.Sleep(READ_INTERVAL);
ushort newValue = TCP_CLIENT.ReadUInt16(String.Format("s={0};{1}", machine.sid, subscriber.Address)).Content;
if (newValue > subscriber.Value)
{
subscriber.Reviced(machine.name, subscriber.Address, newValue, true);
}
else {
subscriber.Reviced(machine.name, subscriber.Address, newValue, false);
}
} while (_IS_RUN == 1 && observers.Contains(subscriber));
}
public void ValueMonitorTask(object obj)
{
ValueMonitorSubscriber subscriber = obj as ValueMonitorSubscriber;
do
{
Thread.Sleep(subscriber.Interval);
ushort newValue = TCP_CLIENT.ReadUInt16(String.Format("s={0};{1}", machine.sid, subscriber.Address)).Content;
subscriber.Reviced(machine.name, subscriber.Address, newValue);
} while (_IS_RUN == 1 && observers.Contains(subscriber));
}
public void Free() {
_IS_RUN = 0;
observers.Clear();
TCP_CLIENT.Dispose();
}
}
}
每个Subscriber都是一个线程,提供取消订阅和释放主机。
使用时:
static void Main(string[] args)
{
ModBusMonitor modbusMonitor = new ModBusMonitor("PLC1", 1000);
ValueMonitorSubscriber v1 = new ValueMonitorSubscriber(new CallBack(), 1, 1000);
modbusMonitor.Subscribe(v1);
Thread.Sleep(2000);
//modbusMonitor.CancelSubscribe(v1);
modbusMonitor.Free();
}
}
public class CallBack : IValueMonitorCallBack
{
public void OnBack(string name, int address, ushort val)
{
Console.WriteLine(val.ToString());
}
}
public class CallBack1 : IValueChangeCallBack
{
public void OnChanged(string name, int address, ushort val)
{
Console.WriteLine("CALL BACK 1");
}
}
很完美,平平无奇却又有些精巧。
:)