Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support publishing and receiving large messages. #193

Merged
merged 4 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions Adafruit_MQTT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ uint16_t Adafruit_MQTT::readFullPacket(uint8_t *buffer, uint16_t maxsize,
// will read a packet and Do The Right Thing with length
uint8_t *pbuff = buffer;

uint8_t rlen;
uint16_t rlen;
Copy link
Author

@xdylanm xdylanm May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packets larger than 255 bytes would overflow rlen.

Copy link
Member

@brentru brentru May 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does solve an issue I was having today with a very large binary payload, thank you.

Correctly identify topic start position for >127byte messages (may also be addressed in PR166 -- happy to merge that fix in if preferred).

@flavio-fernandes Is this implemented in #166?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @brentru . No, this fix is related but not exactly what I was trying to address in #166 . The #166 was done to avoid corrupting memory should we attempt to
publish more data than what is available by MAXBUFFERSIZE. I think the function introduced in this PR (topicOffsetFromLenth) is what #166 introduces, so we definitely should combine that. @xdylanm how about you incorporate that into this pr? You can take the credit; I do not care for that. ;) Basically, we need this logic in addition to what you are doing here: d6f07ed#diff-a20445c19076bfb61fa2cf57b03325851600fc2dc25391f7e97f2de072fda081R701

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@flavio-fernandes in my own build, I originally had a simpler version of the length test
uint16_t const topicoffset = (len > 127 ? 1 : 0)
but I liked your function approach better, so I followed that pattern and credit to you =). If #166 is close to being merged, I'm happy to just rebase off of that when it's merged and update my PR. Per your note, you're trying to solve a different problem, which should certainly get resolved.

That said, I realized that this solution does not support packets > 65535 bytes since the lengths are all stored in uint16_t, and I'm still not sure what the right solution is there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. Yes, you correctly point out the fact that we cannot go beyond 65535 since we are using uint16.

I will defer to @brentru on how we want to proceed here, but my opinion is to go ahead with PR/166 as is,
and then have you leverage the function packetAdditionalLen in PR/193 and potentially improve on the uint16 limitation. Sounds reasonable, @brentru ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds OK and I agree. - @xdylanm, see the comment below about using clang-format so we can approve this PR.


// read the packet type:
rlen = readPacket(pbuff, 1, timeout);
Expand Down Expand Up @@ -473,6 +473,22 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::readSubscription(int16_t timeout) {
return handleSubscriptionPacket(len);
}

namespace {

uint16_t topicOffsetFromLength(uint16_t const len)
{
if (len < 128) { // 7 bits (+1 continuation bit)
return 0;
} else if (len < 16384) { // 14 bits (+2 continuation bits)
return 1;
} else if (len < 2097152) { // 21 bits
return 2;
}
return 3;
}

} // anon

Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
uint16_t i, topiclen, datalen;

Expand All @@ -491,7 +507,9 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
}

// Parse out length of packet.
topiclen = buffer[3];
uint16_t const topicoffset = topicOffsetFromLength(len);
uint16_t const topicstart = topicoffset + 4;
topiclen = buffer[3+topicoffset];
DEBUG_PRINT(F("Looking for subscription len "));
DEBUG_PRINTLN(topiclen);

Expand All @@ -504,7 +522,7 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
continue;
// Stop if the subscription topic matches the received topic. Be careful
// to make comparison case insensitive.
if (strncasecmp((char *)buffer + 4, subscriptions[i]->topic, topiclen) ==
if (strncasecmp((char *)buffer + topicstart, subscriptions[i]->topic, topiclen) ==
0) {
DEBUG_PRINT(F("Found sub #"));
DEBUG_PRINTLN(i);
Expand All @@ -520,20 +538,20 @@ Adafruit_MQTT_Subscribe *Adafruit_MQTT::handleSubscriptionPacket(uint16_t len) {
// Check if it is QoS 1, TODO: we dont support QoS 2
if ((buffer[0] & 0x6) == 0x2) {
packet_id_len = 2;
packetid = buffer[topiclen + 4];
packetid = buffer[topiclen + topicstart];
packetid <<= 8;
packetid |= buffer[topiclen + 5];
packetid |= buffer[topiclen + topicstart + 1];
}

// zero out the old data
memset(subscriptions[i]->lastread, 0, SUBSCRIPTIONDATALEN);

datalen = len - topiclen - packet_id_len - 4;
datalen = len - topiclen - packet_id_len - topicstart;
if (datalen > SUBSCRIPTIONDATALEN) {
datalen = SUBSCRIPTIONDATALEN - 1; // cut it off
}
// extract out just the data, into the subscription object itself
memmove(subscriptions[i]->lastread, buffer + 4 + topiclen + packet_id_len,
memmove(subscriptions[i]->lastread, buffer + topicstart + topiclen + packet_id_len,
datalen);
subscriptions[i]->datalen = datalen;
DEBUG_PRINT(F("Data len: "));
Expand Down
7 changes: 4 additions & 3 deletions Adafruit_MQTT_Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,19 @@ uint16_t Adafruit_MQTT_Client::readPacket(uint8_t *buffer, uint16_t maxlen,

bool Adafruit_MQTT_Client::sendPacket(uint8_t *buffer, uint16_t len) {
uint16_t ret = 0;

uint16_t offset = 0;
while (len > 0) {
if (client->connected()) {
// send 250 bytes at most at a time, can adjust this later based on Client

uint16_t sendlen = len > 250 ? 250 : len;
// Serial.print("Sending: "); Serial.println(sendlen);
ret = client->write(buffer, sendlen);
ret = client->write(buffer + offset, sendlen);
DEBUG_PRINT(F("Client sendPacket returned: "));
DEBUG_PRINTLN(ret);
len -= ret;

offset += ret;

if (ret != sendlen) {
DEBUG_PRINTLN("Failed to send packet.");
return false;
Expand Down