xref: /aosp_15_r20/external/libwebsockets/lib/secure-streams/protocols/ss-mqtt.c (revision 1c60b9aca93fdbc9b5f19b2d2194c91294b22281)
1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2019 - 2021 Andy Green <[email protected]>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #include <private-lib-core.h>
26 
27 static void
secstream_mqtt_cleanup(lws_ss_handle_t * h)28 secstream_mqtt_cleanup(lws_ss_handle_t *h)
29 {
30 	uint32_t i;
31 
32 	if (h->u.mqtt.heap_baggage) {
33 		lws_free(h->u.mqtt.heap_baggage);
34 		h->u.mqtt.heap_baggage = NULL;
35 	}
36 
37 	if (h->u.mqtt.sub_info.topic) {
38 		for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) {
39 			if (h->u.mqtt.sub_info.topic[i].name) {
40 				lws_free((void*)h->u.mqtt.sub_info.topic[i].name);
41 				h->u.mqtt.sub_info.topic[i].name = NULL;
42 			}
43 		}
44 		lws_free(h->u.mqtt.sub_info.topic);
45 		h->u.mqtt.sub_info.topic = NULL;
46 	}
47 }
48 
49 static int
secstream_mqtt_subscribe(struct lws * wsi)50 secstream_mqtt_subscribe(struct lws *wsi)
51 {
52 	size_t used_in, used_out, topic_limit;
53 	lws_strexp_t exp;
54 	char* expbuf;
55 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
56 
57 	if (!h || !h->policy)
58 		return -1;
59 
60 	if (h->policy->u.mqtt.aws_iot)
61 		topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
62 	else
63 		topic_limit = LWS_MQTT_MAX_TOPICLEN;
64 
65 	if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
66 		return 0;
67 
68 	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
69 			topic_limit);
70 	/*
71 	 * Expand with no output first to calculate the size of
72 	 * expanded string then, allocate new buffer and expand
73 	 * again with the buffer
74 	 */
75 	if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
76 			      strlen(h->policy->u.mqtt.subscribe), &used_in,
77 			      &used_out) != LSTRX_DONE) {
78 		lwsl_err(
79 			"%s, failed to expand MQTT subscribe"
80 			" topic with no output\n",
81 			__func__);
82 		return 1;
83 	}
84 
85 	expbuf = lws_malloc(used_out + 1, __func__);
86 	if (!expbuf) {
87 		lwsl_err(
88 			 "%s, failed to allocate MQTT subscribe"
89 			 "topic",
90 			 __func__);
91 		return 1;
92 	}
93 
94 	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
95 			used_out + 1);
96 
97 	if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
98 			      strlen(h->policy->u.mqtt.subscribe), &used_in,
99 			      &used_out) != LSTRX_DONE) {
100 		lwsl_err("%s, failed to expand MQTT subscribe topic\n",
101 			 __func__);
102 		lws_free(expbuf);
103 		return 1;
104 	}
105 	lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
106 	h->u.mqtt.sub_top.name = expbuf;
107 
108 	/*
109 	 * The policy says to subscribe to something, and we
110 	 * haven't done it yet.  Do it using the pre-prepared
111 	 * string-substituted version of the policy string.
112 	 */
113 
114 	lwsl_notice("%s: subscribing %s\n", __func__,
115 		    h->u.mqtt.sub_top.name);
116 
117 	h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
118 	memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
119 	h->u.mqtt.sub_info.num_topics = 1;
120 	h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
121 	h->u.mqtt.sub_info.topic =
122 			    lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
123 	h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
124 	h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
125 
126 	if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
127 		lwsl_notice("%s: unable to subscribe", __func__);
128 		lws_free(expbuf);
129 		h->u.mqtt.sub_top.name = NULL;
130 		return -1;
131 	}
132 	lws_free(expbuf);
133 	h->u.mqtt.sub_top.name = NULL;
134 
135 	/* Expect a SUBACK */
136 	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
137 		lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
138 		return -1;
139 	}
140 	return 0;
141 }
142 
143 static int
secstream_mqtt_publish(struct lws * wsi,uint8_t * buf,size_t buflen,const char * topic,lws_mqtt_qos_levels_t qos,int f)144 secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
145 			const char* topic,
146 			lws_mqtt_qos_levels_t qos,  int f)
147 {
148 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
149 	size_t used_in, used_out, topic_limit;
150 	lws_strexp_t exp;
151 	char *expbuf;
152 	lws_mqtt_publish_param_t mqpp;
153 
154 	if (h->policy->u.mqtt.aws_iot)
155 		topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
156 	else
157 		topic_limit = LWS_MQTT_MAX_TOPICLEN;
158 
159 	memset(&mqpp, 0, sizeof(mqpp));
160 
161 	lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
162 			topic_limit);
163 
164 	if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
165 			      &used_out) != LSTRX_DONE) {
166 		lwsl_err("%s, failed to expand MQTT publish"
167 			 " topic with no output\n", __func__);
168 		return 1;
169 	}
170 	expbuf = lws_malloc(used_out + 1, __func__);
171 	if (!expbuf) {
172 		lwsl_err("%s, failed to allocate MQTT publish topic",
173 			  __func__);
174 		return 1;
175 	}
176 
177 	lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
178 			used_out + 1);
179 
180 	if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
181 			      &used_out) != LSTRX_DONE) {
182 		lws_free(expbuf);
183 		return 1;
184 	}
185 	lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
186 	mqpp.topic = (char *)expbuf;
187 
188 	mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
189 	mqpp.packet_id = (uint16_t)(h->txord - 1);
190 	mqpp.payload = buf;
191 	if (h->writeable_len)
192 		mqpp.payload_len = (uint32_t)h->writeable_len;
193 	else
194 		mqpp.payload_len = (uint32_t)buflen;
195 
196 	lwsl_notice("%s: payload len %d\n", __func__,
197 		    (int)mqpp.payload_len);
198 
199 	mqpp.qos = h->policy->u.mqtt.qos;
200 
201 	if (lws_mqtt_client_send_publish(wsi, &mqpp,
202 					 (const char *)buf,
203 					 (uint32_t)buflen,
204 					 f & LWSSS_FLAG_EOM)) {
205 		lwsl_notice("%s: failed to publish\n", __func__);
206 		lws_free(expbuf);
207 		return -1;
208 	}
209 	lws_free(expbuf);
210 	return 0;
211 }
212 
213 static int
secstream_mqtt(struct lws * wsi,enum lws_callback_reasons reason,void * user,void * in,size_t len)214 secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
215 	     void *in, size_t len)
216 {
217 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
218 	lws_mqtt_publish_param_t *pmqpp;
219 	uint8_t buf[LWS_PRE + 1400];
220 	lws_ss_state_return_t r;
221 	size_t buflen = sizeof(buf) - LWS_PRE;
222 	int f = 0;
223 
224 	switch (reason) {
225 
226 	/* because we are protocols[0] ... */
227 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
228 		lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
229 			 in ? (char *)in : "(null)");
230 		if (!h)
231 			break;
232 
233 #if defined(LWS_WITH_CONMON)
234 		lws_conmon_ss_json(h);
235 #endif
236 
237 		r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
238 		h->wsi = NULL;
239 
240 		secstream_mqtt_cleanup(h);
241 
242 		if (r == LWSSSSRET_DESTROY_ME)
243 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
244 
245 		r = lws_ss_backoff(h);
246 		if (r != LWSSSSRET_OK)
247 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
248 
249 		break;
250 
251 	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
252 		if (!h)
253 			break;
254 		lws_sul_cancel(&h->sul_timeout);
255 #if defined(LWS_WITH_CONMON)
256 		lws_conmon_ss_json(h);
257 #endif
258 		if (h->ss_dangling_connected)
259 			r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
260 		else
261 			r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
262 		if (h->wsi)
263 			lws_set_opaque_user_data(h->wsi, NULL);
264 		h->wsi = NULL;
265 
266 		secstream_mqtt_cleanup(h);
267 
268 		if (r)
269 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
270 
271 		if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
272 		    !h->txn_ok && !wsi->a.context->being_destroyed) {
273 			r = lws_ss_backoff(h);
274 			if (r != LWSSSSRET_OK)
275 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
276 		}
277 		break;
278 
279 	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
280 		/*
281 		 * Make sure the handle wsi points to the stream wsi not the
282 		 * original nwsi, in the case it was migrated
283 		 */
284 		h->wsi = wsi;
285 		h->retry = 0;
286 		h->seqstate = SSSEQ_CONNECTED;
287 
288 		if (!h->policy->u.mqtt.subscribe ||
289 		    !h->policy->u.mqtt.subscribe[0]) {
290 			/*
291 			 * If subscribe is empty in the policy, then,
292 			 * skip sending SUBSCRIBE and signal the user
293 			 * application.
294 			 */
295 			wsi->mqtt->done_subscribe = 1;
296 		} else if (!h->policy->u.mqtt.clean_start &&
297 			   wsi->mqtt->session_resumed) {
298 			wsi->mqtt->inside_resume_session = 1;
299 			/*
300 			 * If the previous session is resumed and Server has
301 			 * stored session, then, do not subscribe.
302 			 */
303 			if (!secstream_mqtt_subscribe(wsi))
304 				wsi->mqtt->done_subscribe = 1;
305 			wsi->mqtt->inside_resume_session = 0;
306 		} else if (h->policy->u.mqtt.subscribe &&
307 			   !wsi->mqtt->done_subscribe) {
308 			/*
309 			 * If a subscribe is pending on the stream, then make
310 			 * sure the SUBSCRIBE is done before signaling the
311 			 * user application.
312 			 */
313 			lws_callback_on_writable(wsi);
314 			break;
315 		}
316 		lws_sul_cancel(&h->sul);
317 #if defined(LWS_WITH_SYS_METRICS)
318 		/*
319 		 * If any hanging caliper measurement, dump it, and free any tags
320 		 */
321 		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
322 #endif
323 		r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
324 		if (r != LWSSSSRET_OK)
325 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
326 		if (h->policy->u.mqtt.topic)
327 			lws_callback_on_writable(wsi);
328 		break;
329 
330 	case LWS_CALLBACK_MQTT_CLIENT_RX:
331 		// lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len);
332 		if (!h || !h->info.rx)
333 			return 0;
334 
335 		pmqpp = (lws_mqtt_publish_param_t *)in;
336 
337 		f = 0;
338 		if (!pmqpp->payload_pos)
339 			f |= LWSSS_FLAG_SOM;
340 		if (pmqpp->payload_pos + len == pmqpp->payload_len)
341 			f |= LWSSS_FLAG_EOM;
342 
343 		h->subseq = 1;
344 
345 		r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
346 			   len, f);
347 		if (r != LWSSSSRET_OK)
348 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
349 
350 		return 0; /* don't passthru */
351 
352 	case LWS_CALLBACK_MQTT_SUBSCRIBED:
353 		/*
354 		 * Stream demanded a subscribe while connecting, once
355 		 * done notify CONNECTED event to the application.
356 		 */
357 		if (wsi->mqtt->done_subscribe == 0) {
358 			lws_sul_cancel(&h->sul);
359 			r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
360 			if (r != LWSSSSRET_OK)
361 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
362 		}
363 		wsi->mqtt->done_subscribe = 1;
364 		lws_callback_on_writable(wsi);
365 		break;
366 
367 	case LWS_CALLBACK_MQTT_ACK:
368 		lws_sul_cancel(&h->sul_timeout);
369 		if (wsi->mqtt->inside_birth) {
370 			/*
371 			 * Skip LWSSSCS_QOS_ACK_REMOTE for birth topic.
372 			 */
373 			wsi->mqtt->inside_birth = 0;
374 			wsi->mqtt->done_birth = 1;
375 			break;
376 		}
377 		r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
378 		if (r != LWSSSSRET_OK)
379 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
380 		break;
381 
382 	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
383 	{
384 		if (!h || !h->info.tx)
385 			return 0;
386 		lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
387 
388 		if (h->seqstate != SSSEQ_CONNECTED) {
389 			lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
390 			break;
391 		}
392 
393 		if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
394 			return secstream_mqtt_subscribe(wsi);
395 
396 		if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) {
397 			lws_strexp_t exp;
398 			size_t used_in, used_out = 0;
399 			if (h->policy->u.mqtt.birth_message) {
400 				lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
401 						(char *)(buf + LWS_PRE), buflen);
402 				if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
403 						      strlen(h->policy->u.mqtt.birth_message),
404 						      &used_in, &used_out) != LSTRX_DONE) {
405 					return 1;
406 				}
407 			}
408 			wsi->mqtt->inside_birth = 1;
409 			return secstream_mqtt_publish(wsi, buf + LWS_PRE,
410 					used_out, h->policy->u.mqtt.birth_topic,
411 					h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
412 		}
413 		r = h->info.tx(ss_to_userobj(h),  h->txord++,  buf + LWS_PRE,
414 			       &buflen, &f);
415 		if (r == LWSSSSRET_TX_DONT_SEND)
416 			return 0;
417 
418 		if (r == LWSSSSRET_DISCONNECT_ME) {
419 			lws_mqtt_subscribe_param_t lmsp;
420 			if (h->u.mqtt.sub_info.num_topics) {
421 				lmsp.num_topics = h->u.mqtt.sub_info.num_topics;
422 				lmsp.topic = h->u.mqtt.sub_info.topic;
423 				lmsp.packet_id = (uint16_t)(h->txord - 1);
424 				if (lws_mqtt_client_send_unsubcribe(wsi,
425 								    &lmsp)) {
426 					lwsl_err("%s, failed to send"
427 					         " MQTT unsubsribe", __func__);
428 					return -1;
429 				}
430 				return 0;
431 			}
432 		}
433 
434 		if (r < 0)
435 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
436 
437 		return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
438 					      h->policy->u.mqtt.topic,
439 					      h->policy->u.mqtt.qos, f);
440 	}
441 
442 	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
443 	{
444 		struct lws *nwsi = lws_get_network_wsi(wsi);
445 		if (nwsi && (nwsi->mux.child_count == 1))
446 			lws_mqtt_client_send_disconnect(nwsi);
447 		return -1;
448 	}
449 
450 	case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
451 		if (wsi->mqtt->inside_unsubscribe) {
452 			lwsl_warn("%s: %s: Unsubscribe timout.\n", __func__,
453 				  lws_ss_tag(h));
454 			return -1;
455 		}
456 		break;
457 
458 	default:
459 		break;
460 	}
461 
462 	return lws_callback_http_dummy(wsi, reason, user, in, len);
463 }
464 
465 const struct lws_protocols protocol_secstream_mqtt = {
466 	"lws-secstream-mqtt",
467 	secstream_mqtt,
468 	0, 0, 0, NULL, 0
469 };
/*
 * Munge connect info according to protocol-specific considerations... this
 * usually means interpreting aux in a protocol-specific way and using the
 * pieces at connection setup time, eg, http url pieces.
 *
 * len bytes of buf can be used for things with scope until after the actual
 * connect.
 *
 * For ws, protocol aux is <url path>;<ws subprotocol name>.  For MQTT, the
 * connection parameters come instead from the h->policy->u.mqtt fields and
 * from the LWS_SYSBLOB_TYPE_MQTT_* system blobs.
 */
480 
/*
 * Indexes of the policy strings that get string-substituted at connect time;
 * the order must match the sources[] array in secstream_connect_munge_mqtt().
 */
enum {
	SSCMM_STRSUB_WILL_TOPIC = 0,	/* policy will_topic */
	SSCMM_STRSUB_WILL_MESSAGE,	/* policy will_message */
	SSCMM_STRSUB_SUBSCRIBE,		/* policy subscribe */
	SSCMM_STRSUB_TOPIC,		/* policy topic */
	SSCMM_STRSUB_BIRTH_TOPIC,	/* policy birth_topic */
	SSCMM_STRSUB_BIRTH_MESSAGE	/* policy birth_message */
};
489 
490 static int
secstream_connect_munge_mqtt(lws_ss_handle_t * h,char * buf,size_t len,struct lws_client_connect_info * i,union lws_ss_contemp * ct)491 secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
492 			     struct lws_client_connect_info *i,
493 			     union lws_ss_contemp *ct)
494 {
495 	const char *sources[6] = {
496 		/* we're going to string-substitute these before use */
497 		h->policy->u.mqtt.will_topic,
498 		h->policy->u.mqtt.will_message,
499 		h->policy->u.mqtt.subscribe,
500 		h->policy->u.mqtt.topic,
501 		h->policy->u.mqtt.birth_topic,
502 		h->policy->u.mqtt.birth_message
503 	};
504 	size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
505 	lws_strexp_t exp;
506 	char *ps[6];
507 	uint8_t *p = NULL;
508 	int n = -1;
509 	size_t blen;
510 	lws_system_blob_t *b = NULL;
511 
512 	memset(&ct->ccp, 0, sizeof(ct->ccp));
513 	b = lws_system_get_blob(i->context,
514 				LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0);
515 
516 	/* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */
517 	if (b && (blen = lws_system_blob_get_size(b))) {
518 		if (blen > LWS_MQTT_MAX_CIDLEN) {
519 			lwsl_err("%s - Client ID too long.\n",
520 				 __func__);
521 			return -1;
522 		}
523 		p = (uint8_t *)lws_zalloc(blen+1, __func__);
524 		if (!p)
525 			return -1;
526 		n = lws_system_blob_get(b, p, &blen, 0);
527 		if (n) {
528 			ct->ccp.client_id = NULL;
529 		} else {
530 			ct->ccp.client_id = (const char *)p;
531 			lwsl_notice("%s - Client ID = %s\n",
532 				    __func__, ct->ccp.client_id);
533 		}
534 	} else {
535 		/* Default (Random) client ID */
536 		ct->ccp.client_id = NULL;
537 	}
538 
539 	b = lws_system_get_blob(i->context,
540 				LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0);
541 
542 	/* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
543 	if (b && (blen = lws_system_blob_get_size(b))) {
544 		p = (uint8_t *)lws_zalloc(blen+1, __func__);
545 		if (!p)
546 			return -1;
547 		n = lws_system_blob_get(b, p, &blen, 0);
548 		if (n) {
549 			ct->ccp.username = NULL;
550 		} else {
551 			ct->ccp.username = (const char *)p;
552 			lwsl_notice("%s - Username ID = %s\n",
553 				    __func__, ct->ccp.username);
554 		}
555 	}
556 
557 	b = lws_system_get_blob(i->context,
558 				LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0);
559 
560 	/* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
561 	if (b && (blen = lws_system_blob_get_size(b))) {
562 		p = (uint8_t *)lws_zalloc(blen+1, __func__);
563 		if (!p)
564 			return -1;
565 		n = lws_system_blob_get(b, p, &blen, 0);
566 		if (n) {
567 			ct->ccp.password = NULL;
568 		} else {
569 			ct->ccp.password = (const char *)p;
570 			lwsl_notice("%s - Password ID = %s\n",
571 				    __func__, ct->ccp.password);
572 		}
573 	}
574 
575 	ct->ccp.keep_alive		= h->policy->u.mqtt.keep_alive;
576 	ct->ccp.clean_start		= (h->policy->u.mqtt.clean_start & 1u);
577 	ct->ccp.will_param.qos		= h->policy->u.mqtt.will_qos;
578 	ct->ccp.will_param.retain	= h->policy->u.mqtt.will_retain;
579 	ct->ccp.birth_param.qos		= h->policy->u.mqtt.birth_qos;
580 	ct->ccp.birth_param.retain	= h->policy->u.mqtt.birth_retain;
581 	ct->ccp.aws_iot			= h->policy->u.mqtt.aws_iot;
582 	h->u.mqtt.topic_qos.qos		= h->policy->u.mqtt.qos;
583 
584 	/*
585 	 * We're going to string-substitute several of these parameters, which
586 	 * have unknown, possibly large size.  And, as their usage is deferred
587 	 * inside the asynchronous lifetime of the MQTT connection, they need
588 	 * to live on the heap.
589 	 *
590 	 * Notice these allocations at h->u.mqtt.heap_baggage belong to the
591 	 * underlying MQTT stream lifetime, not the logical SS lifetime, and
592 	 * are destroyed if present at connection error or close of the
593 	 * underlying connection.
594 	 *
595 	 *
596 	 * First, compute the length of each without producing strsubst output,
597 	 * and keep a running total.
598 	 */
599 
600 	for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
601 		if (!sources[n])
602 			continue;
603 
604 		lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
605 				NULL, (size_t)-1);
606 		if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
607 				      &used_in, &olen[n]) != LSTRX_DONE) {
608 			lwsl_err("%s: failed to subsitute %s\n", __func__,
609 					sources[n]);
610 			return 1;
611 		}
612 		tot += olen[n] + 1;
613 	}
614 
615 	/*
616 	 * Then, allocate enough space on the heap for the total of the
617 	 * substituted results
618 	 */
619 
620 	h->u.mqtt.heap_baggage = lws_malloc(tot, __func__);
621 	if (!h->u.mqtt.heap_baggage)
622 		return 1;
623 
624 	/*
625 	 * Finally, issue the subsitutions one after the other into the single
626 	 * allocated result buffer and prepare pointers into them
627 	 */
628 
629 	p = h->u.mqtt.heap_baggage;
630 	for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
631 		lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
632 				(char *)p, (size_t)-1);
633 		if (!sources[n]) {
634 			ps[n] = NULL;
635 			continue;
636 		}
637 		ps[n] = (char *)p;
638 		if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
639 				      &used_in, &olen[n]) != LSTRX_DONE)
640 			return 1;
641 
642 		p += olen[n] + 1;
643 	}
644 
645 	/*
646 	 * Point the guys who want the substituted content at the substituted
647 	 * strings
648 	 */
649 
650 	ct->ccp.will_param.topic	= ps[SSCMM_STRSUB_WILL_TOPIC];
651 	ct->ccp.will_param.message	= ps[SSCMM_STRSUB_WILL_MESSAGE];
652 	h->u.mqtt.subscribe_to		= ps[SSCMM_STRSUB_SUBSCRIBE];
653 	h->u.mqtt.subscribe_to_len	= olen[SSCMM_STRSUB_SUBSCRIBE];
654 	h->u.mqtt.topic_qos.name	= ps[SSCMM_STRSUB_TOPIC];
655 	ct->ccp.birth_param.topic	= ps[SSCMM_STRSUB_BIRTH_TOPIC];
656 	ct->ccp.birth_param.message	= ps[SSCMM_STRSUB_BIRTH_MESSAGE];
657 
658 	i->method = "MQTT";
659 	i->mqtt_cp = &ct->ccp;
660 
661 	i->alpn = "x-amzn-mqtt-ca";
662 
663 	/* share connections where possible */
664 	i->ssl_connection |= LCCSCF_PIPELINE;
665 
666 	return 0;
667 }
668 
669 const struct ss_pcols ss_pcol_mqtt = {
670 	"MQTT",
671 	"x-amzn-mqtt-ca", //"mqtt/3.1.1",
672 	&protocol_secstream_mqtt,
673 	secstream_connect_munge_mqtt,
674 	NULL, NULL
675 };
676