跳到主要内容

Implement a Chatroom Client

To connect to the chatroom, you'll utilize the MQTT protocol. The details required for the connection should be gathered from the previous steps. Below is an example in TypeScript demonstrating how to establish this connection:

import * as mqtt from "mqtt";
import * as qs from "querystring";

// Define interfaces for chatroom connection and message structures
interface ChatroomConfig {
connection: ChatroomConnect;
topic: Topic;
user: ChatUser;
}

interface ChatroomConnect {
token: string;
signature: string;
authorizer: string;
endpoint: string;
clientId: string;
}

interface Topic {
pub: string;
sub: string;
}

interface ChatroomClaims {
sub: string;
tenant_id: string;
chat_id: string;
type: string;
device_id: string;
roles: string[];
}

interface InteractionMessage {
id?: string;
user: ChatUser;
type: InteractionType;
text_message: TextMessage;
created_at?: string;
sent_at: string;
received_at?: string;
published_at?: string;
}

interface ChatUser {
id?: string;
device_id?: string;
custom_name: string;
is_admin?: boolean;
blocked?: boolean;
}

// Enum for defining interaction types in the chatroom
enum InteractionType {
INTERACTION_TYPE_TEXT = 1,
INTERACTION_TYPE_MUTE,
INTERACTION_TYPE_UNMUTE,
INTERACTION_TYPE_PIN_MESSAGE,
INTERACTION_TYPE_UNPIN_MESSAGE,
INTERACTION_TYPE_BLOCK_USER,
INTERACTION_TYPE_UNBLOCK_USER,
INTERACTION_TYPE_POLL_STARTED,
INTERACTION_TYPE_POLL_CLOSED,
INTERACTION_TYPE_POLL_ARCHIVED,
INTERACTION_TYPE_POLL_UPDATE,
INTERACTION_TYPE_POLL_USER_VOTE,
INTERACTION_TYPE_STICKER,
INTERACTION_TYPE_VIEWER_INFO_ENABLED,
INTERACTION_TYPE_VIEWER_INFO_DISABLED,
INTERACTION_TYPE_VIEWER_INFO_UPDATE,
}

interface TextMessage {
text: string;
filtered?: boolean;
}

// Function to create a new MQTT client for connecting to the chatroom
const newClient = (conn: ChatroomConnect): { connect: () => mqtt.MqttClient } => {
// Function to parse JWT claims from the provided token
const parseClaims = (): ChatroomClaims => {
const parts = conn.token.split(".");
const claims: ChatroomClaims = JSON.parse(
Buffer.from(parts[1], "base64url").toString()
);

return claims;
};

// Prepare query parameters for MQTT connection
const queryObj: any = {
"x-amz-customauthorizer-name": conn.authorizer,
"x-amz-customauthorizer-signature": conn.signature,
"x-token": conn.token,
};

const claims: ChatroomClaims = parseClaims();

// Return an object with a connect function to establish the MQTT connection
return {
connect: (): mqtt.MqttClient => {
return mqtt.connect({
hostname: conn.endpoint,
protocol: 'wss',
protocolVersion: 5,
port: 443,
username: `?${qs.stringify(queryObj)}`,
clientId: conn.clientId,
reconnectPeriod: 1000,
rejectUnauthorized: true,
});
},
};
};

// Main function to execute the chat client logic
const main = () => {
// Mock function to simulate receiving a chatroom config response
const mockChatroomConfigResp = (): ChatroomConfig => {
return JSON.parse(`{
"connection": {
"endpoint": "",
"authorizer": "",
"token": "",
"signature": "",
"client_id": ""
},
"topic": {
"pub": "",
"sub": ""
},
"user": {
"is_admin": true,
"customer_id": "",
"device_id": "",
"custom_name": ""
}
}`);
};

// Sample user definition
const user: ChatUser = {
custom_name: "user1",
};

// Get the response object
const resp = mockChatroomConfigResp();

// Function to simulate message publishing
const simulatePublish = () => {
let times = 0;

setInterval(() => {
let date = new Date();

let msg: InteractionMessage = {
user: user,
type: InteractionType.INTERACTION_TYPE_TEXT,
text_message: {
text: `[${times++}]`,
},
sent_at: date.toISOString()
};

// Publish messages to the chatroom
client.publish(resp.topic.pub, JSON.stringify(msg), { qos: 1 }, (err) => {
if (err) {
console.error(err)
}
})
}, 1000);
};

// Connect the client and set up event handlers
const client = newClient(resp.connection).connect();

client.on("connect", () => {
console.log("iot core connected.");

// Subscribe to the chatroom's topic
client.subscribe(resp.topic.sub, { qos: 0 });

// Start simulating message publishing
simulatePublish()
});

// Handle incoming messages
client.on("message", (topic, payload) => {
let messages: [InteractionMessage] = JSON.parse(payload.toString());

for (let m of messages) {
console.log(
`[${m.received_at}] @${m.user.custom_name}: ${m.text_message.text}`
);
}
});

// Handle connection errors
client.on("error", (err) => {
console.log(err);
});

// Handle connection closure
client.on("close", () => {
console.log("close");
});

// Handle reconnection attempts
client.on("reconnect", () => {
console.log("reconnecting...");
});
};

// Execute the main function
main();

The source code consists of several key components essential for the functionality of the chatroom client:

  • MQTT Module: Utilized for handling MQTT client functionalities.
  • Query String Module: Used for manipulating query strings, crucial in constructing connection parameters.
  • Interfaces: Defines custom interfaces to structure the data for chatroom interactions and MQTT connectivity.
  • JWT Token Parsing: Critical for authentication and authorization in MQTT communication.

Connect to Chatroom

  1. Import MQTT and Query String Modules: The code begins by importing the mqtt module for MQTT client functionality and qs (querystring) for handling query strings.
  2. Define Interfaces: Interfaces like ChatroomConnect, Topic, and ChatroomClaims are defined to structure the data for chatroom connection and interaction.
  3. newClient Function: This function creates a new MQTT client for the chatroom connection. It accepts a ChatroomConnect object containing the connection details.
    • Parse JWT Claims: Inside newClient, the parseClaims function decodes the JWT token to extract chatroom claims.
    • Prepare MQTT Connection Parameters: The function sets up the connection parameters, including hostname (endpoint), protocol (wss for secure WebSocket), port (443), and query parameters from the token.
  4. Connect Function: This function within newClient uses mqtt.connect to establish the connection. It configures the client with the hostname, protocol, port, username (constructed from query parameters), client ID (based on device ID and subscriber ID), and reconnect strategy.
  5. Main Function Execution: The main function is called, simulating a token response and initializing the MQTT client.
    • Client Connection and Event Handlers: The client connects to the MQTT broker, and event handlers for connect, message, error, close, and reconnect are set.

Send Message to Chatroom

  1. Simulate Message Publishing: Within the main function, simulatePublish is defined to send messages periodically.
    • Message Creation: A message of type InteractionMessage is created, including the user, message type, text, and timestamps.
    • Publishing Messages: The message is serialized to JSON and published to the chatroom's publish topic (resp.chat.pub) using the client.publish method.
  2. Interval-based Publishing: The simulatePublish function uses setInterval to send messages at regular intervals.

Receive Message from Chatroom

  1. Subscribe to Chatroom Topic: Upon successful connection (client.on("connect")), the client subscribes to the chatroom's subscribe topic (resp.chat.sub).
  2. Handle Incoming Messages: The client.on("message") event handler is triggered when a message is received on the subscribed topic.
    • Message Parsing: The received payload is parsed from a JSON string into an InteractionMessage array.
    • Logging Messages: Each message in the array is logged, displaying the received time, sender's custom name, and message text.
  3. Message Event Handling: The chat client continuously listens for new messages on the subscribed topic and processes them as they arrive.

In summary, the code demonstrates a practical application of MQTT for real-time messaging. By following the structured approach outlined in this article, developers can create robust and efficient chat applications that leverage the power of MQTT for real-time, bidirectional communication.