summaryrefslogtreecommitdiff
path: root/libs/libmosquitto/src/send_publish.c
diff options
context:
space:
mode:
Diffstat (limited to 'libs/libmosquitto/src/send_publish.c')
-rw-r--r--libs/libmosquitto/src/send_publish.c215
1 files changed, 215 insertions, 0 deletions
diff --git a/libs/libmosquitto/src/send_publish.c b/libs/libmosquitto/src/send_publish.c
new file mode 100644
index 0000000000..f07c539d40
--- /dev/null
+++ b/libs/libmosquitto/src/send_publish.c
@@ -0,0 +1,215 @@
+/*
+Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
+
+All rights reserved. This program and the accompanying materials
+are made available under the terms of the Eclipse Public License v1.0
+and Eclipse Distribution License v1.0 which accompany this distribution.
+
+The Eclipse Public License is available at
+ http://www.eclipse.org/legal/epl-v10.html
+and the Eclipse Distribution License is available at
+ http://www.eclipse.org/org/documents/edl-v10.php.
+
+Contributors:
+ Roger Light - initial implementation and documentation.
+*/
+
+#include "config.h"
+
+#include <assert.h>
+#include <string.h>
+
+#ifdef WITH_BROKER
+# include "mosquitto_broker_internal.h"
+# include "sys_tree.h"
+#else
+# define G_PUB_BYTES_SENT_INC(A)
+#endif
+
+#include "mosquitto.h"
+#include "mosquitto_internal.h"
+#include "logging_mosq.h"
+#include "mqtt_protocol.h"
+#include "memory_mosq.h"
+#include "net_mosq.h"
+#include "packet_mosq.h"
+#include "property_mosq.h"
+#include "send_mosq.h"
+
+
+int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval)
+{
+#ifdef WITH_BROKER
+ size_t len;
+#ifdef WITH_BRIDGE
+ int i;
+ struct mosquitto__bridge_topic *cur_topic;
+ bool match;
+ int rc;
+ char *mapped_topic = NULL;
+ char *topic_temp = NULL;
+#endif
+#endif
+ assert(mosq);
+
+#if defined(WITH_BROKER) && defined(WITH_WEBSOCKETS)
+ if(mosq->sock == INVALID_SOCKET && !mosq->wsi) return MOSQ_ERR_NO_CONN;
+#else
+ if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
+#endif
+
+#ifdef WITH_BROKER
+ if(mosq->listener && mosq->listener->mount_point){
+ len = strlen(mosq->listener->mount_point);
+ if(len < strlen(topic)){
+ topic += len;
+ }else{
+ /* Invalid topic string. Should never happen, but silently swallow the message anyway. */
+ return MOSQ_ERR_SUCCESS;
+ }
+ }
+#ifdef WITH_BRIDGE
+ if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
+ for(i=0; i<mosq->bridge->topic_count; i++){
+ cur_topic = &mosq->bridge->topics[i];
+ if((cur_topic->direction == bd_both || cur_topic->direction == bd_out)
+ && (cur_topic->remote_prefix || cur_topic->local_prefix)){
+ /* Topic mapping required on this topic if the message matches */
+
+ rc = mosquitto_topic_matches_sub(cur_topic->local_topic, topic, &match);
+ if(rc){
+ return rc;
+ }
+ if(match){
+ mapped_topic = mosquitto__strdup(topic);
+ if(!mapped_topic) return MOSQ_ERR_NOMEM;
+ if(cur_topic->local_prefix){
+ /* This prefix needs removing. */
+ if(!strncmp(cur_topic->local_prefix, mapped_topic, strlen(cur_topic->local_prefix))){
+ topic_temp = mosquitto__strdup(mapped_topic+strlen(cur_topic->local_prefix));
+ mosquitto__free(mapped_topic);
+ if(!topic_temp){
+ return MOSQ_ERR_NOMEM;
+ }
+ mapped_topic = topic_temp;
+ }
+ }
+
+ if(cur_topic->remote_prefix){
+ /* This prefix needs adding. */
+ len = strlen(mapped_topic) + strlen(cur_topic->remote_prefix)+1;
+ topic_temp = mosquitto__malloc(len+1);
+ if(!topic_temp){
+ mosquitto__free(mapped_topic);
+ return MOSQ_ERR_NOMEM;
+ }
+ snprintf(topic_temp, len, "%s%s", cur_topic->remote_prefix, mapped_topic);
+ topic_temp[len] = '\0';
+ mosquitto__free(mapped_topic);
+ mapped_topic = topic_temp;
+ }
+ log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, mapped_topic, (long)payloadlen);
+ G_PUB_BYTES_SENT_INC(payloadlen);
+ rc = send__real_publish(mosq, mid, mapped_topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props, expiry_interval);
+ mosquitto__free(mapped_topic);
+ return rc;
+ }
+ }
+ }
+ }
+#endif
+ log__printf(NULL, MOSQ_LOG_DEBUG, "Sending PUBLISH to %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
+ G_PUB_BYTES_SENT_INC(payloadlen);
+#else
+ log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBLISH (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", mosq->id, dup, qos, retain, mid, topic, (long)payloadlen);
+#endif
+
+ return send__real_publish(mosq, mid, topic, payloadlen, payload, qos, retain, dup, cmsg_props, store_props, expiry_interval);
+}
+
+
+int send__real_publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, int qos, bool retain, bool dup, const mosquitto_property *cmsg_props, const mosquitto_property *store_props, uint32_t expiry_interval)
+{
+ struct mosquitto__packet *packet = NULL;
+ int packetlen;
+ int proplen = 0, varbytes;
+ int rc;
+ mosquitto_property expiry_prop;
+
+ assert(mosq);
+
+ if(topic){
+ packetlen = 2+strlen(topic) + payloadlen;
+ }else{
+ packetlen = 2 + payloadlen;
+ }
+ if(qos > 0) packetlen += 2; /* For message id */
+ if(mosq->protocol == mosq_p_mqtt5){
+ proplen = 0;
+ proplen += property__get_length_all(cmsg_props);
+ proplen += property__get_length_all(store_props);
+ if(expiry_interval > 0){
+ expiry_prop.next = NULL;
+ expiry_prop.value.i32 = expiry_interval;
+ expiry_prop.identifier = MQTT_PROP_MESSAGE_EXPIRY_INTERVAL;
+ expiry_prop.client_generated = false;
+
+ proplen += property__get_length_all(&expiry_prop);
+ }
+
+ varbytes = packet__varint_bytes(proplen);
+ if(varbytes > 4){
+ /* FIXME - Properties too big, don't publish any - should remove some first really */
+ cmsg_props = NULL;
+ store_props = NULL;
+ expiry_interval = 0;
+ }else{
+ packetlen += proplen + varbytes;
+ }
+ }
+ if(packet__check_oversize(mosq, packetlen)){
+#ifdef WITH_BROKER
+ log__printf(NULL, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH for %s (%d bytes)", mosq->id, packetlen);
+#else
+ log__printf(NULL, MOSQ_LOG_NOTICE, "Dropping too large outgoing PUBLISH (%d bytes)", packetlen);
+#endif
+ return MOSQ_ERR_OVERSIZE_PACKET;
+ }
+
+ packet = mosquitto__calloc(1, sizeof(struct mosquitto__packet));
+ if(!packet) return MOSQ_ERR_NOMEM;
+
+ packet->mid = mid;
+ packet->command = CMD_PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain;
+ packet->remaining_length = packetlen;
+ rc = packet__alloc(packet);
+ if(rc){
+ mosquitto__free(packet);
+ return rc;
+ }
+ /* Variable header (topic string) */
+ if(topic){
+ packet__write_string(packet, topic, strlen(topic));
+ }else{
+ packet__write_uint16(packet, 0);
+ }
+ if(qos > 0){
+ packet__write_uint16(packet, mid);
+ }
+
+ if(mosq->protocol == mosq_p_mqtt5){
+ packet__write_varint(packet, proplen);
+ property__write_all(packet, cmsg_props, false);
+ property__write_all(packet, store_props, false);
+ if(expiry_interval > 0){
+ property__write_all(packet, &expiry_prop, false);
+ }
+ }
+
+ /* Payload */
+ if(payloadlen){
+ packet__write_bytes(packet, payload, payloadlen);
+ }
+
+ return packet__queue(mosq, packet);
+}