The Core object interface
Description
The Core class is a QObject. It aggregates all streams from providers into once Core Streams Session .
The Core class is a main class of Daggy Core library and can be included by:
Copy #include <DaggyCore/Core.hpp>
Constructor
Copy Core(Sources sources,
QObject* parent = nullptr);
Core(QString session,
Sources sources,
QObject* parent = nullptr);
Session is id for Core Streams Session . If no session was passed to constructor - new session id will generate (in uuid format).
Sources is required parameter. It contains provider commands for streaming:
Copy namespace daggy {
namespace sources {
namespace commands {
namespace streams {
struct Meta {
QString session;
std::chrono::time_point<std::chrono::system_clock> start_time;
QString extension;
DaggyStreamTypes type;
std::uint64_t seq_num;
std::chrono::time_point<std::chrono::system_clock> time;
};
}
struct Stream {
streams::Meta meta;
QByteArray part;
};
struct DAGGYCORE_EXPORT Properties {
QString extension;
QString exec;
QVariantMap parameters = {};
bool restart = false;
bool operator==(const Properties& other) const;
};
}
using Commands = QMap<QString, commands::Properties>;
using Command = QPair<QString, commands::Properties>;
struct DAGGYCORE_EXPORT Properties {
QString type;
QString host;
Commands commands;
bool reconnect = false;
QVariantMap parameters = {};
bool operator==(const Properties& other) const;
};
}
using Sources = QMap<QString, sources::Properties>;
using Source = QPair<QString, sources::Properties>;
}
Properties of sources is described here
Prepare and extend supported providers
Copy std::error_code prepare();
std::error_code prepare(std::span<providers::IFabric*> fabrics);
std::error_code prepare(QString& error) noexcept;
std::error_code prepare(std::span<providers::IFabric*> fabrics, QString& error) noexcept;
Before Daggy Core start , need to prepare providers . If you need to extend provider types, that not out of box from Daggy Core lib (local and ssh2 ), you can create own providers fabric:
Copy class DAGGYCORE_EXPORT IFabric
{
public:
IFabric();
virtual ~IFabric();
virtual const QString& type() const = 0;
Result<IProvider*> create(const QString& session,
const Source& source,
QObject* parent);
protected:
virtual Result<IProvider*> createProvider(const QString& session,
const Source& source,
QObject* parent) = 0;
};
Own fabric instances need to pass in prepare method.
Connect aggregators
Copy std::error_code connectAggregator(aggregators::IAggregator* aggregator) noexcept;
Before Daggy Core start , you can add aggregators.
Out of box Daggy Core Aggregators
There are three types of aggregators, that already described :
Copy #include <DaggyCore/aggregators/CFile.hpp>
#include <DaggyCore/aggregators/CConsole.hpp>
#include <DaggyCore/aggregators/CCallback.hpp>
User defined aggregator
User can create own aggregator by implementation of IAggregator interface:
Copy class DAGGYCORE_EXPORT IAggregator : public QObject
{
Q_OBJECT
public:
IAggregator(QObject* parent = nullptr);
virtual ~IAggregator();
virtual bool isReady() const = 0;
public slots:
virtual void onDataProviderStateChanged(QString provider_id,
DaggyProviderStates state) = 0;
virtual void onDataProviderError(QString provider_id,
std::error_code error_code) = 0;
virtual void onCommandStateChanged(QString provider_id,
QString command_id,
DaggyCommandStates state,
int exit_code) = 0;
virtual void onCommandError(QString provider_id,
QString command_id,
std::error_code error_code) = 0;
virtual void onCommandStream(QString provider_id,
QString command_id,
sources::commands::Stream stream) = 0;
virtual void onDaggyStateChanged(DaggyStates state) = 0;
};
Start and stop Core
Copy std::error_code start() noexcept;
std::error_code stop() noexcept;
After start is called, Core changed state by the next state machine logic:
Finished state reached by calling stop method, or after all providers are finished.
Streams viewing
Copy signals:
void stateChanged(DaggyStates state);
void dataProviderStateChanged(QString provider_id,
DaggyProviderStates state);
void dataProviderError(QString provider_id,
std::error_code error_code);
void commandStateChanged(QString provider_id,
QString command_id,
DaggyCommandStates state,
int exit_code);
void commandStream(QString provider_id,
QString command_id,
sources::commands::Stream stream);
void commandError(QString provider_id,
QString command_id,
std::error_code error_code);
Usage Example
Copy #include <DaggyCore/Core.hpp>
#include <DaggyCore/Sources.hpp>
#include <DaggyCore/aggregators/CFile.hpp>
#include <DaggyCore/aggregators/CConsole.hpp>
#include <QCoreApplication>
#include <QTimer>
namespace {
constexpr const char* json_data = R"JSON(
{
"sources": {
"localhost" : {
"type": "local",
"commands": {
"ping1": {
"exec": "ping 127.0.0.1",
"extension": "log"
},
"ping2": {
"exec": "ping 127.0.0.1",
"extension": "log",
"restart": true
}
}
}
}
}
)JSON";
}
int main(int argc, char** argv)
{
QCoreApplication app(argc, argv);
daggy::Core core(*daggy::sources::convertors::json(json_data));
daggy::aggregators::CFile file_aggregator("test");
daggy::aggregators::CConsole console_aggregator("test");
core.connectAggregator(&file_aggregator);
core.connectAggregator(&console_aggregator);
QObject::connect(&core, &daggy::Core::stateChanged, &core,
[&](DaggyStates state){
if(state == DaggyFinished)
app.quit();
});
QTimer::singleShot(3000, &core, [&]()
{
core.stop();
});
QTimer::singleShot(5000, &core, [&]()
{
app.exit(-1);
});
core.prepare();
core.start();
return app.exec();
}