Событие добавления нового члена в коллекцию FIFO - C#
Формулировка задачи:
Всем добрый день (утро/вечер)!
Суть такова. Принимаю большое количество данных, записываю их в FIFO. При записи в FIFO выдаю событие. Один из подписчиков события вынимает из FIFO сообщение и начинает длительную обработку. В итоге начинаются проблемы, так как подписчик не "отпускает" событие пока не выполнит обработку => часть передаваемых сообщений теряется (передаются с МК, у него буфер программный переполняется если долго не принимать сообщения). Игрался с потоками, всё едино. Можно сделать for() опрос коллекции например в режиме sleep(5), но это некрасиво.
Итого цель: сделать постоянный приём сообщений, и оповещение о приёме сообщения которое не блочит приём (плюс обработка сообщений в порядке их прихода).
Как сделать событие которое оповестит подписчика и сразу же вернёт управление назад? Может тут какой-то паттерн придуман? тестовый вариант проблемы ниже.
//очередь новых сообщений
static Queue<object> qe = new Queue<object>();
//оповещение о новом сообщение
public delegate void StatisticMessage();
public static event StatisticMessage newMessage;
private static Thread thread1;
private static Thread thread2;
static void Main(string[] args)
{
newMessage +=messageProcessor;
Thread thread1 = new Thread(AddToFIFO);
thread1.Start();
Console.ReadKey();
}
//добавляю в очередь 10 сообщений
public static void AddToFIFO()
{
for (var i = 10; i > 0; i--)
{
string message = i.ToString();;
//"в очередь, сукины дети, в очередь"
EnqueueQueueMessage(message);
}
}
//функция добавления + оповещение о добавлении
public static void EnqueueQueueMessage(object message)
{
qe.Enqueue(message);
try
{
newMessage();
}
catch (Exception e)
{
Console.WriteLine(e.Message + " " + e.TargetSite + " " + e.StackTrace + "\n");
}
}
//подписчик оповещения о добавлении
public static void messageProcessor()
{
processing(qe.Dequeue); // в таком виде обрабатывается по порядку, но пока
//обработчик 3 секунды не поспит, новое сообщение не принимается
/*
если сделать тут поток, то вроде всё ок, но последовательность обработки сообщений сбивается.
thread2=new Thread(Processing);
thread2.Start(qe.Dequeue)
предполагаю что нужно делать какую-то блокировку потоков, чтобы они обрабатывали
сообщения в том порядке в каком поток вызывается, а не как им вздумается.
*/
}
//обработка сообщения
public static void Processing(object message)
{
Thread.Sleep(3000);
Console.WriteLine((string)message);
}Решение задачи: «Событие добавления нового члена в коллекцию FIFO»
textual
Листинг программы
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Producer_Consumer {
public class ProducerConsumerQueue: IDisposable {
EventWaitHandle wh = new AutoResetEvent(false);
Thread worker;
object locker = new object();
Queue<string> tasks = new Queue<string>();
public ProducerConsumerQueue() {
worker = new Thread(Work);
worker.Start();
}
public void EnqueueTask(string task) {
lock (locker) {
tasks.Enqueue(task);
wh.Set();
}
}
public void Dispose() {
EnqueueTask(null);
worker.Join();
wh.Close();
}
void Work() {
while (true) {
string task = null;
lock (locker) {
if (tasks.Count > 0) {
task = tasks.Dequeue();
if (task == null)
return;
}
}
if (task != null) {
Console.WriteLine("Produce task: {0}", task.ToString());
Thread.Sleep(2000);
}
else
wh.WaitOne();
}
}
}
}