开发者

C#使用channel实现Plc异步任务之间的通信

channel 通信的例子:

using ConsoleApp2;
using System.Collections.Concurrent;
using System.Threading.Channels;

var queue = new blockingCollection<Message>(new ConcurrentQueue<Message>());

var opt = new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,
    SingleWriter = true,
    Capacity = 100 //最大容量
};

//有限的
var channelTest = Channel.CreateBounded<Message>(opt);


//无限的
var channel = Channel.CreateUnbounded<Message>();

var sender1 = SendMessageThreadAsync(channel.Writer, 1);
var sender2 = SendMessageThreadAsync(channel.Writer, 2);
var receiver1 = ReceiveMessageThreadAsync(channel.Reader, 3);
var receiver2 = ReceiveMessageThreadAsync(channel.Reader, 4);
//await sender;
// make sure all messages are received

await Task.WhenAll(sender1, sender2);

channel.Writer.Complete();

await Task.WhenAll(receiver1, receiver2);

//await receiver;

Console.WriteLine("Press any key to exit...");
Console.ReadKey();

async Task SendMessageThreadAsync(ChannelWriter<Message> writer, int id)
{
    for (int i = 0; i < 20; i++)
    {
        await writer.WriteAsync(new Message(id, i.ToString()));
        Console.WriteLine($"Thread {id} sent {i}");
        await Task.Delay(100);
    }
}

async Task ReceiveMessageThreadAsync(ChannelReader<Message> reader, int id)
{

    //try
    //{
    //    while (!reader.Completion.IsCompleted)
    //    {
    //        var message = await reader.ReadAsync();
    //        Console.WriteLine($"Thread {id} received {message.Content}");
    //    }
    //}
    //catch (Exception ex)
    //{
    //    Console.WriteLine($"Thread {id} channel closed:{ex.Message}");
    //}

    await foreach (var message in reader.ReadAllAsync())
    {
        Console.WriteLine($"Thread {id} received {message.Content}");
    }
}

record Message(int FromId, string Content);






改造为Plc的实例

record PlcDataMessage  
{  
    public bool IsConnected { get; init; }  
    public DbData DbData { get; init; }  
    // 可以添加其他需要传递的信息  
}
// 创建一个无边界的Channel来发送和接收消息  
var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>();  
  
// 启动一个新的任务来模拟PLC数据读取  
Task.Factory.StartNew(async () =>  
{  
    var cts = new CancellationTokenSource(); // 假设您已经有了取消令牌源  
    while (!cts.IsCancellationRequested)  
    {  
        try  
        {  
            // ... 省略了连接PLC的代码,这部分逻辑保持不变 ...  
  
            if (MyIsConnected)  
            {  
                DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0);  
  
                // 心跳和其他操作...  
  
                // 构造消息并发送到Channel  
                var message = new PlcDataMessage  
                {  
                    IsConnected = MyIsConnected,  
                    DbData = dbDataTemp  
                };  
                await plcDataChannel.Writer.WriteAsync(message, cts.Token);  
            }  
  
            // ... 其他逻辑保持不变 ...  
        }  
        catch (Exception ex)  
        {  
            // 处理异常并重新连接PLC(如果需要)  
            // ...  
  
            // 可以通过Channel发送一个特殊的消息来表示连接已断开或发生了错误  
            // 这里省略了这部分逻辑  
  
            // www.devze.com休眠一段时间后再重试  
            await Task.Delay(2000, cts.Token);  
        }  
    }  
  
    // 完成后通知Channel不再发送更多数据  
    plcDataChannel.Writer.Complete();  
}, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);  
  
// 在另一个任务或线程中读取Channel中的数据  
Task.Run(async () =>  
{  
    aw编程客栈ait foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token))  
    {  
        if (message.IsConnected)  
        {  
            lock (lockObj)  
            {  
                // 更新dbData,这里假设dbData是一个线程安全的对象或结构  
                dbData.Str_S = message.DbData.Str_S.Trim();  
                // ... 更新其他属性 ...  
            }  
            // 处理读取到的数据...  
        }  
        else  
        {  
            // 处理PLC断开连接的情况...  
        }  
    }  
  
    // 读取完成,Channel已关闭  
    Console.WriteLine("PLC数据读取完毕。");  
}, cts.Token);  
  
// ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...
using System;  
using System.Threading;  
using System.Threading.Channels;  
using System.Threading.Tasks;  
  
// ... 其他必要的引用和类型定义 ...  
  
// 创建一个无边界编程客栈的Channel来发送和接收消息  
var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>();  
  
// 取消令牌源  
var cts = new CancellationTokenSource();  
  
// 启动一个新的任务来模拟PLC数据读取  
Task.Run(async () =>  
{  
    Plc s7Plc = null;  
    bool MyIsConnected = false;  
    int errorTimes = 0;  
  
    try  
    {  
        while (!cts.IsCancellationRequested)  
        {  
            if (s7Plc == null || !MyIsConnected)  
            {  
                // 尝试连接PLC(略去具体实现)  
                // ...  
  
                if (MyIsConnected)  
                {  
                    // 连接成功,发送连接成功消息(如果需要)  
                    // ...  
                }  
            }  
            else  
            {  
                try  
                {  
                    // 读取PLC数据(略去具体实现)  
                    DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0, cts.Token);  
  
                    // 心跳和其他操作...  
  
                    // 构造消息并发送到Channel  
                    var message = new PlcDataMessage { IsConnected = MyIsConnected, DbData = dbDataTemp };  
                    await plcDataChannel.Writer.WriteAsync(message, cts.Token);  
  
                    errorTimes = 0; // 重置错误计数器  
                }  
                catch (Exception ex)  
                {  
   编程客栈                 errorTimes++;  
                    // 处理异常(例如记录日志)  
                    // ...  
  
                    // 在达到一定错误次数后,关闭PLC连接并重置  
                    if (errorTimes > someThreshold)  
                    {  
                        s7Plc?.Close();  
                        s7Plc = null;  
                        MyIsConnected = false;  
                        // 可以选择发送一个断开连接的消息到Channel  
                    }  
  
                    // 休眠一段时间后再重试  
                    await Task.Delay(2000, cts.Token);  
                }  
            }  
  
            // 可以添加一些延时来减少循环的频率  
            await Task.Delay(somePollingInterval, cts.Token);  
        }  
    }  
    catch (OperationCanceledExceptionandroid)  
    {  
        // 取消是预期的,不需要额外处理  
    }  
    finally  
    {  
        // 确保关闭PLC连接和Channel写入器  
        s7Plc?.Close();  
        plcDataChannel.Writer.Complete();  
    }  
}, cts.Token);  
  
// 在另一个任务或线程中读取Channel中的数据  
Task.Run(async () =>  
{  
    await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token))  
    {  
        if (message.IsConnected)  
        {  
            // 更新dbData(这里假设dbData是一个线程安全的对象或结构)  
            // 根据需要添加适当的同步机制  
            // ...  
  
            // 处理读取到的数据...  
        }  
        else  
        {  
            // 处理PLC断开连接的情况...  
        }  
    }  
  
    // 读取完成,Channel已关闭  
    Console.WriteLine("PLC数据读取完毕。");  
}, cts.Token);  
  
// ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...  
  
// 在某个适当的时刻取消任务  
// cts.Cancel();  
  
// 等待所有任务完成(如果需要

拓展:C# Channel实现线程间通信

C# Channel实现线程间通信

同步方式实现:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class ChannelDemo
    {
        static Channel<Message> channel1 = Channel.CreateUnbounded<Message>();
        public static void Main2()
        {
            sender.Start(1);
            receive1.Start(2);
            receive2.Start(3);
            sender.Join();
            Thread.Sleep(3000);
            receive1.Interrupt();
            receive2.Interrupt();

            receive1.Join();
            receive2.Join();


            Console.ReadKey();
        }
        static Thread sender = new Thread(SendMsg);

        static Thread receive1 = new Thread(ReceiveMsg);
        static Thread receive2 = new Thread(ReceiveMsg);

        static void SendMsg(object id)
        {
            for (int i = 0; i < 20; i++)
            {
                if (channel1.Writer.TryWrite(new Message((int)id, i.ToString())))
                {
                    Console.WriteLine($"【线程{id}】发送了【{i}】");
                }
            }
        }

        static void ReceiveMsg(object id)
        {
            try
            {
                while (true)
                {
                    if (channel1.Reader.TryRead(out Message message))
                    {
                        Console.WriteLine($"【线程{id}】从【线程{message.id}】接收了【{message.content}】");

                    }
                    Thread.Sleep(1);
                }
            }
            catch (ThreadInterruptedException ex)
            {
                Console.WriteLine($"接收结束");
            }
        }
    }
}


异步方式:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Remoting.Channels;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class ChannelDemo2
    {
        static Channel<Message> channel1 = Channel.CreateUnbounded<Message>();
        
         public static async void Main2()
        {
            await Task.WhenAll(sender, sender2);
            channel1.Writer.Complete();
            await Task.WhenAll(receive1, receive2);
           
            Console.ReadKey();
        }

        static Task sender = SendMsgAsync(channel1.Writer, 1);
        static Task sender2 = SendMsgAsync(channel1.Writer, 4);
        static Task receive1 = ReceiveMsgAsync(channel1.Reader, 2);
        static Task receive2 = ReceiveMsgAsync(channel1.Reader, 3);

        static async  Task SendMsgAsync(ChannelWriter<Message> writer, int id)
        {
            for (int i = 0; i < 20; i++)
            {
                await writer.WriteAsync(new Message((int)id, i.ToString()));
                Console.WriteLine($"【线程{id}】发送了【{i}】");
            }
        }

        static async Task ReceiveMsgAsync(ChannelReader<Message> reader,int id)
        {
            try
            {
                while (!reader.Completion.IsCompleted)
                {
                    Message message = await reader.ReadAsync();           
                    Console.WriteLine($"【线程{id}】从【线程{message.id}】接收了【{message.content}】");
                   
                }
            }
            catch (ChannelClosedException ex)
            {
                Console.WriteLine($"ChannelClosed 接收结束");
            }
        }

    }
}


在对Channel进行实例化的时候,也可以传递一个Options,这里面可以对消息容量,是否多个发送者和接受者进行定义。

以上就是C#使用channel实现Plc异步任务之间的通信的详细内容,更多关于C# channel Plc异步通信的资料请关注编程客栈(www.devze.com)其它相关文章!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜