123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- 'use strict';
- const net = require('net'),
- tls = require('tls'),
- fs = require('fs'),
- dgram = require('dgram'),
- EventParser = require('../entities/EventParser.js'),
- Message = require('js-message');
- let Events = require('event-pubsub/es5');
- if(process.version[1]>4){
- Events = require('event-pubsub');
- }
- let eventParser = new EventParser();
- class Server extends Events{
- constructor(path,config,log,port){
- super();
- Object.assign(
- this,
- {
- config : config,
- path : path,
- port : port,
- udp4 : false,
- udp6 : false,
- log : log,
- server : false,
- sockets : [],
- emit : emit,
- broadcast : broadcast
- }
- );
- eventParser=new EventParser(this.config);
- this.on(
- 'close',
- serverClosed.bind(this)
- );
- }
- onStart(socket){
- this.trigger(
- 'start',
- socket
- );
- }
- stop(){
- this.server.close();
- }
- start(){
- if(!this.path){
- this.log('Socket Server Path not specified, refusing to start');
- return;
- }
- if(this.config.unlink){
- fs.unlink(
- this.path,
- startServer.bind(this)
- );
- }else{
- startServer.bind(this)();
- }
- }
- }
- function emit(socket, type, data){
- this.log('dispatching event to socket', ' : ', type, data);
- let message=new Message;
- message.type=type;
- message.data=data;
- if(this.config.rawBuffer){
- this.log(this.config.encoding)
- message=new Buffer(type,this.config.encoding);
- }else{
- message=eventParser.format(message);
- }
- if(this.udp4 || this.udp6){
- if(!socket.address || !socket.port){
- this.log('Attempting to emit to a single UDP socket without supplying socket address or port. Redispatching event as broadcast to all connected sockets');
- this.broadcast(type,data);
- return;
- }
- this.server.write(
- message,
- socket
- );
- return;
- }
- socket.write(message);
- }
- function broadcast(type,data){
- this.log('broadcasting event to all known sockets listening to ', this.path,' : ', ((this.port)?this.port:''), type, data);
- let message=new Message;
- message.type=type;
- message.data=data;
- if(this.config.rawBuffer){
- message=new Buffer(type,this.config.encoding);
- }else{
- message=eventParser.format(message);
- }
- if(this.udp4 || this.udp6){
- for(let i=1, count=this.sockets.length; i<count; i++){
- this.server.write(message,this.sockets[i]);
- }
- }else{
- for(let i=0, count=this.sockets.length; i<count; i++){
- this.sockets[i].write(message);
- }
- }
- }
- function serverClosed(){
- for(let i=0, count=this.sockets.length; i<count; i++){
- let socket=this.sockets[i];
- let destroyedSocketId=false;
- if(socket){
- if(socket.readable){
- continue;
- }
- }
- if(socket.id){
- destroyedSocketId=socket.id;
- }
- this.log('socket disconnected',destroyedSocketId.toString());
- if(socket && socket.destroy){
- socket.destroy();
- }
- this.sockets.splice(i,1);
- this.publish('socket.disconnected', socket, destroyedSocketId);
- return;
- }
- }
- function gotData(socket,data,UDPSocket){
- let sock=((this.udp4 || this.udp6)? UDPSocket : socket);
- if(this.config.rawBuffer){
- data=new Buffer(data,this.config.encoding);
- this.publish(
- 'data',
- data,
- sock
- );
- return;
- }
- if(!this.ipcBuffer){
- this.ipcBuffer='';
- }
- data=(this.ipcBuffer+=data);
- if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){
- this.log('Messages are large, You may want to consider smaller messages.');
- return;
- }
- this.ipcBuffer='';
- data=eventParser.parse(data);
- while(data.length>0){
- let message=new Message;
- message.load(data.shift());
- // Only set the sock id if it is specified.
- if (message.data && message.data.id){
- sock.id=message.data.id;
- }
- this.log('received event of : ',message.type,message.data);
- this.publish(
- message.type,
- message.data,
- sock
- );
- }
- }
- function socketClosed(socket){
- this.publish(
- 'close',
- socket
- );
- }
- function serverCreated(socket) {
- this.sockets.push(socket);
- if(socket.setEncoding){
- socket.setEncoding(this.config.encoding);
- }
- this.log('## socket connection to server detected ##');
- socket.on(
- 'close',
- socketClosed.bind(this)
- );
- socket.on(
- 'error',
- function(err){
- this.log('server socket error',err);
- this.publish('error',err);
- }.bind(this)
- );
- socket.on(
- 'data',
- gotData.bind(this,socket)
- );
- socket.on(
- 'message',
- function(msg,rinfo) {
- if (!rinfo){
- return;
- }
- this.log('Received UDP message from ', rinfo.address, rinfo.port);
- let data;
- if(this.config.rawSocket){
- data=new Buffer(msg,this.config.encoding);
- }else{
- data=msg.toString();
- }
- socket.emit('data',data,rinfo);
- }.bind(this)
- );
- this.publish(
- 'connect',
- socket
- );
- if(this.config.rawBuffer){
- return;
- }
- }
- function startServer() {
- this.log(
- 'starting server on ',this.path,
- ((this.port)?`:${this.port}`:'')
- );
- if(!this.udp4 && !this.udp6){
- this.log('starting TLS server',this.config.tls);
- if(!this.config.tls){
- this.server=net.createServer(
- serverCreated.bind(this)
- );
- }else{
- startTLSServer.bind(this)();
- }
- }else{
- this.server=dgram.createSocket(
- ((this.udp4)? 'udp4':'udp6')
- );
- this.server.write=UDPWrite.bind(this);
- this.server.on(
- 'listening',
- function UDPServerStarted() {
- serverCreated.bind(this)(this.server);
- }.bind(this)
- );
- }
- this.server.on(
- 'error',
- function(err){
- this.log('server error',err);
- this.publish(
- 'error',
- err
- );
- }.bind(this)
- );
- this.server.maxConnections=this.config.maxConnections;
- if(!this.port){
- this.log('starting server as', 'Unix || Windows Socket');
- if (process.platform ==='win32'){
- this.path = this.path.replace(/^\//, '');
- this.path = this.path.replace(/\//g, '-');
- this.path= `\\\\.\\pipe\\${this.path}`;
- }
- this.server.listen(
- this.path,
- this.onStart.bind(this)
- );
- return;
- }
- if(!this.udp4 && !this.udp6){
- this.log('starting server as', (this.config.tls?'TLS':'TCP'));
- this.server.listen(
- this.port,
- this.path,
- this.onStart.bind(this)
- );
- return;
- }
- this.log('starting server as',((this.udp4)? 'udp4':'udp6'));
- this.server.bind(
- this.port,
- this.path
- );
- this.onStart(
- {
- address : this.path,
- port : this.port
- }
- );
- }
- function startTLSServer(){
- this.log('starting TLS server',this.config.tls);
- if(this.config.tls.private){
- this.config.tls.key=fs.readFileSync(this.config.tls.private);
- }else{
- this.config.tls.key=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/private/server.key`);
- }
- if(this.config.tls.public){
- this.config.tls.cert=fs.readFileSync(this.config.tls.public);
- }else{
- this.config.tls.cert=fs.readFileSync(`${__dirname}/../local-node-ipc-certs/server.pub`);
- }
- if(this.config.tls.dhparam){
- this.config.tls.dhparam=fs.readFileSync(this.config.tls.dhparam);
- }
- if(this.config.tls.trustedConnections){
- if(typeof this.config.tls.trustedConnections === 'string'){
- this.config.tls.trustedConnections=[this.config.tls.trustedConnections];
- }
- this.config.tls.ca=[];
- for(let i=0; i<this.config.tls.trustedConnections.length; i++){
- this.config.tls.ca.push(
- fs.readFileSync(this.config.tls.trustedConnections[i])
- );
- }
- }
- this.server=tls.createServer(
- this.config.tls,
- serverCreated.bind(this)
- );
- }
- function UDPWrite(message,socket){
- let data=new Buffer(message, this.config.encoding);
- this.server.send(
- data,
- 0,
- data.length,
- socket.port,
- socket.address,
- function(err, bytes) {
- if(err){
- this.log('error writing data to socket',err);
- this.publish(
- 'error',
- function(err){
- this.publish('error',err);
- }
- );
- }
- }
- );
- }
- module.exports=Server;
|