1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2019 - 2021 Andy Green <[email protected]>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #include <private-lib-core.h>
26
27 static void
secstream_mqtt_cleanup(lws_ss_handle_t * h)28 secstream_mqtt_cleanup(lws_ss_handle_t *h)
29 {
30 uint32_t i;
31
32 if (h->u.mqtt.heap_baggage) {
33 lws_free(h->u.mqtt.heap_baggage);
34 h->u.mqtt.heap_baggage = NULL;
35 }
36
37 if (h->u.mqtt.sub_info.topic) {
38 for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) {
39 if (h->u.mqtt.sub_info.topic[i].name) {
40 lws_free((void*)h->u.mqtt.sub_info.topic[i].name);
41 h->u.mqtt.sub_info.topic[i].name = NULL;
42 }
43 }
44 lws_free(h->u.mqtt.sub_info.topic);
45 h->u.mqtt.sub_info.topic = NULL;
46 }
47 }
48
49 static int
secstream_mqtt_subscribe(struct lws * wsi)50 secstream_mqtt_subscribe(struct lws *wsi)
51 {
52 size_t used_in, used_out, topic_limit;
53 lws_strexp_t exp;
54 char* expbuf;
55 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
56
57 if (!h || !h->policy)
58 return -1;
59
60 if (h->policy->u.mqtt.aws_iot)
61 topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
62 else
63 topic_limit = LWS_MQTT_MAX_TOPICLEN;
64
65 if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
66 return 0;
67
68 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
69 topic_limit);
70 /*
71 * Expand with no output first to calculate the size of
72 * expanded string then, allocate new buffer and expand
73 * again with the buffer
74 */
75 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
76 strlen(h->policy->u.mqtt.subscribe), &used_in,
77 &used_out) != LSTRX_DONE) {
78 lwsl_err(
79 "%s, failed to expand MQTT subscribe"
80 " topic with no output\n",
81 __func__);
82 return 1;
83 }
84
85 expbuf = lws_malloc(used_out + 1, __func__);
86 if (!expbuf) {
87 lwsl_err(
88 "%s, failed to allocate MQTT subscribe"
89 "topic",
90 __func__);
91 return 1;
92 }
93
94 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
95 used_out + 1);
96
97 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
98 strlen(h->policy->u.mqtt.subscribe), &used_in,
99 &used_out) != LSTRX_DONE) {
100 lwsl_err("%s, failed to expand MQTT subscribe topic\n",
101 __func__);
102 lws_free(expbuf);
103 return 1;
104 }
105 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
106 h->u.mqtt.sub_top.name = expbuf;
107
108 /*
109 * The policy says to subscribe to something, and we
110 * haven't done it yet. Do it using the pre-prepared
111 * string-substituted version of the policy string.
112 */
113
114 lwsl_notice("%s: subscribing %s\n", __func__,
115 h->u.mqtt.sub_top.name);
116
117 h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
118 memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
119 h->u.mqtt.sub_info.num_topics = 1;
120 h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
121 h->u.mqtt.sub_info.topic =
122 lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
123 h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
124 h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
125
126 if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
127 lwsl_notice("%s: unable to subscribe", __func__);
128 lws_free(expbuf);
129 h->u.mqtt.sub_top.name = NULL;
130 return -1;
131 }
132 lws_free(expbuf);
133 h->u.mqtt.sub_top.name = NULL;
134
135 /* Expect a SUBACK */
136 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
137 lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
138 return -1;
139 }
140 return 0;
141 }
142
143 static int
secstream_mqtt_publish(struct lws * wsi,uint8_t * buf,size_t buflen,const char * topic,lws_mqtt_qos_levels_t qos,int f)144 secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
145 const char* topic,
146 lws_mqtt_qos_levels_t qos, int f)
147 {
148 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
149 size_t used_in, used_out, topic_limit;
150 lws_strexp_t exp;
151 char *expbuf;
152 lws_mqtt_publish_param_t mqpp;
153
154 if (h->policy->u.mqtt.aws_iot)
155 topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
156 else
157 topic_limit = LWS_MQTT_MAX_TOPICLEN;
158
159 memset(&mqpp, 0, sizeof(mqpp));
160
161 lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
162 topic_limit);
163
164 if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
165 &used_out) != LSTRX_DONE) {
166 lwsl_err("%s, failed to expand MQTT publish"
167 " topic with no output\n", __func__);
168 return 1;
169 }
170 expbuf = lws_malloc(used_out + 1, __func__);
171 if (!expbuf) {
172 lwsl_err("%s, failed to allocate MQTT publish topic",
173 __func__);
174 return 1;
175 }
176
177 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
178 used_out + 1);
179
180 if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
181 &used_out) != LSTRX_DONE) {
182 lws_free(expbuf);
183 return 1;
184 }
185 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
186 mqpp.topic = (char *)expbuf;
187
188 mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
189 mqpp.packet_id = (uint16_t)(h->txord - 1);
190 mqpp.payload = buf;
191 if (h->writeable_len)
192 mqpp.payload_len = (uint32_t)h->writeable_len;
193 else
194 mqpp.payload_len = (uint32_t)buflen;
195
196 lwsl_notice("%s: payload len %d\n", __func__,
197 (int)mqpp.payload_len);
198
199 mqpp.qos = h->policy->u.mqtt.qos;
200
201 if (lws_mqtt_client_send_publish(wsi, &mqpp,
202 (const char *)buf,
203 (uint32_t)buflen,
204 f & LWSSS_FLAG_EOM)) {
205 lwsl_notice("%s: failed to publish\n", __func__);
206 lws_free(expbuf);
207 return -1;
208 }
209 lws_free(expbuf);
210 return 0;
211 }
212
213 static int
secstream_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)214 secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
215 void *in, size_t len)
216 {
217 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
218 lws_mqtt_publish_param_t *pmqpp;
219 uint8_t buf[LWS_PRE + 1400];
220 lws_ss_state_return_t r;
221 size_t buflen = sizeof(buf) - LWS_PRE;
222 int f = 0;
223
224 switch (reason) {
225
226 /* because we are protocols[0] ... */
227 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
228 lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
229 in ? (char *)in : "(null)");
230 if (!h)
231 break;
232
233 #if defined(LWS_WITH_CONMON)
234 lws_conmon_ss_json(h);
235 #endif
236
237 r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
238 h->wsi = NULL;
239
240 secstream_mqtt_cleanup(h);
241
242 if (r == LWSSSSRET_DESTROY_ME)
243 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
244
245 r = lws_ss_backoff(h);
246 if (r != LWSSSSRET_OK)
247 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
248
249 break;
250
251 case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
252 if (!h)
253 break;
254 lws_sul_cancel(&h->sul_timeout);
255 #if defined(LWS_WITH_CONMON)
256 lws_conmon_ss_json(h);
257 #endif
258 if (h->ss_dangling_connected)
259 r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
260 else
261 r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
262 if (h->wsi)
263 lws_set_opaque_user_data(h->wsi, NULL);
264 h->wsi = NULL;
265
266 secstream_mqtt_cleanup(h);
267
268 if (r)
269 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
270
271 if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
272 !h->txn_ok && !wsi->a.context->being_destroyed) {
273 r = lws_ss_backoff(h);
274 if (r != LWSSSSRET_OK)
275 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
276 }
277 break;
278
279 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
280 /*
281 * Make sure the handle wsi points to the stream wsi not the
282 * original nwsi, in the case it was migrated
283 */
284 h->wsi = wsi;
285 h->retry = 0;
286 h->seqstate = SSSEQ_CONNECTED;
287
288 if (!h->policy->u.mqtt.subscribe ||
289 !h->policy->u.mqtt.subscribe[0]) {
290 /*
291 * If subscribe is empty in the policy, then,
292 * skip sending SUBSCRIBE and signal the user
293 * application.
294 */
295 wsi->mqtt->done_subscribe = 1;
296 } else if (!h->policy->u.mqtt.clean_start &&
297 wsi->mqtt->session_resumed) {
298 wsi->mqtt->inside_resume_session = 1;
299 /*
300 * If the previous session is resumed and Server has
301 * stored session, then, do not subscribe.
302 */
303 if (!secstream_mqtt_subscribe(wsi))
304 wsi->mqtt->done_subscribe = 1;
305 wsi->mqtt->inside_resume_session = 0;
306 } else if (h->policy->u.mqtt.subscribe &&
307 !wsi->mqtt->done_subscribe) {
308 /*
309 * If a subscribe is pending on the stream, then make
310 * sure the SUBSCRIBE is done before signaling the
311 * user application.
312 */
313 lws_callback_on_writable(wsi);
314 break;
315 }
316 lws_sul_cancel(&h->sul);
317 #if defined(LWS_WITH_SYS_METRICS)
318 /*
319 * If any hanging caliper measurement, dump it, and free any tags
320 */
321 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
322 #endif
323 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
324 if (r != LWSSSSRET_OK)
325 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
326 if (h->policy->u.mqtt.topic)
327 lws_callback_on_writable(wsi);
328 break;
329
330 case LWS_CALLBACK_MQTT_CLIENT_RX:
331 // lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len);
332 if (!h || !h->info.rx)
333 return 0;
334
335 pmqpp = (lws_mqtt_publish_param_t *)in;
336
337 f = 0;
338 if (!pmqpp->payload_pos)
339 f |= LWSSS_FLAG_SOM;
340 if (pmqpp->payload_pos + len == pmqpp->payload_len)
341 f |= LWSSS_FLAG_EOM;
342
343 h->subseq = 1;
344
345 r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
346 len, f);
347 if (r != LWSSSSRET_OK)
348 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
349
350 return 0; /* don't passthru */
351
352 case LWS_CALLBACK_MQTT_SUBSCRIBED:
353 /*
354 * Stream demanded a subscribe while connecting, once
355 * done notify CONNECTED event to the application.
356 */
357 if (wsi->mqtt->done_subscribe == 0) {
358 lws_sul_cancel(&h->sul);
359 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
360 if (r != LWSSSSRET_OK)
361 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
362 }
363 wsi->mqtt->done_subscribe = 1;
364 lws_callback_on_writable(wsi);
365 break;
366
367 case LWS_CALLBACK_MQTT_ACK:
368 lws_sul_cancel(&h->sul_timeout);
369 if (wsi->mqtt->inside_birth) {
370 /*
371 * Skip LWSSSCS_QOS_ACK_REMOTE for birth topic.
372 */
373 wsi->mqtt->inside_birth = 0;
374 wsi->mqtt->done_birth = 1;
375 break;
376 }
377 r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
378 if (r != LWSSSSRET_OK)
379 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
380 break;
381
382 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
383 {
384 if (!h || !h->info.tx)
385 return 0;
386 lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
387
388 if (h->seqstate != SSSEQ_CONNECTED) {
389 lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
390 break;
391 }
392
393 if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
394 return secstream_mqtt_subscribe(wsi);
395
396 if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) {
397 lws_strexp_t exp;
398 size_t used_in, used_out = 0;
399 if (h->policy->u.mqtt.birth_message) {
400 lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
401 (char *)(buf + LWS_PRE), buflen);
402 if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
403 strlen(h->policy->u.mqtt.birth_message),
404 &used_in, &used_out) != LSTRX_DONE) {
405 return 1;
406 }
407 }
408 wsi->mqtt->inside_birth = 1;
409 return secstream_mqtt_publish(wsi, buf + LWS_PRE,
410 used_out, h->policy->u.mqtt.birth_topic,
411 h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
412 }
413 r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
414 &buflen, &f);
415 if (r == LWSSSSRET_TX_DONT_SEND)
416 return 0;
417
418 if (r == LWSSSSRET_DISCONNECT_ME) {
419 lws_mqtt_subscribe_param_t lmsp;
420 if (h->u.mqtt.sub_info.num_topics) {
421 lmsp.num_topics = h->u.mqtt.sub_info.num_topics;
422 lmsp.topic = h->u.mqtt.sub_info.topic;
423 lmsp.packet_id = (uint16_t)(h->txord - 1);
424 if (lws_mqtt_client_send_unsubcribe(wsi,
425 &lmsp)) {
426 lwsl_err("%s, failed to send"
427 " MQTT unsubsribe", __func__);
428 return -1;
429 }
430 return 0;
431 }
432 }
433
434 if (r < 0)
435 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
436
437 return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
438 h->policy->u.mqtt.topic,
439 h->policy->u.mqtt.qos, f);
440 }
441
442 case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
443 {
444 struct lws *nwsi = lws_get_network_wsi(wsi);
445 if (nwsi && (nwsi->mux.child_count == 1))
446 lws_mqtt_client_send_disconnect(nwsi);
447 return -1;
448 }
449
450 case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
451 if (wsi->mqtt->inside_unsubscribe) {
452 lwsl_warn("%s: %s: Unsubscribe timout.\n", __func__,
453 lws_ss_tag(h));
454 return -1;
455 }
456 break;
457
458 default:
459 break;
460 }
461
462 return lws_callback_http_dummy(wsi, reason, user, in, len);
463 }
464
465 const struct lws_protocols protocol_secstream_mqtt = {
466 "lws-secstream-mqtt",
467 secstream_mqtt,
468 0, 0, 0, NULL, 0
469 };
470 /*
471 * Munge connect info according to protocol-specific considerations... this
472 * usually means interpreting aux in a protocol-specific way and using the
473 * pieces at connection setup time, eg, http url pieces.
474 *
475 * len bytes of buf can be used for things with scope until after the actual
476 * connect.
477 *
478 * For ws, protocol aux is <url path>;<ws subprotocol name>
479 */
480
481 enum {
482 SSCMM_STRSUB_WILL_TOPIC,
483 SSCMM_STRSUB_WILL_MESSAGE,
484 SSCMM_STRSUB_SUBSCRIBE,
485 SSCMM_STRSUB_TOPIC,
486 SSCMM_STRSUB_BIRTH_TOPIC,
487 SSCMM_STRSUB_BIRTH_MESSAGE
488 };
489
490 static int
secstream_connect_munge_mqtt(lws_ss_handle_t * h,char * buf,size_t len,struct lws_client_connect_info * i,union lws_ss_contemp * ct)491 secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
492 struct lws_client_connect_info *i,
493 union lws_ss_contemp *ct)
494 {
495 const char *sources[6] = {
496 /* we're going to string-substitute these before use */
497 h->policy->u.mqtt.will_topic,
498 h->policy->u.mqtt.will_message,
499 h->policy->u.mqtt.subscribe,
500 h->policy->u.mqtt.topic,
501 h->policy->u.mqtt.birth_topic,
502 h->policy->u.mqtt.birth_message
503 };
504 size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
505 lws_strexp_t exp;
506 char *ps[6];
507 uint8_t *p = NULL;
508 int n = -1;
509 size_t blen;
510 lws_system_blob_t *b = NULL;
511
512 memset(&ct->ccp, 0, sizeof(ct->ccp));
513 b = lws_system_get_blob(i->context,
514 LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0);
515
516 /* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */
517 if (b && (blen = lws_system_blob_get_size(b))) {
518 if (blen > LWS_MQTT_MAX_CIDLEN) {
519 lwsl_err("%s - Client ID too long.\n",
520 __func__);
521 return -1;
522 }
523 p = (uint8_t *)lws_zalloc(blen+1, __func__);
524 if (!p)
525 return -1;
526 n = lws_system_blob_get(b, p, &blen, 0);
527 if (n) {
528 ct->ccp.client_id = NULL;
529 } else {
530 ct->ccp.client_id = (const char *)p;
531 lwsl_notice("%s - Client ID = %s\n",
532 __func__, ct->ccp.client_id);
533 }
534 } else {
535 /* Default (Random) client ID */
536 ct->ccp.client_id = NULL;
537 }
538
539 b = lws_system_get_blob(i->context,
540 LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0);
541
542 /* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
543 if (b && (blen = lws_system_blob_get_size(b))) {
544 p = (uint8_t *)lws_zalloc(blen+1, __func__);
545 if (!p)
546 return -1;
547 n = lws_system_blob_get(b, p, &blen, 0);
548 if (n) {
549 ct->ccp.username = NULL;
550 } else {
551 ct->ccp.username = (const char *)p;
552 lwsl_notice("%s - Username ID = %s\n",
553 __func__, ct->ccp.username);
554 }
555 }
556
557 b = lws_system_get_blob(i->context,
558 LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0);
559
560 /* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
561 if (b && (blen = lws_system_blob_get_size(b))) {
562 p = (uint8_t *)lws_zalloc(blen+1, __func__);
563 if (!p)
564 return -1;
565 n = lws_system_blob_get(b, p, &blen, 0);
566 if (n) {
567 ct->ccp.password = NULL;
568 } else {
569 ct->ccp.password = (const char *)p;
570 lwsl_notice("%s - Password ID = %s\n",
571 __func__, ct->ccp.password);
572 }
573 }
574
575 ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive;
576 ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u);
577 ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
578 ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
579 ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos;
580 ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain;
581 ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
582 h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
583
584 /*
585 * We're going to string-substitute several of these parameters, which
586 * have unknown, possibly large size. And, as their usage is deferred
587 * inside the asynchronous lifetime of the MQTT connection, they need
588 * to live on the heap.
589 *
590 * Notice these allocations at h->u.mqtt.heap_baggage belong to the
591 * underlying MQTT stream lifetime, not the logical SS lifetime, and
592 * are destroyed if present at connection error or close of the
593 * underlying connection.
594 *
595 *
596 * First, compute the length of each without producing strsubst output,
597 * and keep a running total.
598 */
599
600 for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
601 if (!sources[n])
602 continue;
603
604 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
605 NULL, (size_t)-1);
606 if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
607 &used_in, &olen[n]) != LSTRX_DONE) {
608 lwsl_err("%s: failed to subsitute %s\n", __func__,
609 sources[n]);
610 return 1;
611 }
612 tot += olen[n] + 1;
613 }
614
615 /*
616 * Then, allocate enough space on the heap for the total of the
617 * substituted results
618 */
619
620 h->u.mqtt.heap_baggage = lws_malloc(tot, __func__);
621 if (!h->u.mqtt.heap_baggage)
622 return 1;
623
624 /*
625 * Finally, issue the subsitutions one after the other into the single
626 * allocated result buffer and prepare pointers into them
627 */
628
629 p = h->u.mqtt.heap_baggage;
630 for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
631 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
632 (char *)p, (size_t)-1);
633 if (!sources[n]) {
634 ps[n] = NULL;
635 continue;
636 }
637 ps[n] = (char *)p;
638 if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
639 &used_in, &olen[n]) != LSTRX_DONE)
640 return 1;
641
642 p += olen[n] + 1;
643 }
644
645 /*
646 * Point the guys who want the substituted content at the substituted
647 * strings
648 */
649
650 ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC];
651 ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE];
652 h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE];
653 h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE];
654 h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC];
655 ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC];
656 ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE];
657
658 i->method = "MQTT";
659 i->mqtt_cp = &ct->ccp;
660
661 i->alpn = "x-amzn-mqtt-ca";
662
663 /* share connections where possible */
664 i->ssl_connection |= LCCSCF_PIPELINE;
665
666 return 0;
667 }
668
669 const struct ss_pcols ss_pcol_mqtt = {
670 "MQTT",
671 "x-amzn-mqtt-ca", //"mqtt/3.1.1",
672 &protocol_secstream_mqtt,
673 secstream_connect_munge_mqtt,
674 NULL, NULL
675 };
676