Taro3

View on GitHub

アプリケーションからのソケットとのインタラクション

次のプロジェクトは mandelbrot-app です。これには、作業員と対話するQTcpServerとマンデルブロ集合の絵が含まれます。忘れないように、mandelbrot-appのアーキテクチャの図はここに示されています。

image

このアプリケーションを一から構築していきます。特定のWorkerとの接続を維持するためのクラスから始めましょう。WorkerClientです。このクラスは、特定の QThread 内に存在し、前節で説明した QTcpSocket/QDataStream メカニズムと同じ QTcpSocket/QDataStream メカニズムを使用して Worker クラスと対話します。

mandelbrot-appで、WorkerClientという名前の新しいC++クラスを作成し、WorkerClient.hを以下のように更新します。

#include <QTcpSocket>
#include <QList>
#include <QDataStream>

#include "JobRequest.h"
#include "JobResult.h"
#include "Message.h"

class WorkerClient : public QObject
{
    Q_OBJECT

public:
    WorkerClient(int socketDescriptor);

private:
    int mSocketDescriptor;
    int mCpuCoreCount;
    QTcpSocket mSocket;
    QDataStream mSocketReader;
};

Q_DECLARE_METATYPE(WorkerClient*)

Workerと非常に似ています。しかし、ライフサイクルの観点からは異なる挙動をするかもしれません。新しいWorkerがQTcpServerに接続するたびに、関連するQThreadと一緒に新しいWorkerClientが生成されます。WorkerClientオブジェクトは、mSocketを介してWorkerクラスと対話する責任を負います。

Workerが切断された場合、WorkerClientオブジェクトは削除され、QTcpServerクラスから削除されます。

メンバーから始めて、このヘッダーの内容を見直してみましょう。

これで、WorkerClient.hに関数を追加することができます。

class WorkerClient : public QObject
{
    Q_OBJECT

public:
    WorkerClient(int socketDescriptor);
    int cpuCoreCount() const;

signals:
    void unregistered(WorkerClient* workerClient);
    void jobCompleted(WorkerClient* workerClient,
                      JobResult jobResult);
    void sendJobRequests(QList<JobRequest> requests);

public slots:
    void start();
    void abortJob();

private slots:
    void readMessages();
    void doSendJobRequests(QList<JobRequest> requests);

private:
    void handleWorkerRegistered(Message& message);
    void handleWorkerUnregistered(Message& message);
    void handleJobResult(Message& message);
    ...
};

それぞれの関数が何をするのか見てみましょう。

このクラスには3つのシグナルがあります。

スロットの詳細をご紹介します。

そして最後に、プライベート機能の詳細です。

WorkerClient.cppでの実装はよく知っているはずです。ここにコンストラクタがあります。

#include "MessageUtils.h"

WorkerClient::WorkerClient(int socketDescriptor) :
    QObject(),
    mSocketDescriptor(socketDescriptor),
    mSocket(this),
    mSocketReader(&mSocket)
{
    connect(this, &WorkerClient::sendJobRequests,
            this, &WorkerClient::doSendJobRequests);
}

フィールドは初期化リストで初期化され、sendJobRequestsシグナルはプライベートスロットであるdoSendJobRequestsに接続されています。このトリックは、複数の関数の宣言を避けながらも、スレッド間でキュー接続を保持するために使用されます。

start()関数を進めていきます。

void WorkerClient::start()
{
    connect(&mSocket, &QTcpSocket::readyRead,
            this, &WorkerClient::readMessages);
            mSocket.setSocketDescriptor(mSocketDescriptor);
}

これは非常に短いです。最初にソケットからのREADYRead()シグナルをreadMessages()スロットに接続します。その後、mSocket は mSocketDescriptor で適切に設定されます。

接続は、WorkerClientに関連付けられたQThreadクラスで実行される必要があるため、start()で実行する必要があります。これにより、接続が直接接続され、mSocketがWorkerClientと対話するためにシグナルをキューイングする必要がないことがわかります。

関数の終了時には、関連する QThread は終了していないことに注意してください。それどころか、QThread::exec() でイベントループを実行しています。QThread クラスは、誰かが QThread::exit() を呼び出すまでイベントループを実行し続けます。

start()関数の唯一の目的は、適切なスレッドの親和性でmSocket接続の作業を行うことです。その後は、データを処理するためにQtのシグナル/スロット機構だけに頼っています。忙しいwhileループは必要ありません。

readMessages() クラスが待っています。

void WorkerClient::readMessages()
{
    auto messages = MessageUtils::readMessages(mSocketReader);
    for(auto& message : *messages) {
        switch (message->type) {
        case Message::Type::WORKER_REGISTER:
            handleWorkerRegistered(*message);
            break;

        case Message::Type::WORKER_UNREGISTER:
            handleWorkerUnregistered(*message);
            break;

        case Message::Type::JOB_RESULT:
            handleJobResult(*message);
            break;

        default:
            break;
        }
    }
}

ここでは何も驚くことはありません。Worker の場合と全く同じです。MessageUtils::readMessages() を使用して Messages をデシリアライズし、メッセージタイプごとに適切な関数を呼び出します。

以下、handleWorkerRegistered()から始まる各関数の内容です。

void WorkerClient::handleWorkerRegistered(Message& message)
{
    QDataStream in(&message.data, QIODevice::ReadOnly);
    in >> mCpuCoreCount;
}

WORKER_REGISTERメッセージの場合、Workerはmessage.dataでintをシリアライズしているだけなので、in » mCpuCoreCountでその場でmCpuCoreCountを初期化することができます。

これで、handleWorkerUnregistered()の本体。

void WorkerClient::handleWorkerUnregistered(Message& /*message*/)
{
    emit unregistered(this);
}

これは、WorkerClientのオーナーに拾われるunregistered()シグナルを送信するための中継です。

最後の「読み込み」関数はhandleJobResult()です。

void WorkerClient::handleJobResult(Message& message)
{
    QDataStream in(&message.data, QIODevice::ReadOnly);
    JobResult jobResult;
    in >> jobResult;
    emit jobCompleted(this, jobResult);
}

これは見かけによらず短いです。これは message.data から jobResult コンポーネントをデシリアライズして jobCompleted() シグナルを出すだけです。

ソケットへの書き込み」関数は abortJob() と doSendJobRequest() です。

void WorkerClient::abortJob()
{
    MessageUtils::sendMessage(mSocket,
                              Message::Type::ALL_JOBS_ABORT,
                              true);
}

void WorkerClient::doSendJobRequests(QList<JobRequest> requests)
{
    QByteArray data;
    QDataStream stream(&data, QIODevice::WriteOnly);
    stream << requests;

    MessageUtils::sendMessage(mSocket,
                              Message::Type::JOB_REQUEST,
                              data);
}

abortJob() 関数は forceFlush フラグを true に設定して ALL_JOBS_ABORT メッセージを送信し、doSendJobRequests() はリクエストをストリームにシリアライズしてから MessageUtils::sendMessage() を使用して送信します。


戻る