[OpenDocString] kdeconnect-kde (cpp)
connectionmultiplexer.cpp
ConnectionMultiplexer::ConnectionMultiplexer(QBluetoothSocket *socket, QObject *parent)
    : QObject(parent)
    , mSocket{socket}
    , receivedProtocolVersion{false}
{
    connect(mSocket, &QIODevice::readyRead, this, &ConnectionMultiplexer::readyRead);
    connect(mSocket, &QIODevice::aboutToClose, this, &ConnectionMultiplexer::disconnected);
    connect(mSocket, &QBluetoothSocket::disconnected, this, &ConnectionMultiplexer::disconnected);
    connect(mSocket, &QIODevice::bytesWritten, this, &ConnectionMultiplexer::bytesWritten);

    // Send the protocol version
    QByteArray message(23, (char)0);
    message[0] = MESSAGE_PROTOCOL_VERSION;
    qToBigEndian(4, &message.data()[1]);
    // Leave UUID empty
    // Only support version 1 (lowest supported = highest supported = 1)
    qToBigEndian(1, &message.data()[19]);
    qToBigEndian(1, &message.data()[21]);
    to_write_bytes.append(message);

    // Send the protocol version message (queued)
    QMetaObject::invokeMethod(this, &ConnectionMultiplexer::bytesWritten, Qt::QueuedConnection);

    // Always open the default channel
    addChannel(QBluetoothUuid{QStringLiteral(DEFAULT_CHANNEL_UUID)});

    // Immediately check if we can read stuff ("readyRead" may not be called in that case)
    if (mSocket->bytesAvailable()) {
        // But invoke it queued
        QMetaObject::invokeMethod(this, &ConnectionMultiplexer::readyRead, Qt::QueuedConnection);
    }
}
This constructor builds a connection multiplexer object and its duplexer. It takes the socket object and its associated connections. It sets up signal/slot connections for ready and disconnected, as well creates a message of the given protocol version. It also sets up periodic read and write operations for the protocol version.
ConnectionMultiplexer::~ConnectionMultiplexer()
{
    // Always make sure we close the connection
    close();
}
This implements a close signal and makes sure we close the connection. It makes sure we close the connection before continuing.
void ConnectionMultiplexer::readyRead()
{
    // Continue parsing messages until we need more data for another message
    while (tryParseMessage()) { }
}
This code is responsible for parsing messages until there is data to read. It will continue parsing messages until there is data to read.
void ConnectionMultiplexer::disconnected()
{
    // In case we get disconnected, remove all channels
    for (auto &&channel : channels) {
        disconnect(channel.data(), nullptr, this, nullptr);
        channel->disconnected();
    }
    channels.clear();
    for (auto channel : unrequested_channels) {
        delete channel;
    }
    unrequested_channels.clear();
}
This removes all channels that were not connected. It removes all channels that were requested and sets the disconnected state to disconnected.
void ConnectionMultiplexer::close()
{
    // In case we want to close the connection, remove all channels
    for (auto &&channel : channels) {
        disconnect(channel.data(), nullptr, this, nullptr);
        channel->disconnected();
    }
    channels.clear();
    for (auto channel : unrequested_channels) {
        delete channel;
    }
    unrequested_channels.clear();

    mSocket->close();
}
This closes the connection by disconnecting all channels and removing the unrequested channels.
bool ConnectionMultiplexer::isOpen() const
{
    return mSocket->isOpen();
}
This implements checking if the socket is open.
bool ConnectionMultiplexer::tryParseMessage()
{
    mSocket->startTransaction();

    // The message header is 19 bytes long
    QByteArray header = mSocket->read(19);
    if (header.size() != 19) {
        mSocket->rollbackTransaction();
        return false;
    }

    /**
     * Parse the header:
     *  - message type (1 byte)
     *  - message length (2 bytes, Big-Endian), excludes header size
     *  - channel uuid (16 bytes, Big-Endian)
     */
    char message_type = header[0];
    uint16_t message_length = qFromBigEndian(&header.data()[1]);
    quint128 message_uuid_raw;
    for (int i = 0; i < 16; ++i)
        message_uuid_raw.data[i] = header[3 + i];
    QBluetoothUuid message_uuid = QBluetoothUuid(message_uuid_raw);

    // Check if we have the full message including its data
    QByteArray data = mSocket->read(message_length);
    if (data.size() != message_length) {
        mSocket->rollbackTransaction();
        return false;
    }

    Q_ASSERT(receivedProtocolVersion || message_type == MESSAGE_PROTOCOL_VERSION);

    // Parse the different message types
    if (message_type == MESSAGE_OPEN_CHANNEL) {
        // The other endpoint requested us to open a channel
        Q_ASSERT(message_length == 0);

        addChannel(message_uuid);
    } else if (message_type == MESSAGE_READ) {
        // The other endpoint has read some data and requests more data
        Q_ASSERT(message_length == 2);
        // Read the number of bytes requested (2 bytes, Big-Endian)
        uint16_t additional_read = qFromBigEndian(data.data());
        Q_ASSERT(additional_read > 0);

        // Check if we haven't closed the channel in the meanwhile
        //    (note: different from the user's endpoint of a closed channel, since we might have outstanding buffers)
        auto iter = channels.find(message_uuid);
        if (iter != channels.end() && (*iter)->connected) {
            auto channel = *iter;

            // We have "additional_read" more bytes we can safely write in this channel
            channel->freeWriteAmount += additional_read;
            mSocket->commitTransaction();
            // We might still have some data in the write buffer
            Q_EMIT channel->writeAvailable();
            return true;
        }
    } else if (message_type == MESSAGE_WRITE) {
        // The other endpoint has written data into a channel (because we requested it)
        Q_ASSERT(message_length > 0);

        // Check if we haven't closed the channel in the meanwhile
        //    (note: different from the user's endpoint of a closed channel, since we might have outstanding buffers)
        auto iter = channels.find(message_uuid);
        if (iter != channels.end() && (*iter)->connected) {
            auto channel = *iter;

            Q_ASSERT(channel->requestedReadAmount >= message_length);

            // We received some data, so update the buffer and the amount of outstanding read requests
            channel->requestedReadAmount -= message_length;
            channel->read_buffer.append(std::move(data));

            mSocket->commitTransaction();
            // Indicate that the channel can read some bytes
            Q_EMIT channel->readyRead();
            return true;
        }
    } else if (message_type == MESSAGE_CLOSE_CHANNEL) {
        // The other endpoint wants to close a channel
        Q_ASSERT(message_length == 0);

        // Check if we haven't closed the channel in the meanwhile
        //    (note: different from the user's endpoint of a closed channel, since we might have outstanding buffers)
        auto iter = channels.find(message_uuid);
        if (iter != channels.end()) {
            auto channel = *iter;

            // We don't want signals anymore, since the channel is closed
            disconnect(channel.data(), nullptr, this, nullptr);
            removeChannel(message_uuid);
        }
    } else if (message_type == MESSAGE_PROTOCOL_VERSION) {
        // Checks for protocol compatibility
        Q_ASSERT(message_length >= 4);
        // Read the lowest & highest version supported (each 2 bytes, Big-Endian)
        uint16_t lowest_version = qFromBigEndian(&data.data()[0]);
        uint16_t highest_version = qFromBigEndian(&data.data()[2]);

        Q_ASSERT(lowest_version == 1);
        Q_ASSERT(highest_version >= 1);
        receivedProtocolVersion = true;
    } else {
        // Other message types are not supported
        Q_ASSERT(false);
    }

    mSocket->commitTransaction();
    return true;
}
This tries to parse a message of the given type, length, and data. If the message is successful, the function returns true. Otherwise, the function returns false.
QBluetoothUuid ConnectionMultiplexer::newChannel()
{
    // Create a random uuid
    QBluetoothUuid new_id(QUuid::createUuid());

    // Open the channel on the other endpoint
    QByteArray message(3, (char)0);
    message[0] = MESSAGE_OPEN_CHANNEL;
    qToBigEndian(0, &message.data()[1]);

    quint128 new_id_raw = new_id.toUInt128();
    message.append((const char *)new_id_raw.data, 16);
    to_write_bytes.append(message);

    // Add the channel ourselves
    addChannel(new_id);
    // Write the data
    bytesWritten();
    return new_id;
}
This creates a new uuid and opens a channel on the other end of the channel list, and writes the message to the write bytes buffer, and returns the new uuid. Finally, it returns the new uuid.
void ConnectionMultiplexer::addChannel(QBluetoothUuid new_id)
{
    MultiplexChannelState *channelState = new MultiplexChannelState();
    // Connect all channels queued, so that we have opportunities to combine read/write requests

    Q_ASSERT(unrequested_channels.size() <= 20);

    // Note that none of the channels knows its own uuid, so we have to add it ourselves
    connect(
        channelState,
        &MultiplexChannelState::readAvailable,
        this,
        [new_id, this]() {
            channelCanRead(new_id);
        },
        Qt::QueuedConnection);
    connect(
        channelState,
        &MultiplexChannelState::writeAvailable,
        this,
        [new_id, this]() {
            channelCanWrite(new_id);
        },
        Qt::QueuedConnection);
    connect(
        channelState,
        &MultiplexChannelState::requestClose,
        this,
        [new_id, this]() {
            closeChannel(new_id);
        },
        Qt::QueuedConnection);
    auto channelStatePtr = QSharedPointer{channelState};
    channels[new_id] = channelStatePtr;
    unrequested_channels[new_id] = new MultiplexChannel{channelStatePtr};
    // Immediately ask for data in this channel
    Q_EMIT channelStatePtr->readAvailable();
}
This adds a channel to the list of channels, so that it can read and write available. It also connects all channels queued so that it has opportunities to combine read and write requests.
std::unique_ptr ConnectionMultiplexer::getChannel(QBluetoothUuid channelId)
{
    auto iter = unrequested_channels.find(channelId);
    if (iter == unrequested_channels.end()) {
        return nullptr;
    } else if (!(*iter)->isOpen()) {
        // Delete the channel
        delete *iter;
        unrequested_channels.erase(iter);
        // Don't return closed channels
        return nullptr;
    } else {
        auto channel = *iter;
        unrequested_channels.erase(iter);
        return std::unique_ptr{channel};
    }
}
This returns a pointer to a channel object, or null if the channel is not found.
std::unique_ptr ConnectionMultiplexer::getDefaultChannel()
{
    return getChannel(QBluetoothUuid{QStringLiteral(DEFAULT_CHANNEL_UUID)});
}
Returns a pointer to the default channel.
void ConnectionMultiplexer::bytesWritten()
{
    if (to_write_bytes.size() > 0) {
        // If we have stuff to write, try to write it
        auto num_written = mSocket->write(to_write_bytes);
        if (num_written <= 0) {
            // On error: disconnected will be called later
            // On buffer full: will be retried later
            return;
        } else if (num_written == to_write_bytes.size()) {
            to_write_bytes.clear();
        } else {
            to_write_bytes.remove(0, num_written);
            return;
        }
    }
}
This method writes the contents of the to_write_bytes list to the socket if it has stuff to write, and removes the count if it is full.
void ConnectionMultiplexer::channelCanRead(QBluetoothUuid channelId)
{
    auto iter = channels.find(channelId);
    if (iter == channels.end())
        return;
    auto channel = *iter;

    // Check if we can request more data to read without overflowing the buffer
    if (channel->read_buffer.size() + channel->requestedReadAmount < channel->BUFFER_SIZE) {
        // Request the exact amount to fill up the buffer
        auto read_amount = channel->BUFFER_SIZE - channel->requestedReadAmount - channel->read_buffer.size();
        channel->requestedReadAmount += read_amount;

        // Send a MESSAGE_READ request for more data
        QByteArray message(3, (char)0);
        message[0] = MESSAGE_READ;
        qToBigEndian(2, &message.data()[1]);
        quint128 id_raw = channelId.toUInt128();
        message.append((const char *)id_raw.data, 16);
        message.append(2, 0);
        qToBigEndian(read_amount, &message.data()[19]);
        to_write_bytes.append(message);
        // Try to send it immediately
        bytesWritten();
    }
}
This requests that the channel with the given id can read more data. It checks if the channel has a reference to it, if it can find it, and if it can request more data to read, and if it can send a MESSAGE_READ request to the channel immediately.
void ConnectionMultiplexer::channelCanWrite(QBluetoothUuid channelId)
{
    auto iter = channels.find(channelId);
    if (iter == channels.end())
        return;
    auto channel = *iter;

    // Check if we can freely send data and we actually have some data
    if (channel->write_buffer.size() > 0 && channel->freeWriteAmount > 0) {
        // Figure out how much we can send now
        auto amount = qMin((int)channel->write_buffer.size(), channel->freeWriteAmount);
        QByteArray data = channel->write_buffer.left(amount);
        channel->write_buffer.remove(0, amount);
        channel->freeWriteAmount -= amount;

        // Send the data
        QByteArray message(3, (char)0);
        message[0] = MESSAGE_WRITE;
        qToBigEndian(amount, &message.data()[1]);

        quint128 id_raw = channelId.toUInt128();
        message.append((const char *)id_raw.data, 16);
        message.append(data);
        to_write_bytes.append(message);
        // Try to send it immediately
        bytesWritten();
        // Let the channel's users know that some data has been written
        Q_EMIT channel->bytesWritten(amount);

        // If the user previously asked to close the channel and we finally managed to write the buffer, actually close it
        if (channel->write_buffer.isEmpty() && channel->close_after_write) {
            closeChannel(channelId);
        }
    }
}
This sends a message to the channel if it exists, and if it can send some data, and if it can send it immediately, and close the channel if it doesn't have any data.
void ConnectionMultiplexer::closeChannel(QBluetoothUuid channelId)
{
    auto iter = channels.find(channelId);
    if (iter == channels.end())
        return;
    auto channel = *iter;

    // If the user wants to close a channel, then the user won't be reading from it anymore
    channel->read_buffer.clear();
    channel->close_after_write = true;

    // If there's still stuff to write, don't close it just yet
    if (!channel->write_buffer.isEmpty())
        return;
    channels.erase(iter);
    channel->connected = false;

    // Send the actual close channel message
    QByteArray message(3, (char)0);
    message[0] = MESSAGE_CLOSE_CHANNEL;
    qToBigEndian(0, &message.data()[1]);

    quint128 id_raw = channelId.toUInt128();
    message.append((const char *)id_raw.data, 16);
    to_write_bytes.append(message);
    // Try to send it immediately
    bytesWritten();
}
This sends a close message to the given channel id. It first retrieves the channel object of the given id, and if it doesn't exist, it creates one. Then it erases the channel object from the list of channels, sets the connected flag and marks it as closed. Then it sends the close message to the write buffer.
void ConnectionMultiplexer::removeChannel(QBluetoothUuid channelId)
{
    auto iter = channels.find(channelId);
    if (iter == channels.end())
        return;
    auto channel = *iter;

    // Remove the channel from the channel list
    channels.erase(iter);
    channel->connected = false;

    Q_EMIT channel->disconnected();
}
This removes the channel with the given id. It first retrieves the channel object of the given id, and if it exists, it removes it from the list, and marks it as disconnected.