Как реализовать безопасную многопоточность - C#

Узнай цену своей работы

Формулировка задачи:

Есть у меня класс, который содержит поток. Этот класс имеет переменные: состояние и команда. Обе - перечисления. Нужно безопасно переключать командами состояние потока, чтобы исключить блокировки. Вот команды и состояния.
    public enum EnThreadCommand
    {
        tcmdNone = 0, tcmdIdle, tcmdStart, tcmdStop, tcmdUpdate, tcmdUpdateOnce
    }
 
    public enum EnThreadState
    {
        tstStop = 0, tstIdle, tstUpdate, tstUpdateOnce
    }
Нужен пример обёртки над потоком, которая могла бы безопасно переводить поток из одного состояния в другое, ожидая полного завершения команды. Знает ли кто что-нибудь подобное? Я сделал свою реализацию, но со временем работы приложения происходит какая-то странная блокировка и отловить я её не могу. Грешу на велосипед.

Решение задачи: «Как реализовать безопасную многопоточность»

textual
Листинг программы
void Main()
{
    var worker = new Worker();
 
    worker.Started += delegate
    {
        Console.WriteLine("Begin started");
        Thread.Sleep(1000);
    };
 
    worker.Stopped += delegate
    {
        Console.WriteLine("Begin stopped");
        Thread.Sleep(1000);
    };
 
    worker.Tick += delegate
    {
        Console.WriteLine("Begin tick");
        Thread.Sleep(1000);
    };
    
    worker.ExecuteCommand(CommandType.Start);
    worker.ExecuteCommand(CommandType.Pause);
    worker.ExecuteCommand(CommandType.ForceTick);
    worker.ExecuteCommand(CommandType.Resume);
    worker.ExecuteCommand(CommandType.Pause);
    
    ParallelEnumerable.Range(0, 3)
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithDegreeOfParallelism(10)
        .ForAll(i => worker.ExecuteCommandAsync(CommandType.ForceTick));
        
    worker.ExecuteCommand(CommandType.Stop);
    worker.ExecuteCommand(CommandType.Start);
    worker.ExecuteCommand(CommandType.ForceTick);
    
    Console.ReadLine();
    worker.ExecuteCommand(CommandType.Stop);
}
 
enum CommandType { Start, Stop, Pause, Resume, ForceTick };
 
class Command
{
    public readonly int Id;
    public readonly CommandType CommandType;
    
    public Command(int id, CommandType commandType)
    {
        Id = id;
        CommandType = commandType;
    }
}
 
// Что такое асинхронное выполнение команды? По мне так это
// запихивание этой команды в очередь без ожидания её обработки.
// Значит нам понадобится очередь команд, да не простая, а с генерацией UId.
class CommandsQueue : IEnumerable<Command>
{
    private int _nextId = 1000;
    private Queue<Command> _commands = new Queue<Command>();
 
    public int Enqueue(CommandType commandType)
    {
        var command = new Command(_nextId, commandType);
        _commands.Enqueue(command);
        unchecked { _nextId++; }
        ElementAdded();
        return command.Id;
    }
 
    public Command Dequeue()
    {
        if (_commands.Any())
            return _commands.Dequeue();
        return null;
    }
 
    public IEnumerator<Command> GetEnumerator()
    {
        return _commands.GetEnumerator();
    }
 
    IEnumerator IEnumerable.GetEnumerator()
    {
        return _commands.GetEnumerator();
    }
 
    // Когда поток стоит на паузе, нам необходимо будить его
    // при поступлении новой команды. Поэтому это событие нам
    // пригодится.
    public event Action ElementAdded = delegate { };
}
 
// Что такое синхронное выполнение команды? Это асинхронное выполнение
// этой команды с последующим ожиданием завершения её обработки. Так
// будем же ждать команду по Id.
class AwaitersSet<TKey>
{
    private object _locker = new object();
    private Dictionary<TKey, ManualResetEvent> _awaiters = new Dictionary<TKey, ManualResetEvent>();
 
    // Добавляем возможность ждать команду с определённым Id.
    // Возвращаемое значение тут, в принципе, не важно.
    public bool AddAwaiter(TKey id)
    {
        lock (_locker)
        {
            if (_awaiters.ContainsKey(id))
                return false;
 
            var awaiter = new ManualResetEvent(false);
            _awaiters.Add(id, awaiter);
 
            return true;
        }
    }
 
    // Сигнализируем, что команда выполнена. Те, кто ждал её,
    // проснутся, те, кто будут ждать после - даже не уснут.
    public void SignalIfExistsAndRemove(TKey id)
    {
        lock (_locker)
        {
            ManualResetEvent awaiter = null;
            if (_awaiters.TryGetValue(id, out awaiter))
                awaiter.Set();
            _awaiters.Remove(id);
        }
    }
 
    // Если ожидателя нет, то команда выполнилась (или ошибка во внешнем коде,
    // который не удосужился создать ожидателя).
    public void WaitIfExists(TKey id)
    {
        ManualResetEvent awaiter = null;
 
        lock (_locker)
            _awaiters.TryGetValue(id, out awaiter);
 
        if (awaiter != null)
            awaiter.WaitOne();
    }
}
 
 
class Worker
{
    private enum State { Stopped, Started, Paused };
 
    [Serializable]
    private class AbortRequested : Exception { };
 
    private readonly object _locker = new object();
    private readonly CommandsQueue _commands = new CommandsQueue();
    private readonly AwaitersSet<int> _awaiters = new AwaitersSet<int>();
    private Thread _worker;
    private State _state = State.Stopped;
    
    private AutoResetEvent _enqueueEvent = new AutoResetEvent(false);
 
    private Dictionary<CommandType, Action> _commandActions = new Dictionary<CommandType, Action>();
    private Dictionary<State, Action> _stateActions = new Dictionary<State, Action>();
 
    private void InitCommandActions()
    {
        // Я отказался от поддержки асинхронного запуска, так что в очереди этой команды быть не может
        // (исключение будет выброшено раньше)!
        _commandActions.Add(CommandType.Start, delegate
        {
            // недостижимый код
            throw new InvalidOperationException("Can't start thread, it is already started!");
        });
 
        _commandActions.Add(CommandType.Stop, delegate
        {
            Stopped();
            _worker = null;
            _state = State.Stopped;
            throw new AbortRequested();
        });
 
        // Две паузы подряд вызовут ошибку
        _commandActions.Add(CommandType.Pause, delegate
        {
            if (_state != State.Started)
                throw new InvalidOperationException("Can't pause thread, it is not started!");
            _state = State.Paused;
        });
 
        // Продолжение невозможно, если поток не на паузе
        _commandActions.Add(CommandType.Resume, delegate
        {
            if (_state != State.Paused)
                throw new InvalidOperationException("Can't resume thread, it is not paused!");
            _state = State.Started;
        });
 
        _commandActions.Add(CommandType.ForceTick, delegate
        {
            Tick();
        });
    }
 
    private void InitStateActions()
    {
        // Если поток запущен - тикаем
        _stateActions.Add(State.Started, delegate
        {
            Tick();
        });
 
        // Если других комманд нет, ждём их поступления
        _stateActions.Add(State.Paused, delegate
        {
            lock (_locker)
            {
                if (_commands.Any())
                    _enqueueEvent.Set();
                else
                    _enqueueEvent.Reset();
            }
            
            _enqueueEvent.WaitOne();
        });
    }
 
    private void PerformCommandAction(CommandType commandType)
    {
        _commandActions[commandType]();
    }
 
    private void PerformStateAction()
    {
        Action action;
        if (_stateActions.TryGetValue(_state, out action))
            action();
    }
 
    private void DoWork()
    {
        // Id последней извлечённой команды
        int? commandId = null;
        try
        {
            Started();
 
            while (true)
            {
                Command command;
                lock (_locker)
                    command = _commands.Dequeue();
 
                if (command != null)
                {
                    commandId = command.Id;
                    PerformCommandAction(command.CommandType);
                    _awaiters.SignalIfExistsAndRemove(command.Id);
                }
 
                PerformStateAction();
            }
        }
        catch (AbortRequested)
        {
            // если в очереди ещё есть команды, то все они должны быть Stop
            lock (_locker)
            {
                // сигнализируем о завершении этой команды
                _awaiters.SignalIfExistsAndRemove((int)commandId);
 
                if (_commands.Any())
                    throw new InvalidOperationException(
                        string.Format("Cant't perform command {0}: thread is stopped!", _commands.First().CommandType));
            }
            // после выхода из лока кто-нибудь может кинуть ещё команду. Поскольку
            // данный цикл обработки завершился, то её уже никто не выполнит.
            // По сути мы обязываем внешний код после команды стоп всегда запрашивать
            // команду старт, а иначе может случится молчаливое зависание. Чтобы 
            // по-настоящему реализовать возможность множественного выполнения команды Стоп, 
            // надо вставлять дополнительную логику.
        }
    }
 
    private void StartImpl()
    {
        // последняя команда (если она есть) должна быть Стоп.
        Command command;
        lock (_locker)
        {
            command = _commands.LastOrDefault();
            if (command != null && command.CommandType != CommandType.Stop)
                throw new InvalidOperationException("Can't start thread, it is not stopped!");
            if (command != null)
                _awaiters.AddAwaiter(command.Id);
        }
 
        // ждём завершения последней команды
        if (command != null)
            _awaiters.WaitIfExists(command.Id);
 
        lock (_locker)
        {
            if (_state != State.Stopped)
                throw new InvalidOperationException("Can't start thread, it is not stopped!");
 
            // пока ждали, кто-то закинул ещё команду - это недопустимо
            if (_commands.Any())
                throw new InvalidOperationException(
                    string.Format("Cant't perform command {0}: thread is stopped!", _commands.First()));
 
            _state = State.Started;
            _worker = new Thread(DoWork);
            _worker.Start();
        }
    }
 
    // возвращает Id созданной команды
    private int ExecuteCommandAsyncImpl(CommandType commandType)
    {
        if (commandType == CommandType.Start)
            throw new NotSupportedException("Asynchronous start is not supported!");
 
        lock (_locker)
            return _commands.Enqueue(commandType);
    }
 
    public Worker()
    {
        InitStateActions();
        InitCommandActions();
 
        // если поток стоит на паузе, добавление команды его разбудит
        _commands.ElementAdded += delegate
        {
            _enqueueEvent.Set();
        };
    }
 
    public void ExecuteCommandAsync(CommandType commandType)
    {
        ExecuteCommandAsyncImpl(commandType);
    }
 
    public void ExecuteCommand(CommandType commandType)
    {
        if (commandType == CommandType.Start)
        {
            StartImpl();
            return;
        }
 
        int id;
        lock (_locker)
        {
            id = ExecuteCommandAsyncImpl(commandType);
            _awaiters.AddAwaiter(id);
        }
 
        _awaiters.WaitIfExists(id);
    }
 
    public event Action Started = delegate { };
    public event Action Stopped = delegate { };
    public event Action Tick = delegate { };
}

ИИ поможет Вам:


  • решить любую задачу по программированию
  • объяснить код
  • расставить комментарии в коде
  • и т.д
Попробуйте бесплатно

Оцени полезность:

9   голосов , оценка 4.222 из 5
Похожие ответы