Производительный приём данных в отдельном потоке - C#

Узнай цену своей работы

Формулировка задачи:

Как грамотно организовать приём данных от внешнего устройства? Внешнее устройство Cypress FX3 USB 3.0 режим Bulk, способно выдавать данные со скоростью до 200MB/s, источник данных умеет притормаживать выдачу, если буфер в Cypress FX3 занят. Приём данных идет в отдельном потоке с помощью конкурентной очереди. Скорость не удовлетворительная и сильно прыгает от 10 до 70MB/s.
        public void ListenerThread()
        {
            inBuf = new byte[BulkInEndPt.MaxPktSize];
            BulkInEndPt.TimeOut = CyConst.INFINITE;
 
            Print.BeginInvoke("ADC Listener started. MaxPktSize = " + BulkInEndPt.MaxPktSize + ", Address = 0x" + BulkInEndPt.Address.ToString("X2"), null, this);
            startedEvent.Set();
 
            while (true)
            {
                buf_size = inBuf.Length;
                bResult = BulkInEndPt.XferData(ref inBuf, ref buf_size);
 
                if (bResult && buf_size != 0)
                {
                    Program.afifo.Enqueue(inBuf.Take(buf_size).ToArray()); // copy
 
                    NewData.BeginInvoke(pack_cnt++, null, null, this);
                    //Print.BeginInvoke("ADC listener recieved " + buf_size + " bytes. Read result: " + bResult, null, this);
                }
            }
        }
Основной поток обработки дергается асинхронно с помощью BeginInvoke и выбирает данные из FIFO, если они там есть (аналог прерывания от потока слушателя). Изначально я пробовал использовать BeginInvoke и lock в основном потоке, но оказалось, что пакеты таким способом путаются и приходят в основной поток не в порядке их прихода по USB. Конкурентное FiFO объявлено таким образом:
public static ConcurrentQueue<byte[]> afifo = new ConcurrentQueue<byte[]>();
Возможности задавать начальное capacity у класса нет - но оно хранит только ссылки на массивы байт

Решение задачи: «Производительный приём данных в отдельном потоке»

textual
Листинг программы
                if (bResult && buf_size != 0)
                {
                    byte[] bytes = new byte[buf_size];
                    Array.Copy(inBuf, bytes, buf_size);
                    Program.afifo.Enqueue(bytes);
 
                    NewData.BeginInvoke(pack_cnt++, null, null, this);
                }

ИИ поможет Вам:


  • решить любую задачу по программированию
  • объяснить код
  • расставить комментарии в коде
  • и т.д
Попробуйте бесплатно

Оцени полезность:

10   голосов , оценка 4 из 5
Похожие ответы