Implement the command queue thread safe
The queue will be maybe filled from within another thread (mqtt/web) and handled from the main loop
This commit is contained in:
parent
be09c4052f
commit
832df5a80e
@ -99,7 +99,7 @@ void HoymilesRadio::handleReceivedPackage()
|
|||||||
}
|
}
|
||||||
} else if (!_busyFlag) {
|
} else if (!_busyFlag) {
|
||||||
// Currently in idle mode --> send packet if one is in the queue
|
// Currently in idle mode --> send packet if one is in the queue
|
||||||
if (!_commandQueue.empty()) {
|
if (!isQueueEmpty()) {
|
||||||
CommandAbstract* cmd = _commandQueue.front().get();
|
CommandAbstract* cmd = _commandQueue.front().get();
|
||||||
|
|
||||||
auto inv = Hoymiles.getInverterBySerial(cmd->getTargetAddress());
|
auto inv = Hoymiles.getInverterBySerial(cmd->getTargetAddress());
|
||||||
|
|||||||
@ -5,7 +5,7 @@
|
|||||||
#include "commands/CommandAbstract.h"
|
#include "commands/CommandAbstract.h"
|
||||||
#include "types.h"
|
#include "types.h"
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <queue>
|
#include <ThreadSafeQueue.h>
|
||||||
|
|
||||||
class HoymilesRadio {
|
class HoymilesRadio {
|
||||||
public:
|
public:
|
||||||
@ -39,7 +39,7 @@ protected:
|
|||||||
void handleReceivedPackage();
|
void handleReceivedPackage();
|
||||||
|
|
||||||
serial_u _dtuSerial;
|
serial_u _dtuSerial;
|
||||||
std::queue<std::shared_ptr<CommandAbstract>> _commandQueue;
|
ThreadSafeQueue<std::shared_ptr<CommandAbstract>> _commandQueue;
|
||||||
bool _isInitialized = false;
|
bool _isInitialized = false;
|
||||||
bool _busyFlag = false;
|
bool _busyFlag = false;
|
||||||
|
|
||||||
|
|||||||
62
lib/ThreadSafeQueue/ThreadSafeQueue.h
Normal file
62
lib/ThreadSafeQueue/ThreadSafeQueue.h
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
// SPDX-License-Identifier: GPL-2.0-or-later
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <optional>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
class ThreadSafeQueue {
|
||||||
|
public:
|
||||||
|
ThreadSafeQueue() = default;
|
||||||
|
ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete;
|
||||||
|
ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete;
|
||||||
|
|
||||||
|
ThreadSafeQueue(ThreadSafeQueue<T>&& other)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_mutex);
|
||||||
|
_queue = std::move(other._queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual ~ThreadSafeQueue() { }
|
||||||
|
|
||||||
|
unsigned long size() const
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_mutex);
|
||||||
|
return _queue.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<T> pop()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_mutex);
|
||||||
|
if (_queue.empty()) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
T tmp = _queue.front();
|
||||||
|
_queue.pop();
|
||||||
|
return tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
void push(const T& item)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_mutex);
|
||||||
|
_queue.push(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
T front()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(_mutex);
|
||||||
|
return _queue.front();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Moved out of public interface to prevent races between this
|
||||||
|
// and pop().
|
||||||
|
bool empty() const
|
||||||
|
{
|
||||||
|
return _queue.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::queue<T> _queue;
|
||||||
|
mutable std::mutex _mutex;
|
||||||
|
};
|
||||||
Loading…
Reference in New Issue
Block a user