Производительный приём данных в отдельном потоке - C#
Формулировка задачи:
Как грамотно организовать приём данных от внешнего устройства?
Внешнее устройство Cypress FX3 USB 3.0 режим Bulk, способно выдавать данные со скоростью до 200MB/s, источник данных умеет притормаживать выдачу, если буфер в Cypress FX3 занят.
Приём данных идет в отдельном потоке с помощью конкурентной очереди. Скорость не удовлетворительная и сильно прыгает от 10 до 70MB/s.
Основной поток обработки дергается асинхронно с помощью BeginInvoke и выбирает данные из FIFO, если они там есть (аналог прерывания от потока слушателя).
Изначально я пробовал использовать BeginInvoke и lock в основном потоке, но оказалось, что пакеты таким способом путаются и приходят в основной поток не в порядке их прихода по USB.
Конкурентное FiFO объявлено таким образом:
Возможности задавать начальное capacity у класса нет - но оно хранит только ссылки на массивы байт
public void ListenerThread()
{
inBuf = new byte[BulkInEndPt.MaxPktSize];
BulkInEndPt.TimeOut = CyConst.INFINITE;
Print.BeginInvoke("ADC Listener started. MaxPktSize = " + BulkInEndPt.MaxPktSize + ", Address = 0x" + BulkInEndPt.Address.ToString("X2"), null, this);
startedEvent.Set();
while (true)
{
buf_size = inBuf.Length;
bResult = BulkInEndPt.XferData(ref inBuf, ref buf_size);
if (bResult && buf_size != 0)
{
Program.afifo.Enqueue(inBuf.Take(buf_size).ToArray()); // copy
NewData.BeginInvoke(pack_cnt++, null, null, this);
//Print.BeginInvoke("ADC listener recieved " + buf_size + " bytes. Read result: " + bResult, null, this);
}
}
}public static ConcurrentQueue<byte[]> afifo = new ConcurrentQueue<byte[]>();
Решение задачи: «Производительный приём данных в отдельном потоке»
textual
Листинг программы
if (bResult && buf_size != 0)
{
byte[] bytes = new byte[buf_size];
Array.Copy(inBuf, bytes, buf_size);
Program.afifo.Enqueue(bytes);
NewData.BeginInvoke(pack_cnt++, null, null, this);
}