Nulls в Producer - C#
Формулировка задачи:
Здравствуйте. Прошу помочь, потому как сам уже в душе не я... знаю как это разрешить
Застопорил на том, что потоки не завершаются после окончания. Т.е. один из рабочих потоков при этом очень быстро завершает самый последний блок и успевает пойти на следующий круг раньше, чем основной поток дождётся producer Join и положит туда nulls. В этом случае поток повиснет в куске
, т.к. не войдет в блок
Но я понятия не приложу как это реализовать, паттерн головного мозга.
Пользовался https://rsdn.org/article/dotnet/CSThreading1.xml
while (true)
{
Block block = null;
try
{
lock (this.inputLockObj)
{
if (this.input.Count > 0)
{
block = this.input.Dequeue();
if (block == null)
return;
}
}
this.readAdd.Set();
if (block != null)
{
action.Act(block);
this.EnqueueHandledBlock(block);
}
else
{
this.readGet.WaitOne();
}
}
catch (ThreadAbortException e)
{
Console.WriteLine(e.Message);
return;
}
catch (Exception e)
{
callback?.Invoke(e.Message);
return;
}
}this.readGet.WaitOne();
if (this.input.Count > 0)
{
block = this.input.Dequeue();
if (block == null)
return;
}Решение задачи: «Nulls в Producer»
textual
Листинг программы
class ProducerConsumer
{
private Semaphore _workLimit;
private ManualResetEvent _awaiter = new ManualResetEvent(true);
private object _locker = new object();
private Queue<Action> _work = new Queue<Action>();
private int _threads;
private int _abortsPending;
public ProducerConsumer(int workLimit)
{
_workLimit = new Semaphore(workLimit, workLimit);
}
public void AddWorker()
{
var thread = new Thread(DoWork);
thread.Start();
lock (_locker)
_threads++;
}
public void RemoveWorker()
{
lock (_locker)
_abortsPending++;
}
public void Join()
{
_awaiter.WaitOne();
lock (_locker)
{
_abortsPending += _threads - _abortsPending;
Monitor.PulseAll(_locker);
}
}
public void AddWork(Action workItem)
{
_workLimit.WaitOne();
lock (_locker)
{
_work.Enqueue(workItem);
Monitor.Pulse(_locker);
_awaiter.Reset();
}
}
private void DoWork()
{
while (true)
{
Action action = null;
lock (_locker)
{
if (_work.Count == 0)
_awaiter.Set();
if (_abortsPending > 0)
{
_abortsPending--;
_threads--;
return;
}
while (!_work.Any())
{
Monitor.Wait(_locker);
if (_abortsPending > 0)
{
_abortsPending--;
_threads--;
return;
}
}
action = _work.Dequeue();
}
action();
_workLimit.Release();
}
}
}
void Main()
{
var pc = new ProducerConsumer(1);
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWorker();
pc.AddWork(() => { Console.WriteLine("1"); Thread.Sleep(1000); });
pc.AddWork(() => Console.WriteLine("2"));
pc.AddWork(() => Console.WriteLine("3"));
pc.AddWork(() => { Console.WriteLine("4"); Thread.Sleep(1000); });
Console.WriteLine("Join");
pc.Join();
Console.WriteLine("End");
}