123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- 'use strict';
- const net = require('net'),
- tls = require('tls'),
- EventParser = require('../entities/EventParser.js'),
- Message = require('js-message'),
- fs = require('fs'),
- Queue = require('js-queue');
- let Events = require('event-pubsub/es5');
- if(process.version[1]>4){
- Events = require('event-pubsub');
- }
- let eventParser = new EventParser();
- class Client extends Events{
- constructor(config,log){
- super();
- Object.assign(
- this,
- {
- Client : Client,
- config : config,
- queue : new Queue,
- socket : false,
- connect : connect,
- emit : emit,
- log : log,
- retriesRemaining:config.maxRetries||0,
- explicitlyDisconnected: false
- }
- );
- eventParser=new EventParser(this.config);
- }
- }
- function emit(type,data){
- this.log('dispatching event to ', this.id, this.path, ' : ', type, ',', data);
- let message=new Message;
- message.type=type;
- message.data=data;
- if(this.config.rawBuffer){
- message=new Buffer(type,this.config.encoding);
- }else{
- message=eventParser.format(message);
- }
- if(!this.config.sync){
- this.socket.write(message);
- return;
- }
- this.queue.add(
- syncEmit.bind(this,message)
- );
- }
- function syncEmit(message){
- this.log('dispatching event to ', this.id, this.path, ' : ', message);
- this.socket.write(message);
- }
- function connect(){
- //init client object for scope persistance especially inside of socket events.
- let client=this;
- client.log('requested connection to ', client.id, client.path);
- if(!this.path){
- client.log('\n\n######\nerror: ', client.id ,' client has not specified socket path it wishes to connect to.');
- return;
- }
- const options={};
- if(!client.port){
- client.log('Connecting client on Unix Socket :', client.path);
- options.path=client.path;
- if (process.platform ==='win32' && !client.path.startsWith('\\\\.\\pipe\\')){
- options.path = options.path.replace(/^\//, '');
- options.path = options.path.replace(/\//g, '-');
- options.path= `\\\\.\\pipe\\${options.path}`;
- }
- client.socket = net.connect(options);
- }else{
- options.host=client.path;
- options.port=client.port;
- if(client.config.interface.localAddress){
- options.localAddress=client.config.interface.localAddress;
- }
- if(client.config.interface.localPort){
- options.localPort=client.config.interface.localPort;
- }
- if(client.config.interface.family){
- options.family=client.config.interface.family;
- }
- if(client.config.interface.hints){
- options.hints=client.config.interface.hints;
- }
- if(client.config.interface.lookup){
- options.lookup=client.config.interface.lookup;
- }
- if(!client.config.tls){
- client.log('Connecting client via TCP to', options);
- client.socket = net.connect(options);
- }else{
- client.log('Connecting client via TLS to', client.path ,client.port,client.config.tls);
- if(client.config.tls.private){
- client.config.tls.key=fs.readFileSync(client.config.tls.private);
- }
- if(client.config.tls.public){
- client.config.tls.cert=fs.readFileSync(client.config.tls.public);
- }
- if(client.config.tls.trustedConnections){
- if(typeof client.config.tls.trustedConnections === 'string'){
- client.config.tls.trustedConnections=[client.config.tls.trustedConnections];
- }
- client.config.tls.ca=[];
- for(let i=0; i<client.config.tls.trustedConnections.length; i++){
- client.config.tls.ca.push(
- fs.readFileSync(client.config.tls.trustedConnections[i])
- );
- }
- }
- Object.assign(client.config.tls,options);
- client.socket = tls.connect(
- client.config.tls
- );
- }
- }
- client.socket.setEncoding(this.config.encoding);
- client.socket.on(
- 'error',
- function(err){
- client.log('\n\n######\nerror: ', err);
- client.publish('error', err);
- }
- );
- client.socket.on(
- 'connect',
- function connectionMade(){
- client.publish('connect');
- client.retriesRemaining=client.config.maxRetries;
- client.log('retrying reset');
- }
- );
- client.socket.on(
- 'close',
- function connectionClosed(){
- client.log('connection closed' ,client.id , client.path,
- client.retriesRemaining, 'tries remaining of', client.config.maxRetries
- );
- if(
- client.config.stopRetrying ||
- client.retriesRemaining<1 ||
- client.explicitlyDisconnected
- ){
- client.publish('disconnect');
- client.log(
- (client.config.id),
- 'exceeded connection rety amount of',
- ' or stopRetrying flag set.'
- );
- client.socket.destroy();
- client.publish('destroy');
- client=undefined;
- return;
- }
- setTimeout(
- function retryTimeout(){
- client.retriesRemaining--;
- client.connect();
- }.bind(null,client),
- client.config.retry
- );
- client.publish('disconnect');
- }
- );
- client.socket.on(
- 'data',
- function(data) {
- client.log('## received events ##');
- if(client.config.rawBuffer){
- client.publish(
- 'data',
- new Buffer(data,client.config.encoding)
- );
- if(!client.config.sync){
- return;
- }
- client.queue.next();
- return;
- }
- if(!this.ipcBuffer){
- this.ipcBuffer='';
- }
- data=(this.ipcBuffer+=data);
- if(data.slice(-1)!=eventParser.delimiter || data.indexOf(eventParser.delimiter) == -1){
- client.log('Messages are large, You may want to consider smaller messages.');
- return;
- }
- this.ipcBuffer='';
- const events = eventParser.parse(data);
- const eCount = events.length;
- for(let i=0; i<eCount; i++){
- let message=new Message;
- message.load(events[i]);
- client.log('detected event', message.type, message.data);
- client.publish(
- message.type,
- message.data
- );
- }
- if(!client.config.sync){
- return;
- }
- client.queue.next();
- }
- );
- }
- module.exports=Client;
|