Implement a Chatroom Client
To connect to the chatroom, you'll utilize the MQTT
protocol. The details required for the connection should be gathered from the previous steps. Below is an example in TypeScript
demonstrating how to establish this connection:
import * as mqtt from "mqtt";
import * as qs from "querystring";
// Define interfaces for chatroom connection and message structures
interface ChatroomConfig {
connection: ChatroomConnect;
topic: Topic;
user: ChatUser;
}
interface ChatroomConnect {
token: string;
signature: string;
authorizer: string;
endpoint: string;
clientId: string;
}
interface Topic {
pub: string;
sub: string;
}
interface ChatroomClaims {
sub: string;
tenant_id: string;
chat_id: string;
type: string;
device_id: string;
roles: string[];
}
interface InteractionMessage {
id?: string;
user: ChatUser;
type: InteractionType;
text_message: TextMessage;
created_at?: string;
sent_at: string;
received_at?: string;
published_at?: string;
}
interface ChatUser {
id?: string;
device_id?: string;
custom_name: string;
is_admin?: boolean;
blocked?: boolean;
}
// Enum for defining interaction types in the chatroom
enum InteractionType {
INTERACTION_TYPE_TEXT = 1,
INTERACTION_TYPE_MUTE,
INTERACTION_TYPE_UNMUTE,
INTERACTION_TYPE_PIN_MESSAGE,
INTERACTION_TYPE_UNPIN_MESSAGE,
INTERACTION_TYPE_BLOCK_USER,
INTERACTION_TYPE_UNBLOCK_USER,
INTERACTION_TYPE_POLL_STARTED,
INTERACTION_TYPE_POLL_CLOSED,
INTERACTION_TYPE_POLL_ARCHIVED,
INTERACTION_TYPE_POLL_UPDATE,
INTERACTION_TYPE_POLL_USER_VOTE,
INTERACTION_TYPE_STICKER,
INTERACTION_TYPE_VIEWER_INFO_ENABLED,
INTERACTION_TYPE_VIEWER_INFO_DISABLED,
INTERACTION_TYPE_VIEWER_INFO_UPDATE,
}
interface TextMessage {
text: string;
filtered?: boolean;
}
// Function to create a new MQTT client for connecting to the chatroom
const newClient = (conn: ChatroomConnect): { connect: () => mqtt.MqttClient } => {
// Function to parse JWT claims from the provided token
const parseClaims = (): ChatroomClaims => {
const parts = conn.token.split(".");
const claims: ChatroomClaims = JSON.parse(
Buffer.from(parts[1], "base64url").toString()
);
return claims;
};
// Prepare query parameters for MQTT connection
const queryObj: any = {
"x-amz-customauthorizer-name": conn.authorizer,
"x-amz-customauthorizer-signature": conn.signature,
"x-token": conn.token,
};
const claims: ChatroomClaims = parseClaims();
// Return an object with a connect function to establish the MQTT connection
return {
connect: (): mqtt.MqttClient => {
return mqtt.connect({
hostname: conn.endpoint,
protocol: 'wss',
protocolVersion: 5,
port: 443,
username: `?${qs.stringify(queryObj)}`,
clientId: conn.clientId,
reconnectPeriod: 1000,
rejectUnauthorized: true,
});
},
};
};
// Main function to execute the chat client logic
const main = () => {
// Mock function to simulate receiving a chatroom config response
const mockChatroomConfigResp = (): ChatroomConfig => {
return JSON.parse(`{
"connection": {
"endpoint": "",
"authorizer": "",
"token": "",
"signature": "",
"client_id": ""
},
"topic": {
"pub": "",
"sub": ""
},
"user": {
"is_admin": true,
"customer_id": "",
"device_id": "",
"custom_name": ""
}
}`);
};
// Sample user definition
const user: ChatUser = {
custom_name: "user1",
};
// Get the response object
const resp = mockChatroomConfigResp();
// Function to simulate message publishing
const simulatePublish = () => {
let times = 0;
setInterval(() => {
let date = new Date();
let msg: InteractionMessage = {
user: user,
type: InteractionType.INTERACTION_TYPE_TEXT,
text_message: {
text: `[${times++}]`,
},
sent_at: date.toISOString()
};
// Publish messages to the chatroom
client.publish(resp.topic.pub, JSON.stringify(msg), { qos: 1 }, (err) => {
if (err) {
console.error(err)
}
})
}, 1000);
};
// Connect the client and set up event handlers
const client = newClient(resp.connection).connect();
client.on("connect", () => {
console.log("iot core connected.");
// Subscribe to the chatroom's topic
client.subscribe(resp.topic.sub, { qos: 0 });
// Start simulating message publishing
simulatePublish()
});
// Handle incoming messages
client.on("message", (topic, payload) => {
let messages: [InteractionMessage] = JSON.parse(payload.toString());
for (let m of messages) {
console.log(
`[${m.received_at}] @${m.user.custom_name}: ${m.text_message.text}`
);
}
});
// Handle connection errors
client.on("error", (err) => {
console.log(err);
});
// Handle connection closure
client.on("close", () => {
console.log("close");
});
// Handle reconnection attempts
client.on("reconnect", () => {
console.log("reconnecting...");
});
};
// Execute the main function
main();
The source code consists of several key components essential for the functionality of the chatroom client:
- MQTT Module: Utilized for handling MQTT client functionalities.
- Query String Module: Used for manipulating query strings, crucial in constructing connection parameters.
- Interfaces: Defines custom interfaces to structure the data for chatroom interactions and MQTT connectivity.
- JWT Token Parsing: Critical for authentication and authorization in MQTT communication.
Connect to Chatroom
- Import MQTT and Query String Modules: The code begins by importing the
mqtt
module for MQTT client functionality andqs
(querystring) for handling query strings. - Define Interfaces: Interfaces like
ChatroomConnect
,Topic
, andChatroomClaims
are defined to structure the data for chatroom connection and interaction. - newClient Function: This function creates a new MQTT client for the chatroom connection. It accepts a
ChatroomConnect
object containing the connection details.- Parse JWT Claims: Inside
newClient
, theparseClaims
function decodes the JWT token to extract chatroom claims. - Prepare MQTT Connection Parameters: The function sets up the connection parameters, including hostname (endpoint), protocol (
wss
for secure WebSocket), port (443), and query parameters from the token.
- Parse JWT Claims: Inside
- Connect Function: This function within
newClient
usesmqtt.connect
to establish the connection. It configures the client with the hostname, protocol, port, username (constructed from query parameters), client ID (based on device ID and subscriber ID), and reconnect strategy. - Main Function Execution: The
main
function is called, simulating a token response and initializing the MQTT client.- Client Connection and Event Handlers: The client connects to the MQTT broker, and event handlers for
connect
,message
,error
,close
, andreconnect
are set.
- Client Connection and Event Handlers: The client connects to the MQTT broker, and event handlers for
Send Message to Chatroom
- Simulate Message Publishing: Within the
main
function,simulatePublish
is defined to send messages periodically.- Message Creation: A message of type
InteractionMessage
is created, including the user, message type, text, and timestamps. - Publishing Messages: The message is serialized to JSON and published to the chatroom's publish topic (
resp.chat.pub
) using theclient.publish
method.
- Message Creation: A message of type
- Interval-based Publishing: The
simulatePublish
function usessetInterval
to send messages at regular intervals.
Receive Message from Chatroom
- Subscribe to Chatroom Topic: Upon successful connection (
client.on("connect")
), the client subscribes to the chatroom's subscribe topic (resp.chat.sub
). - Handle Incoming Messages: The
client.on("message")
event handler is triggered when a message is received on the subscribed topic.- Message Parsing: The received payload is parsed from a JSON string into an
InteractionMessage
array. - Logging Messages: Each message in the array is logged, displaying the received time, sender's custom name, and message text.
- Message Parsing: The received payload is parsed from a JSON string into an
- Message Event Handling: The chat client continuously listens for new messages on the subscribed topic and processes them as they arrive.
In summary, the code demonstrates a practical application of MQTT for real-time messaging. By following the structured approach outlined in this article, developers can create robust and efficient chat applications that leverage the power of MQTT for real-time, bidirectional communication.