Событие добавления нового члена в коллекцию FIFO - C#

Узнай цену своей работы

Формулировка задачи:

Всем добрый день (утро/вечер)! Суть такова. Принимаю большое количество данных, записываю их в FIFO. При записи в FIFO выдаю событие. Один из подписчиков события вынимает из FIFO сообщение и начинает длительную обработку. В итоге начинаются проблемы, так как подписчик не "отпускает" событие пока не выполнит обработку => часть передаваемых сообщений теряется (передаются с МК, у него буфер программный переполняется если долго не принимать сообщения). Игрался с потоками, всё едино. Можно сделать for() опрос коллекции например в режиме sleep(5), но это некрасиво. Итого цель: сделать постоянный приём сообщений, и оповещение о приёме сообщения которое не блочит приём (плюс обработка сообщений в порядке их прихода). Как сделать событие которое оповестит подписчика и сразу же вернёт управление назад? Может тут какой-то паттерн придуман? тестовый вариант проблемы ниже.
//очередь новых сообщений
static Queue<object> qe = new Queue<object>();
 
//оповещение  о новом сообщение
public delegate void StatisticMessage();
public static event StatisticMessage newMessage;
 
private static Thread thread1;
private static Thread thread2;
 
static void Main(string[] args)
{
     newMessage +=messageProcessor;
     Thread thread1 = new Thread(AddToFIFO);
     thread1.Start();
     Console.ReadKey();
}
 
//добавляю в очередь 10 сообщений
public static void AddToFIFO()
{
     for (var i = 10; i > 0; i--)
     {
          string message = i.ToString();;
 
          //"в очередь, сукины дети, в очередь"
          EnqueueQueueMessage(message);
     }
}
 
//функция добавления + оповещение о добавлении
public static void EnqueueQueueMessage(object message)
{
     qe.Enqueue(message); 
 
     try
     {
          newMessage();
     }
     catch (Exception e)
     {
          Console.WriteLine(e.Message + " " + e.TargetSite + " " + e.StackTrace + "\n");
     }
}
 
//подписчик оповещения о добавлении
public static void messageProcessor()
{
     processing(qe.Dequeue); // в таком виде обрабатывается по порядку, но пока 
//обработчик  3 секунды не поспит, новое сообщение не принимается
 
     /*
     если сделать тут поток, то вроде всё ок, но последовательность обработки сообщений сбивается.
 
     thread2=new Thread(Processing);
     thread2.Start(qe.Dequeue)
 
     предполагаю что нужно делать какую-то блокировку потоков, чтобы они обрабатывали 
     сообщения в том порядке в каком поток вызывается, а не как им вздумается.
     */
}
 
//обработка сообщения
public static void Processing(object message)
{
     Thread.Sleep(3000);
     Console.WriteLine((string)message);
}

Решение задачи: «Событие добавления нового члена в коллекцию FIFO»

textual
Листинг программы
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace Producer_Consumer {
    public class ProducerConsumerQueue: IDisposable {
        EventWaitHandle wh = new AutoResetEvent(false);
        Thread worker;
        object locker = new object();
        Queue<string> tasks = new Queue<string>();
 
        public ProducerConsumerQueue() {
            worker = new Thread(Work);
            worker.Start();
        }
        public void EnqueueTask(string task) {
            lock (locker) {
                tasks.Enqueue(task);
                wh.Set();
            }
        }
        public void Dispose() {
            EnqueueTask(null);
            worker.Join();
            wh.Close();
        }
        void Work() {
            while (true) {
                string task = null;
                lock (locker) {
                    if (tasks.Count > 0) {
                        task = tasks.Dequeue();
                        if (task == null)
                            return;
                    }
                }
                if (task != null) {
                    Console.WriteLine("Produce task: {0}", task.ToString());
                    Thread.Sleep(2000);
                }
                else
                    wh.WaitOne();
            }
        }
    }
}

ИИ поможет Вам:


  • решить любую задачу по программированию
  • объяснить код
  • расставить комментарии в коде
  • и т.д
Попробуйте бесплатно

Оцени полезность:

14   голосов , оценка 4.143 из 5
Похожие ответы