MQTT 开源代理mosquitto的网络层封装相当sucks


最近学习MQTT协议,选择了当前比较流行的MQTT Broker “mosquitto”,但是在阅读代码过程中发现其网络底层库封装的相当差劲。

对于MQTT协议的变长头长度的读取上,基本上采取每次一个byte的方式进行读取判断,对于系统调用read的高代价来讲,真的是相当的浪费,也难怪其不能作为高并发的服务器进行处理。 

 

当然mosquitto需要优化的地方还很多:

1. 使用poll而不是epoll(可能是出于跨平台考虑,在linux下可以用epoll替换),另外就是刚才提到的逐byte读取网络数据

2. 订阅树的管理上,对于大量的请求断开或者重连,效率比较低

3. 空闲空间管理机制优化和数据包发送方式的修改

4. 内存管理上malloc new 没有使用mem pool机制,在大并发情况下,内存管理容易出现问题

5. 锁遍地飞,如果采用reactor模式……

但是从另一个方面讲,mosquitto作为开源的实现,思路上还是比较清晰,为mqtt服务器开发提供了比较完备的参考,这也就是它的价值所在了。

 

#ifdef WITH_BROKER

int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)

#else

int _mosquitto_packet_read(struct mosquitto *mosq)

#endif

{

    uint8_t byte;

    ssize_t read_length;

    int rc = 0;

    

    if(!mosq) return MOSQ_ERR_INVAL;

    if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

    if(mosq->state == mosq_cs_connect_pending){

        return MOSQ_ERR_SUCCESS;

    }

    

    /* This gets called if pselect() indicates that there is network data

     * available - ie. at least one byte.  What we do depends on what data we

     * already have.

     * If we've not got a command, attempt to read one and save it. This should

     * always work because it's only a single byte.

     * Then try to read the remaining length. This may fail because it is may

     * be more than one byte - will need to save data pending next read if it

     * does fail.

     * Then try to read the remaining payload, where 'payload' here means the

     * combined variable header and actual payload. This is the most likely to

     * fail due to longer length, so save current data and current position.

     * After all data is read, send to _mosquitto_handle_packet() to deal with.

     * Finally, free the memory and reset everything to starting conditions.

     */

    if(!mosq->in_packet.command){

        read_length = _mosquitto_net_read(mosq, &byte, 1);

        if(read_length == 1){

            mosq->in_packet.command = byte;

#ifdef WITH_BROKER

#  ifdef WITH_SYS_TREE

            g_bytes_received++;

#  endif

            /* Clients must send CONNECT as their first command. */

            if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;

#endif

        }else{

            if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */

#ifdef WIN32

            errno = WSAGetLastError();

#endif

            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

                return MOSQ_ERR_SUCCESS;

            }else{

                switch(errno){

                    case COMPAT_ECONNRESET:

                        return MOSQ_ERR_CONN_LOST;

                    default:

                        return MOSQ_ERR_ERRNO;

                }

            }

        }

    }

    /* remaining_count is the number of bytes that the remaining_length

     * parameter occupied in this incoming packet. We don't use it here as such

     * (it is used when allocating an outgoing packet), but we must be able to

     * determine whether all of the remaining_length parameter has been read.

     * remaining_count has three states here:

     *   0 means that we haven't read any remaining_length bytes

     *   <0 means we have read some remaining_length bytes but haven't finished

     *   >0 means we have finished reading the remaining_length bytes.

     */

    if(mosq->in_packet.remaining_count <= 0){

        do{

            read_length = _mosquitto_net_read(mosq, &byte, 1);

            if(read_length == 1){

                mosq->in_packet.remaining_count--;

                /* Max 4 bytes length for remaining length as defined by protocol.

                 * Anything more likely means a broken/malicious client.

                 */

                if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;

                

#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)

                g_bytes_received++;

#endif

                mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;

                mosq->in_packet.remaining_mult *= 128;

            }else{

                if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */

#ifdef WIN32

                errno = WSAGetLastError();

#endif

                if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

                    return MOSQ_ERR_SUCCESS;

                }else{

                    switch(errno){

                        case COMPAT_ECONNRESET:

                            return MOSQ_ERR_CONN_LOST;

                        default:

                            return MOSQ_ERR_ERRNO;

                    }

                }

            }

        }while((byte & 128) != 0);

        /* We have finished reading remaining_length, so make remaining_count

         * positive. */

        mosq->in_packet.remaining_count *= -1;

        

        if(mosq->in_packet.remaining_length > 0){

            mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));

            if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;

            mosq->in_packet.to_process = mosq->in_packet.remaining_length;

        }

    }

    while(mosq->in_packet.to_process>0){

        read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);

        if(read_length > 0){

#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)

            g_bytes_received += read_length;

#endif

            mosq->in_packet.to_process -= read_length;

            mosq->in_packet.pos += read_length;

        }else{

#ifdef WIN32

            errno = WSAGetLastError();

#endif

            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

                if(mosq->in_packet.to_process > 1000){

                    /* Update last_msg_in time if more than 1000 bytes left to

                     * receive. Helps when receiving large messages.

                     * This is an arbitrary limit, but with some consideration.

                     * If a client can't send 1000 bytes in a second it

                     * probably shouldn't be using a 1 second keep alive. */

                    pthread_mutex_lock(&mosq->msgtime_mutex);

                    mosq->last_msg_in = mosquitto_time();

                    pthread_mutex_unlock(&mosq->msgtime_mutex);

                }

                return MOSQ_ERR_SUCCESS;

            }else{

                switch(errno){

                    case COMPAT_ECONNRESET:

                        return MOSQ_ERR_CONN_LOST;

                    default:

                        return MOSQ_ERR_ERRNO;

                }

            }

        }

    }

    

    /* All data for this packet is read. */

    mosq->in_packet.pos = 0;

#ifdef WITH_BROKER

#  ifdef WITH_SYS_TREE

    g_msgs_received++;

    if(((mosq->in_packet.command)&0xF5) == PUBLISH){

        g_pub_msgs_received++;

    }

#  endif

    rc = mqtt3_packet_handle(db, mosq);

#else

    rc = _mosquitto_packet_handle(mosq);

#endif

    

    /* Free data and reset values */

    _mosquitto_packet_cleanup(&mosq->in_packet);

    

    pthread_mutex_lock(&mosq->msgtime_mutex);

    mosq->last_msg_in = mosquitto_time();

    pthread_mutex_unlock(&mosq->msgtime_mutex);

    return rc;

}

相关