Как реализовать безопасную многопоточность - C#
Формулировка задачи:
Есть у меня класс, который содержит поток. Этот класс имеет переменные: состояние и команда. Обе - перечисления. Нужно безопасно переключать командами состояние потока, чтобы исключить блокировки.
Вот команды и состояния.
Нужен пример обёртки над потоком, которая могла бы безопасно переводить поток из одного состояния в другое, ожидая полного завершения команды. Знает ли кто что-нибудь подобное? Я сделал свою реализацию, но со временем работы приложения происходит какая-то странная блокировка и отловить я её не могу. Грешу на велосипед.
public enum EnThreadCommand
{
tcmdNone = 0, tcmdIdle, tcmdStart, tcmdStop, tcmdUpdate, tcmdUpdateOnce
}
public enum EnThreadState
{
tstStop = 0, tstIdle, tstUpdate, tstUpdateOnce
}Решение задачи: «Как реализовать безопасную многопоточность»
textual
Листинг программы
void Main()
{
var worker = new Worker();
worker.Started += delegate
{
Console.WriteLine("Begin started");
Thread.Sleep(1000);
};
worker.Stopped += delegate
{
Console.WriteLine("Begin stopped");
Thread.Sleep(1000);
};
worker.Tick += delegate
{
Console.WriteLine("Begin tick");
Thread.Sleep(1000);
};
worker.ExecuteCommand(CommandType.Start);
worker.ExecuteCommand(CommandType.Pause);
worker.ExecuteCommand(CommandType.ForceTick);
worker.ExecuteCommand(CommandType.Resume);
worker.ExecuteCommand(CommandType.Pause);
ParallelEnumerable.Range(0, 3)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithDegreeOfParallelism(10)
.ForAll(i => worker.ExecuteCommandAsync(CommandType.ForceTick));
worker.ExecuteCommand(CommandType.Stop);
worker.ExecuteCommand(CommandType.Start);
worker.ExecuteCommand(CommandType.ForceTick);
Console.ReadLine();
worker.ExecuteCommand(CommandType.Stop);
}
enum CommandType { Start, Stop, Pause, Resume, ForceTick };
class Command
{
public readonly int Id;
public readonly CommandType CommandType;
public Command(int id, CommandType commandType)
{
Id = id;
CommandType = commandType;
}
}
// Что такое асинхронное выполнение команды? По мне так это
// запихивание этой команды в очередь без ожидания её обработки.
// Значит нам понадобится очередь команд, да не простая, а с генерацией UId.
class CommandsQueue : IEnumerable<Command>
{
private int _nextId = 1000;
private Queue<Command> _commands = new Queue<Command>();
public int Enqueue(CommandType commandType)
{
var command = new Command(_nextId, commandType);
_commands.Enqueue(command);
unchecked { _nextId++; }
ElementAdded();
return command.Id;
}
public Command Dequeue()
{
if (_commands.Any())
return _commands.Dequeue();
return null;
}
public IEnumerator<Command> GetEnumerator()
{
return _commands.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return _commands.GetEnumerator();
}
// Когда поток стоит на паузе, нам необходимо будить его
// при поступлении новой команды. Поэтому это событие нам
// пригодится.
public event Action ElementAdded = delegate { };
}
// Что такое синхронное выполнение команды? Это асинхронное выполнение
// этой команды с последующим ожиданием завершения её обработки. Так
// будем же ждать команду по Id.
class AwaitersSet<TKey>
{
private object _locker = new object();
private Dictionary<TKey, ManualResetEvent> _awaiters = new Dictionary<TKey, ManualResetEvent>();
// Добавляем возможность ждать команду с определённым Id.
// Возвращаемое значение тут, в принципе, не важно.
public bool AddAwaiter(TKey id)
{
lock (_locker)
{
if (_awaiters.ContainsKey(id))
return false;
var awaiter = new ManualResetEvent(false);
_awaiters.Add(id, awaiter);
return true;
}
}
// Сигнализируем, что команда выполнена. Те, кто ждал её,
// проснутся, те, кто будут ждать после - даже не уснут.
public void SignalIfExistsAndRemove(TKey id)
{
lock (_locker)
{
ManualResetEvent awaiter = null;
if (_awaiters.TryGetValue(id, out awaiter))
awaiter.Set();
_awaiters.Remove(id);
}
}
// Если ожидателя нет, то команда выполнилась (или ошибка во внешнем коде,
// который не удосужился создать ожидателя).
public void WaitIfExists(TKey id)
{
ManualResetEvent awaiter = null;
lock (_locker)
_awaiters.TryGetValue(id, out awaiter);
if (awaiter != null)
awaiter.WaitOne();
}
}
class Worker
{
private enum State { Stopped, Started, Paused };
[Serializable]
private class AbortRequested : Exception { };
private readonly object _locker = new object();
private readonly CommandsQueue _commands = new CommandsQueue();
private readonly AwaitersSet<int> _awaiters = new AwaitersSet<int>();
private Thread _worker;
private State _state = State.Stopped;
private AutoResetEvent _enqueueEvent = new AutoResetEvent(false);
private Dictionary<CommandType, Action> _commandActions = new Dictionary<CommandType, Action>();
private Dictionary<State, Action> _stateActions = new Dictionary<State, Action>();
private void InitCommandActions()
{
// Я отказался от поддержки асинхронного запуска, так что в очереди этой команды быть не может
// (исключение будет выброшено раньше)!
_commandActions.Add(CommandType.Start, delegate
{
// недостижимый код
throw new InvalidOperationException("Can't start thread, it is already started!");
});
_commandActions.Add(CommandType.Stop, delegate
{
Stopped();
_worker = null;
_state = State.Stopped;
throw new AbortRequested();
});
// Две паузы подряд вызовут ошибку
_commandActions.Add(CommandType.Pause, delegate
{
if (_state != State.Started)
throw new InvalidOperationException("Can't pause thread, it is not started!");
_state = State.Paused;
});
// Продолжение невозможно, если поток не на паузе
_commandActions.Add(CommandType.Resume, delegate
{
if (_state != State.Paused)
throw new InvalidOperationException("Can't resume thread, it is not paused!");
_state = State.Started;
});
_commandActions.Add(CommandType.ForceTick, delegate
{
Tick();
});
}
private void InitStateActions()
{
// Если поток запущен - тикаем
_stateActions.Add(State.Started, delegate
{
Tick();
});
// Если других комманд нет, ждём их поступления
_stateActions.Add(State.Paused, delegate
{
lock (_locker)
{
if (_commands.Any())
_enqueueEvent.Set();
else
_enqueueEvent.Reset();
}
_enqueueEvent.WaitOne();
});
}
private void PerformCommandAction(CommandType commandType)
{
_commandActions[commandType]();
}
private void PerformStateAction()
{
Action action;
if (_stateActions.TryGetValue(_state, out action))
action();
}
private void DoWork()
{
// Id последней извлечённой команды
int? commandId = null;
try
{
Started();
while (true)
{
Command command;
lock (_locker)
command = _commands.Dequeue();
if (command != null)
{
commandId = command.Id;
PerformCommandAction(command.CommandType);
_awaiters.SignalIfExistsAndRemove(command.Id);
}
PerformStateAction();
}
}
catch (AbortRequested)
{
// если в очереди ещё есть команды, то все они должны быть Stop
lock (_locker)
{
// сигнализируем о завершении этой команды
_awaiters.SignalIfExistsAndRemove((int)commandId);
if (_commands.Any())
throw new InvalidOperationException(
string.Format("Cant't perform command {0}: thread is stopped!", _commands.First().CommandType));
}
// после выхода из лока кто-нибудь может кинуть ещё команду. Поскольку
// данный цикл обработки завершился, то её уже никто не выполнит.
// По сути мы обязываем внешний код после команды стоп всегда запрашивать
// команду старт, а иначе может случится молчаливое зависание. Чтобы
// по-настоящему реализовать возможность множественного выполнения команды Стоп,
// надо вставлять дополнительную логику.
}
}
private void StartImpl()
{
// последняя команда (если она есть) должна быть Стоп.
Command command;
lock (_locker)
{
command = _commands.LastOrDefault();
if (command != null && command.CommandType != CommandType.Stop)
throw new InvalidOperationException("Can't start thread, it is not stopped!");
if (command != null)
_awaiters.AddAwaiter(command.Id);
}
// ждём завершения последней команды
if (command != null)
_awaiters.WaitIfExists(command.Id);
lock (_locker)
{
if (_state != State.Stopped)
throw new InvalidOperationException("Can't start thread, it is not stopped!");
// пока ждали, кто-то закинул ещё команду - это недопустимо
if (_commands.Any())
throw new InvalidOperationException(
string.Format("Cant't perform command {0}: thread is stopped!", _commands.First()));
_state = State.Started;
_worker = new Thread(DoWork);
_worker.Start();
}
}
// возвращает Id созданной команды
private int ExecuteCommandAsyncImpl(CommandType commandType)
{
if (commandType == CommandType.Start)
throw new NotSupportedException("Asynchronous start is not supported!");
lock (_locker)
return _commands.Enqueue(commandType);
}
public Worker()
{
InitStateActions();
InitCommandActions();
// если поток стоит на паузе, добавление команды его разбудит
_commands.ElementAdded += delegate
{
_enqueueEvent.Set();
};
}
public void ExecuteCommandAsync(CommandType commandType)
{
ExecuteCommandAsyncImpl(commandType);
}
public void ExecuteCommand(CommandType commandType)
{
if (commandType == CommandType.Start)
{
StartImpl();
return;
}
int id;
lock (_locker)
{
id = ExecuteCommandAsyncImpl(commandType);
_awaiters.AddAwaiter(id);
}
_awaiters.WaitIfExists(id);
}
public event Action Started = delegate { };
public event Action Stopped = delegate { };
public event Action Tick = delegate { };
}