Синхронизация потоков - C#

Узнай цену своей работы

Формулировка задачи:

В общем идея была в том, что создаётся список потоков List threads;. Все эти потоки стартуют выполнять одну и ту же функцию. Абстрактная модель, что должно получиться:
for1(i)
{
	for11(j)
	{
		... у каждого потока свой j (какому потоку какой j - определяется динамически)
	}
	... тут все потоки останавливаются, пока все потоки не закончат работать с for11(j).
}
... тут все потоки останавливаются, пока все потоки не закончат работать с for1(i).
for2(i)
{
	for22(j)
	{
		... у каждого потока свой j (какому потоку какой j - определяется динамически)
	}
	... тут все потоки останавливаются, пока все потоки не закончат работать с for22(j).
}
... тут все потоки останавливаются, пока все потоки не закончат работать с for2(i).
Задача потоков динамически распределять между собой итерации цикла

for11

и

for22

. принцип: выдавать каждому потоку некоторую «порцию» работы (в данном случае j), сделав которую, он получит следующую порцию, если несделанная работа ещё останется к тому времени. Затем дождаться когда все потоки закончат выполнять этот цикл. Для ещё более наглядного понимания, напишу рабочую модель на примере Parallel.For, но только надо решить при ручном создании потоков Thread по выше указанной модели:
        public void Gauss4()
        {
            double[,] a = new double[N, N];
            double[] b = new double[N];
            object locker = new object();
            Array.Copy(A, a, NN);
            Array.Copy(B, b, N);
            for (int i = 0; i < N; i++)
            {
                Parallel.For(i + 1, N, j =>
                {
                    double value = a[j, i] / a[i, i];
                    for (int k = 0; k < N; ++k)
                    {
                        a[j, k] -= a[i, k] * value;
                    }
                    b[j] -= b[i] * value;
                });
            }
            for (int i = N - 1; i >= 0; --i)
            {
                double value = a[i, i];
                b[i] /= value;
                Parallel.For(i+1, N, j =>
                {
                    double temp = a[i, j] * b[j] / value;
                    lock (locker)
                        b[i] -= temp;
                });
            }
            Array.Copy(b, X, N);
        }

Решение задачи: «Синхронизация потоков»

textual
Листинг программы
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
 
namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            ThreadDataContext threadDataContext = new ThreadDataContext();
            threadDataContext.ItemProcessor = ProcessDataItem;
 
            Thread[] threads = new Thread[4];
            for (var i = 0; i < threads.Length; i++)
            {
                threads[i] = new Thread(ThreadFunc);
                threads[i].Name = "Thread " + i;
                threads[i].IsBackground = true;
                threads[i].Start(threadDataContext);
            }
            
            Console.WriteLine("Start batch 1");
            for (int i = 0; i < 3; i++)
            {
                threadDataContext.StartBatch();
                for (int j = 0; j < 10; j++)
                {
                    threadDataContext.AddDataItem(j);
                }
                threadDataContext.EndBatchAndWait();
 
                Console.WriteLine("Batch block end");
            }
            Console.WriteLine("Batch 1 end");
            
            Console.WriteLine("Start batch 2");
            for (int i = 0; i < 3; i++)
            {
                threadDataContext.StartBatch();
                for (int j = 0; j < 10; j++)
                {
                    threadDataContext.AddDataItem(j);
                }
                threadDataContext.EndBatchAndWait();
 
                Console.WriteLine("Batch block end");
            }
            Console.WriteLine("Batch 2 end");
 
            threadDataContext.Exit();
            foreach (var thread in threads)
            {
                thread.Join();
            }
 
            Console.WriteLine("Completed");
            Console.ReadLine();
        }
 
        private static void ThreadFunc(object state)
        {
            ThreadDataContext context = (ThreadDataContext)state;
 
            while (context.WaitStartBatch())
            {
                if (context.GetExit())
                {
                    break;
                }
 
                while (true)
                {
                    if (!context.ProcessNextDataItem())
                    {
                        break;
                    }
                }
            }
            
            Console.WriteLine($"'{Thread.CurrentThread.Name}' exit");
        }
 
        private static void ProcessDataItem(int item)
        {
            Console.WriteLine($"Processing item {item} in '{Thread.CurrentThread.Name}'");
            Thread.Sleep(100);
        }
 
        private class ThreadDataContext
        {
            private BlockingCollection<int> _dataBatch;
            private long _remainingDataCount;
            private bool _exit;
            private readonly ManualResetEvent _signalBatchCompleted = new ManualResetEvent(false);
            private readonly ManualResetEvent _signalBatchStart = new ManualResetEvent(false);
            public Action<int> ItemProcessor;
 
            public void StartBatch()
            {
                _signalBatchStart.Reset();
                _signalBatchCompleted.Reset();
                _dataBatch = new BlockingCollection<int>();
                _signalBatchStart.Set();
            }
            
            public void EndBatchAndWait()
            {
                _dataBatch.CompleteAdding();
 
                if (Interlocked.Read(ref _remainingDataCount) == 0)
                {
                    return;
                }
                else
                {
                    _signalBatchCompleted.WaitOne();
                }
            }
            
            public void AddDataItem(int item)
            {
                Interlocked.Increment(ref _remainingDataCount);
                _dataBatch.Add(item);
            }
 
            public void Exit()
            {
                _signalBatchStart.Reset();
                _signalBatchCompleted.Reset();
                _exit = true;
                _signalBatchStart.Set();
            }
            
            public bool WaitStartBatch()
            {
                _signalBatchStart.WaitOne();
                return _dataBatch != null;
            }
            
            public bool ProcessNextDataItem()
            {
                int item;
                bool taken = _dataBatch.TryTake(out item);
                if (taken)
                {
                    ItemProcessor(item);
 
                    if (Interlocked.Decrement(ref _remainingDataCount) == 0)
                    {
                        if (_dataBatch.IsCompleted)
                        {
                            _signalBatchCompleted.Set();
                        }
                    }
                }
 
                return taken;
            }
            
            public bool GetExit()
            {
                return _exit;
            }
        }
    }
}

Оцени полезность:

12   голосов , оценка 4.333 из 5
Похожие ответы