js/Mobilizing/net/PubSub.js
import EventEmitter from "../core/util/EventEmitter";
/**
* Fired when the client successfully connects to the server
* @event connect
*/
const EVT_CONNECT = "connect";
/**
* Fired when the client disconnects from the server
* @event disconnect
*/
const EVT_DISCONNECT = "disconnect";
/**
* Fired when a connection error occurs
* @event error
*/
const EVT_ERROR = "error";
/**
* Fired when a connection times out
* @event connect_timeout
*/
const EVT_CONNECT_TIMEOUT = "connect_timeout";
/**
* PubSub is a publish-subscribe messaging system based on socket.io
* It allows simple socket communication between different clients
* An runing instance of MobilizingServer is required to make this work
*
* @example
* //TODO
*/
export default class PubSub {
/**
* @param {Object} params Parameters object, given by the constructor.
* @param {String} [params.url=""] The URL of the server on which the MobilizingServer instance is running
* @param {Boolean} [params.autoConnect=true] Whether the connection should be automatically opened
*/
constructor({
url = "",
autoConnect = true
} = {}) {
this.url = url;
this.autoConnect = autoConnect;
this.id = null;
this.events = new EventEmitter({ "scope": this });
if (this.autoConnect) {
this.open(this.url);
}
this.subscriptions = {};
}
/**
* Open the socket connection
* @param {String} url URL of the server on which the MobilizingServer instance is running
*/
open(url) {
if (!("io" in window)) {
const script = document.createElement("script");
script.src = `${url}/socket.io/socket.io.js`;
script.onload = () => {
this.open(url);
};
script.onerror = (error) => {
this.events.trigger(EVT_ERROR, error);
};
document.head.appendChild(script);
}
else {
this.socket = window.io.connect(url);
this.socket.on("connect", () => {
this.id = this.socket.io.engine.id;
this.socket.on("disconnect", () => {
this.events.trigger(EVT_DISCONNECT, this.getID());
});
this.events.trigger(EVT_CONNECT, this.getID());
});
this.socket.on("error", (error) => {
this.events.trigger(EVT_ERROR, error);
});
this.socket.on("connect_timeout", (timeout) => {
this.events.trigger(EVT_CONNECT_TIMEOUT, timeout);
});
}
}
/**
* Close the socket connection
*/
close() {
if (this.socket) {
this.socket.disconnect();
delete this.socket;
}
}
/**
* Publish a message to a specific channel
* @param {String} channel The channel on which to publish the message
* @param {Mixed} message The message to publish
*/
publish(channel, message) {
if (this.socket) {
this.socket.emit("publish", channel, message);
}
}
/**
* Subscribe for messages from a specific channel
* @param {String} channel The channel on which to subscribe
* @param {Function} callback The callback to call when a message is received on the channel
*/
subscribe(channel, callback) {
if (typeof this.subscriptions[channel] === "undefined") {
this.subscriptions[channel] = [];
}
if (this.socket) {
this.socket.emit("subscribe", channel);
this.socket.on(channel, callback);
this.subscriptions[channel].push(callback);
}
}
/**
* Unsubscribe for messages from a specific channel
* @param {String} channel The channel from which to unsubscribe
* @param {Function} [callback] The callback to remove, all callbacks will be removed if unspecified
*/
unsubscribe(channel, callback) {
if (this.socket) {
if (typeof callback === "undefined") {
this.socket.emit("unsubscribe", channel);
delete this.subscriptions[channel];
}
else if (this.subscriptions[channel] instanceof Array) {
this.subscriptions[channel] = this.subscriptions[channel].filter((item) => {
return item !== callback;
});
if (this.subscriptions[channel].length === 0) {
this.unsubscribe(channel);
return;
}
}
this.socket.off(channel, callback);
}
}
/**
* Get the socket id
*/
getID() {
return this.id;
}
}