Событие добавления нового члена в коллекцию FIFO - C#
Формулировка задачи:
Всем добрый день (утро/вечер)!
Суть такова. Принимаю большое количество данных, записываю их в FIFO. При записи в FIFO выдаю событие. Один из подписчиков события вынимает из FIFO сообщение и начинает длительную обработку. В итоге начинаются проблемы, так как подписчик не "отпускает" событие пока не выполнит обработку => часть передаваемых сообщений теряется (передаются с МК, у него буфер программный переполняется если долго не принимать сообщения). Игрался с потоками, всё едино. Можно сделать for() опрос коллекции например в режиме sleep(5), но это некрасиво.
Итого цель: сделать постоянный приём сообщений, и оповещение о приёме сообщения которое не блочит приём (плюс обработка сообщений в порядке их прихода).
Как сделать событие которое оповестит подписчика и сразу же вернёт управление назад? Может тут какой-то паттерн придуман? тестовый вариант проблемы ниже.
//очередь новых сообщений static Queue<object> qe = new Queue<object>(); //оповещение о новом сообщение public delegate void StatisticMessage(); public static event StatisticMessage newMessage; private static Thread thread1; private static Thread thread2; static void Main(string[] args) { newMessage +=messageProcessor; Thread thread1 = new Thread(AddToFIFO); thread1.Start(); Console.ReadKey(); } //добавляю в очередь 10 сообщений public static void AddToFIFO() { for (var i = 10; i > 0; i--) { string message = i.ToString();; //"в очередь, сукины дети, в очередь" EnqueueQueueMessage(message); } } //функция добавления + оповещение о добавлении public static void EnqueueQueueMessage(object message) { qe.Enqueue(message); try { newMessage(); } catch (Exception e) { Console.WriteLine(e.Message + " " + e.TargetSite + " " + e.StackTrace + "\n"); } } //подписчик оповещения о добавлении public static void messageProcessor() { processing(qe.Dequeue); // в таком виде обрабатывается по порядку, но пока //обработчик 3 секунды не поспит, новое сообщение не принимается /* если сделать тут поток, то вроде всё ок, но последовательность обработки сообщений сбивается. thread2=new Thread(Processing); thread2.Start(qe.Dequeue) предполагаю что нужно делать какую-то блокировку потоков, чтобы они обрабатывали сообщения в том порядке в каком поток вызывается, а не как им вздумается. */ } //обработка сообщения public static void Processing(object message) { Thread.Sleep(3000); Console.WriteLine((string)message); }
Решение задачи: «Событие добавления нового члена в коллекцию FIFO»
textual
Листинг программы
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Producer_Consumer { public class ProducerConsumerQueue: IDisposable { EventWaitHandle wh = new AutoResetEvent(false); Thread worker; object locker = new object(); Queue<string> tasks = new Queue<string>(); public ProducerConsumerQueue() { worker = new Thread(Work); worker.Start(); } public void EnqueueTask(string task) { lock (locker) { tasks.Enqueue(task); wh.Set(); } } public void Dispose() { EnqueueTask(null); worker.Join(); wh.Close(); } void Work() { while (true) { string task = null; lock (locker) { if (tasks.Count > 0) { task = tasks.Dequeue(); if (task == null) return; } } if (task != null) { Console.WriteLine("Produce task: {0}", task.ToString()); Thread.Sleep(2000); } else wh.WaitOne(); } } } }
ИИ поможет Вам:
- решить любую задачу по программированию
- объяснить код
- расставить комментарии в коде
- и т.д