Как реализовать безопасную многопоточность - C#
Формулировка задачи:
Есть у меня класс, который содержит поток. Этот класс имеет переменные: состояние и команда. Обе - перечисления. Нужно безопасно переключать командами состояние потока, чтобы исключить блокировки.
Вот команды и состояния.
Нужен пример обёртки над потоком, которая могла бы безопасно переводить поток из одного состояния в другое, ожидая полного завершения команды. Знает ли кто что-нибудь подобное? Я сделал свою реализацию, но со временем работы приложения происходит какая-то странная блокировка и отловить я её не могу. Грешу на велосипед.
public enum EnThreadCommand { tcmdNone = 0, tcmdIdle, tcmdStart, tcmdStop, tcmdUpdate, tcmdUpdateOnce } public enum EnThreadState { tstStop = 0, tstIdle, tstUpdate, tstUpdateOnce }
Решение задачи: «Как реализовать безопасную многопоточность»
textual
Листинг программы
void Main() { var worker = new Worker(); worker.Started += delegate { Console.WriteLine("Begin started"); Thread.Sleep(1000); }; worker.Stopped += delegate { Console.WriteLine("Begin stopped"); Thread.Sleep(1000); }; worker.Tick += delegate { Console.WriteLine("Begin tick"); Thread.Sleep(1000); }; worker.ExecuteCommand(CommandType.Start); worker.ExecuteCommand(CommandType.Pause); worker.ExecuteCommand(CommandType.ForceTick); worker.ExecuteCommand(CommandType.Resume); worker.ExecuteCommand(CommandType.Pause); ParallelEnumerable.Range(0, 3) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithDegreeOfParallelism(10) .ForAll(i => worker.ExecuteCommandAsync(CommandType.ForceTick)); worker.ExecuteCommand(CommandType.Stop); worker.ExecuteCommand(CommandType.Start); worker.ExecuteCommand(CommandType.ForceTick); Console.ReadLine(); worker.ExecuteCommand(CommandType.Stop); } enum CommandType { Start, Stop, Pause, Resume, ForceTick }; class Command { public readonly int Id; public readonly CommandType CommandType; public Command(int id, CommandType commandType) { Id = id; CommandType = commandType; } } // Что такое асинхронное выполнение команды? По мне так это // запихивание этой команды в очередь без ожидания её обработки. // Значит нам понадобится очередь команд, да не простая, а с генерацией UId. class CommandsQueue : IEnumerable<Command> { private int _nextId = 1000; private Queue<Command> _commands = new Queue<Command>(); public int Enqueue(CommandType commandType) { var command = new Command(_nextId, commandType); _commands.Enqueue(command); unchecked { _nextId++; } ElementAdded(); return command.Id; } public Command Dequeue() { if (_commands.Any()) return _commands.Dequeue(); return null; } public IEnumerator<Command> GetEnumerator() { return _commands.GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return _commands.GetEnumerator(); } // Когда поток стоит на паузе, нам необходимо будить его // при поступлении новой команды. Поэтому это событие нам // пригодится. public event Action ElementAdded = delegate { }; } // Что такое синхронное выполнение команды? Это асинхронное выполнение // этой команды с последующим ожиданием завершения её обработки. Так // будем же ждать команду по Id. class AwaitersSet<TKey> { private object _locker = new object(); private Dictionary<TKey, ManualResetEvent> _awaiters = new Dictionary<TKey, ManualResetEvent>(); // Добавляем возможность ждать команду с определённым Id. // Возвращаемое значение тут, в принципе, не важно. public bool AddAwaiter(TKey id) { lock (_locker) { if (_awaiters.ContainsKey(id)) return false; var awaiter = new ManualResetEvent(false); _awaiters.Add(id, awaiter); return true; } } // Сигнализируем, что команда выполнена. Те, кто ждал её, // проснутся, те, кто будут ждать после - даже не уснут. public void SignalIfExistsAndRemove(TKey id) { lock (_locker) { ManualResetEvent awaiter = null; if (_awaiters.TryGetValue(id, out awaiter)) awaiter.Set(); _awaiters.Remove(id); } } // Если ожидателя нет, то команда выполнилась (или ошибка во внешнем коде, // который не удосужился создать ожидателя). public void WaitIfExists(TKey id) { ManualResetEvent awaiter = null; lock (_locker) _awaiters.TryGetValue(id, out awaiter); if (awaiter != null) awaiter.WaitOne(); } } class Worker { private enum State { Stopped, Started, Paused }; [Serializable] private class AbortRequested : Exception { }; private readonly object _locker = new object(); private readonly CommandsQueue _commands = new CommandsQueue(); private readonly AwaitersSet<int> _awaiters = new AwaitersSet<int>(); private Thread _worker; private State _state = State.Stopped; private AutoResetEvent _enqueueEvent = new AutoResetEvent(false); private Dictionary<CommandType, Action> _commandActions = new Dictionary<CommandType, Action>(); private Dictionary<State, Action> _stateActions = new Dictionary<State, Action>(); private void InitCommandActions() { // Я отказался от поддержки асинхронного запуска, так что в очереди этой команды быть не может // (исключение будет выброшено раньше)! _commandActions.Add(CommandType.Start, delegate { // недостижимый код throw new InvalidOperationException("Can't start thread, it is already started!"); }); _commandActions.Add(CommandType.Stop, delegate { Stopped(); _worker = null; _state = State.Stopped; throw new AbortRequested(); }); // Две паузы подряд вызовут ошибку _commandActions.Add(CommandType.Pause, delegate { if (_state != State.Started) throw new InvalidOperationException("Can't pause thread, it is not started!"); _state = State.Paused; }); // Продолжение невозможно, если поток не на паузе _commandActions.Add(CommandType.Resume, delegate { if (_state != State.Paused) throw new InvalidOperationException("Can't resume thread, it is not paused!"); _state = State.Started; }); _commandActions.Add(CommandType.ForceTick, delegate { Tick(); }); } private void InitStateActions() { // Если поток запущен - тикаем _stateActions.Add(State.Started, delegate { Tick(); }); // Если других комманд нет, ждём их поступления _stateActions.Add(State.Paused, delegate { lock (_locker) { if (_commands.Any()) _enqueueEvent.Set(); else _enqueueEvent.Reset(); } _enqueueEvent.WaitOne(); }); } private void PerformCommandAction(CommandType commandType) { _commandActions[commandType](); } private void PerformStateAction() { Action action; if (_stateActions.TryGetValue(_state, out action)) action(); } private void DoWork() { // Id последней извлечённой команды int? commandId = null; try { Started(); while (true) { Command command; lock (_locker) command = _commands.Dequeue(); if (command != null) { commandId = command.Id; PerformCommandAction(command.CommandType); _awaiters.SignalIfExistsAndRemove(command.Id); } PerformStateAction(); } } catch (AbortRequested) { // если в очереди ещё есть команды, то все они должны быть Stop lock (_locker) { // сигнализируем о завершении этой команды _awaiters.SignalIfExistsAndRemove((int)commandId); if (_commands.Any()) throw new InvalidOperationException( string.Format("Cant't perform command {0}: thread is stopped!", _commands.First().CommandType)); } // после выхода из лока кто-нибудь может кинуть ещё команду. Поскольку // данный цикл обработки завершился, то её уже никто не выполнит. // По сути мы обязываем внешний код после команды стоп всегда запрашивать // команду старт, а иначе может случится молчаливое зависание. Чтобы // по-настоящему реализовать возможность множественного выполнения команды Стоп, // надо вставлять дополнительную логику. } } private void StartImpl() { // последняя команда (если она есть) должна быть Стоп. Command command; lock (_locker) { command = _commands.LastOrDefault(); if (command != null && command.CommandType != CommandType.Stop) throw new InvalidOperationException("Can't start thread, it is not stopped!"); if (command != null) _awaiters.AddAwaiter(command.Id); } // ждём завершения последней команды if (command != null) _awaiters.WaitIfExists(command.Id); lock (_locker) { if (_state != State.Stopped) throw new InvalidOperationException("Can't start thread, it is not stopped!"); // пока ждали, кто-то закинул ещё команду - это недопустимо if (_commands.Any()) throw new InvalidOperationException( string.Format("Cant't perform command {0}: thread is stopped!", _commands.First())); _state = State.Started; _worker = new Thread(DoWork); _worker.Start(); } } // возвращает Id созданной команды private int ExecuteCommandAsyncImpl(CommandType commandType) { if (commandType == CommandType.Start) throw new NotSupportedException("Asynchronous start is not supported!"); lock (_locker) return _commands.Enqueue(commandType); } public Worker() { InitStateActions(); InitCommandActions(); // если поток стоит на паузе, добавление команды его разбудит _commands.ElementAdded += delegate { _enqueueEvent.Set(); }; } public void ExecuteCommandAsync(CommandType commandType) { ExecuteCommandAsyncImpl(commandType); } public void ExecuteCommand(CommandType commandType) { if (commandType == CommandType.Start) { StartImpl(); return; } int id; lock (_locker) { id = ExecuteCommandAsyncImpl(commandType); _awaiters.AddAwaiter(id); } _awaiters.WaitIfExists(id); } public event Action Started = delegate { }; public event Action Stopped = delegate { }; public event Action Tick = delegate { }; }
ИИ поможет Вам:
- решить любую задачу по программированию
- объяснить код
- расставить комментарии в коде
- и т.д