!function FarmModuleWrapper() {
let core; let mod; let o = {}; const pipelines = function() {};
/**
* Blackrock Farm Module
*
* @public
* @class Server.Modules.Farm
* @augments Server.Modules.Core.Module
* @param {Server.Modules.Core} coreObj - The Parent Core Object
* @return {Server.Modules.Farm} module - The Farm Module
*
* @description This is the Farm Module of the Blackrock Application Server.
* It provides distributed / cluster compute capabilities for the server,
* allowing you to run multiple disparate instances of the server (that can
* be geographically disbursed) in unison. It uses the Scuttlebutt Gossip
* Protocol to share state across all nodes in the cluster.
* PLEASE NOTE: This interface is undergoing development and
* is not yet functional.
*
* @example
* Tbc...
*
* @author Darren Smith
* @copyright Copyright (c) 2021 Darren Smith
* @license Licensed under the LGPL license.
*/
module.exports = function FarmModule(coreObj) {
if (mod) return mod;
core = coreObj; mod = new core.Mod('Farm'); o.log = core.module('logger').log;
o.farmServers = {}; o.utils = {}; o.jobServer = false;
o.log('debug', 'Blackrock Farm > Initialising...', {module: mod.name}, 'MODULE_INIT');
process.nextTick(function() {
pipelines.init();
});
return mod;
};
/**
* (Internal > Pipeline [1]) Init Pipeline
*
* @private
* @memberof Server.Modules.Generator
* @function registerWithCLI
* @ignore
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.runInitPipeline = function FarmInitPipeline() {
// noinspection JSUnresolvedFunction
core.lib.rxPipeline({}).pipe(
// Fires once on server initialisation:
pipelines.init.loadScuttlebutt,
pipelines.init.createModelAndServer,
pipelines.init.persistToDisk,
pipelines.init.setupUpdateListener,
pipelines.init.connectToSeeds,
pipelines.init.setupGetAndSetMethods,
pipelines.init.setupIsJobServer,
pipelines.init.setupEventEmitter,
pipelines.init.setupUpdateRouter,
pipelines.init.updateServerStatus,
pipelines.init.inactivateStaleServers,
pipelines.init.toggleLocalAsJobServer,
pipelines.init.checkAndVoteOnJobServerRoles
).subscribe();
};
/**
* (Internal > Stream Methods [1]) Load Scuttlebutt
*
* @private
* @memberof Server.Modules.Farm
* @function loadScuttlebutt
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
* */
pipelines.init.loadScuttlebutt = function FarmIPLLoadScuttlebutt(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLLoadScuttlebuttOp(observer, evt) {
evt.lib = {
sb: {
Model: require('./_support/scuttlebutt/model'),
Events: require('./_support/scuttlebutt/events'),
Security: require('./_support/scuttlebutt/security'),
ScuttleBucket: require('./_support/scuttlebucket'),
},
net: require('net'),
};
o.log('debug', 'Farm > [1] Loaded Scuttlebutt Libraries',
{module: mod.name}, 'FARM_LOADED_SCUTTLEBUTT');
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [2]) Create Model And Server
*
* @private
* @memberof Server.Modules.Farm
* @function createModelAndServer
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.createModelAndServer = function FarmIPLCreateModel(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLCreateModelOp(observer, evt) {
o.log('debug', 'Blackrock Farm > [2] Attempting to create model and start server',
{module: mod.name}, 'FARM_SERVER_STARTING');
let farm;
// noinspection JSUnresolvedVariable
if (core.cfg().farm) {
// noinspection JSUnresolvedVariable
farm = core.cfg().farm;
} else farm = {};
let port;
if (farm.server && farm.server.port) port = farm.server.port;
else port = 8000;
o.utils.isPortTaken(port, function FarmIPLCreateModelPortTakenCb(err, result) {
if (result !== false) {
evt.serverNotStarted = true;
o.log('error',
'Farm > Cannot start Scuttlebutt as the defined port (' + port + ') is already in use',
{module: mod.name}, 'FARM_SERVER_PORT_IN_USE');
observer.next(evt);
return;
}
const sl = evt.lib.sb;
const create = function FarmIPLCreateModelCreateScuttleBucket() {
return new sl.ScuttleBucket()
.add('model', new sl.Model())
.add('events', new sl.Events('evts'));
};
o.scuttleBucketInstance = create();
evt.lib.net.createServer(function FarmIPLCreateModelCreateServerCb(stream) {
const ms = o.scuttleBucketInstance.createStream();
stream.pipe(ms).pipe(stream);
ms.on('error', function FarmIPLCreateModelOnMSErr() {
stream.destroy();
});
stream.on('error', function FarmIPLCreateModelOnStreamErr() {
ms.destroy();
});
}).listen(port, function() {
o.log('debug',
'Farm > Created New Scuttlebutt Model + TCP Server Listening On Port ' + port,
{module: mod.name}, 'FARM_SERVER_STARTED');
});
o.serverModel = o.scuttleBucketInstance.get('model');
o.serverEmitter = o.scuttleBucketInstance.get('events');
observer.next(evt);
});
}, source);
};
/**
* (Internal > Stream Methods [3]) Persist To Disk (NOT WORKING)
*
* @private
* @memberof Server.Modules.Farm
* @function persistToDisk
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.persistToDisk = function FarmIPLPersistToDisk(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLPersistToDiskOp(observer, evt) {
o.log('debug', 'Farm > [3] Setting Up Disk Persistence...',
{module: mod.name}, 'FARM_INIT_DISK_PERS');
let farm;
// noinspection JSUnresolvedVariable
if (core.cfg().farm) {
// noinspection JSUnresolvedVariable
farm = core.cfg().farm;
} else farm = {};
let cache;
if (farm.server && farm.server.cache) cache = farm.server.cache;
else cache = null;
if (cache) {
const file = core.fetchBasePath('cache') + '/' + cache;
const fs = require('fs');
if (!fs.existsSync(file)) fs.closeSync(fs.openSync(file, 'w'));
fs.createReadStream(file).pipe(o.scuttleBucketInstance.createWriteStream());
o.scuttleBucketInstance.on('sync', function FarmIPLPersistToDiskSyncCallback() {
o.scuttleBucketInstance.createReadStream().pipe(fs.createWriteStream(file));
});
}
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [4]) Setup Update Listener
*
* @private
* @memberof Server.Modules.Farm
* @function setupUpdateListener
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.setupUpdateListener = function FarmSetupUpdateListener(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmSetupUpdateListenerOp(observer, evt) {
o.log('debug', 'Farm > [4] Setting Up Update Listener...',
{module: mod.name}, 'FARM_INIT_UPDATE_LISTENER');
/**
* Update Listener
*
* @public
* @memberof Server.Modules.Farm
* @function updateListener
* @param {function} fn - Callback For Listener
* @return {*} result - Lookup Value
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
mod.updateListener = function FarmUpdateListener(fn) {
return o.serverModel.on('update', fn);
};
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [5]) Connect To Seed
*
* @private
* @memberof Server.Modules.Farm
* @function connectToSeeds
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.connectToSeeds = function FarmIPLConnectToSeeds(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLConnectToSeedsOp(observer, evt) {
o.log('debug', 'Farm > [5] Connecting To Seed Server...',
{module: mod.name}, 'FARM_CONNECTING_TO_SEED_SERVER');
const connectToSeed = function FarmModuleConnectToSeed(host, port) {
const stream = evt.lib.net.connect(port);
const ms = o.scuttleBucketInstance.createStream();
stream.pipe(ms).pipe(stream);
};
if (evt.serverNotStarted) return evt;
let farm;
// noinspection JSUnresolvedVariable
if (core.cfg().farm) {
// noinspection JSUnresolvedVariable
farm = core.cfg().farm;
} else farm = {};
// noinspection JSUnresolvedVariable
if (farm.seeds) {
for (let i = 0; i < farm.seeds.length; i++) {
const host = farm.seeds[i].split(':')[0];
const port = farm.seeds[i].split(':')[1];
connectToSeed(host, port);
}
}
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [6]) Setup "Get From Store" & "Set Against Store" Methods
*
* @private
* @memberof Server.Modules.Farm
* @function setupGetAndSetMethods
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.setupGetAndSetMethods = function FarmIPLSetupGetAndSetMethods(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLSetupGetAndSetMethodsOp(observer, evt) {
o.log('debug', 'Farm > [6] Setting Up Model Get & Set Methods...',
{module: mod.name}, 'FARM_SETUP_MODEL_GET_SET');
/**
* Get Method
*
* @public
* @memberof Server.Modules.Farm
* @function get
* @param {string} key - Lookup Key
* @return {*} result - Lookup Value
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
mod.get = function FarmGetDataValue(key) {
return o.serverModel.get(key);
};
/**
* Set Method
*
* @public
* @memberof Server.Modules.Farm
* @function set
* @param {string} key - Lookup Key
* @param {*} value - Value to Set
* @return {*} result - Result
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
mod.set = function FarmSetDataValue(key, value) {
return o.serverModel.set(key, value);
};
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [7]) Setup the "isJobServer()" Method
*
* @private
* @memberof Server.Modules.Farm
* @function setupIsJobServer
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.setupIsJobServer = function FarmIPLSetupIsJobServer(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLSetupIsJobServerOp(observer, evt) {
o.log('debug', 'Farm > [7] Setting Up \'isJobServer\' Method...',
{module: mod.name}, 'FARM_SETUP_IS_JOB_SERVER_METHOD');
/**
* Is Job Server?
*
* @public
* @memberof Server.Modules.Farm
* @function isJobServer
* @return {boolean} result - Is Job Server Result (True | False)
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
mod.isJobServer = function FarmIsJobServer() {
return o.jobServer;
};
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [8]) Setup Distributed Event Emitter
*
* @private
* @memberof Server.Modules.Farm
* @function setupEventEmitter
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.setupEventEmitter = function FarmIPLSetupEventEmitter(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLSetupEventEmitterOp(observer, evt) {
o.log('debug', 'Farm > [8] Setting Up Event Emitter...',
{module: mod.name}, 'FARM_SETUP_EMITTER');
mod.events = {
emit: function FarmEventEmitterEmit(event, data) {
return o.serverEmitter.emit(event, data);
},
on: function FarmEventEmitterOn(event, listener) {
return o.serverEmitter.on(event, listener);
},
history: function FarmEventEmitterHistory(filter) {
return o.serverEmitter.history(filter);
},
};
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [9]) Setup Update Router
*
* @private
* @memberof Server.Modules.Farm
* @function setupUpdateRouter
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
pipelines.init.setupUpdateRouter = function FarmIPLSetupUpdateRouter(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLSetupUpdateRouterOp(observer, evt) {
o.log('debug', 'Farm > [9] Setting Up The Update Router...',
{module: mod.name}, 'FARM_SETUP_UPDATE_ROUTER');
o.serverModel.on('update', function FarmIPLSetupUpdateRouterOnUpdate(f1) {
let key = f1[0]; let val = f1[1];
if (key.startsWith('servers')) {
key = key.split('[');
const serverUri = key[1].slice(0, -1);
if (core.module('utilities').isJSON(val)) {
val = JSON.parse(val);
o.farmServers[serverUri] = val;
}
}
});
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [10]) Setup Job to Update Server Status Every 2 Seconds
*
* @private
* @memberof Server.Modules.Farm
* @function updateServerStatus
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* This method sets up a new job to fetch the latest heartbeat from the Logger module
* AND to count the number of servers in the farm, and then to update the farm-wide property
* for this server with the latest information from these sources. This job runs every 2 seconds.
*
* @example
* Tbc...
*/
pipelines.init.updateServerStatus = function FarmIPLUpdateServerStatus(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLUpdateServerStatusOp(observer, evt) {
o.log('debug',
'Farm > [10] Setting Up Job to Update Server Status With Latest Heartbeat...',
{module: mod.name}, 'FARM_SETUP_HEARTBEAT_JOB');
// noinspection JSUnresolvedVariable
const dayjs = core.lib.dayjs;
setInterval(function FarmUpdateServerStatusInt() {
const latestHeartbeat = core.module('logger').getLatestHeartbeat();
let serverCount = 0;
for (const key in o.farmServers) {
// noinspection JSUnfilteredForInLoop
if (o.farmServers[key].status === 'active') serverCount++;
}
latestHeartbeat.peerCount = serverCount;
core.module('logger').updateLatestHeartbeat('peerCount', latestHeartbeat.peerCount);
const val = JSON.stringify({
status: 'active',
lastUpdated: dayjs().format(),
heartbeat: latestHeartbeat,
});
// noinspection JSUnresolvedVariable
mod.set('servers[127.0.0.1:' + core.cfg().farm.server.port + ']', val);
}, 2000);
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [11]) Setup Job to Inactivate Stale Servers
*
* @private
* @memberof Server.Modules.Farm
* @function inactivateStaleServers
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* This method sets up a local job that runs every 2 seconds. The job checks
* the lastUpdated property against each server in the farm and if it was updated
* more than 3 seconds ago, it's status will be updated within the farm-wide
* properties to become "inactive"
*
* @example
* Tbc...
*/
pipelines.init.inactivateStaleServers = function FarmIPLInactivateStaleServers(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLInactivateStaleServersOp(observer, evt) {
o.log('debug',
'Farm > [11] Setting Up Job to Inactivate Stale Servers...',
{module: mod.name}, 'FARM_SETUP_INACTIVATE_STALE_JOB');
// noinspection JSUnresolvedVariable
const dayjs = core.lib.dayjs;
// noinspection JSUnfilteredForInLoop
setInterval(function FarmIPLInactivateStaleServersInt() {
const currentDateStamp = dayjs();
// eslint-disable-next-line guard-for-in
for (const server in o.farmServers) {
// noinspection JSUnfilteredForInLoop
const lastUpdated = dayjs(o.farmServers[server].lastUpdated);
// noinspection JSUnfilteredForInLoop
if (currentDateStamp.diff(lastUpdated) > 3000 && o.farmServers[server].status === 'active') {
const val = JSON.stringify({
status: 'inactive',
lastUpdated: dayjs().format(),
});
mod.set('servers[' + server + ']', val);
}
}
}, 2000);
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [12]) Toggle Local Server as Job Server if Not In Farm
*
* @private
* @memberof Server.Modules.Farm
* @function toggleLocalAsJobServer
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* This method waits 10 seconds following server boot to see whether it ends up joining
* a server farm. If it does not, then the "Primary Job Server" Role is automatically
* applied to this server. If it later joins a farm then it will relinquish this role.
* If no other servers in the farm have this role then selection will be based on a vote.
*
* @example
* Tbc...
*/
pipelines.init.toggleLocalAsJobServer = function FarmIPLToggleAsJobServer(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLToggleAsJobServerOp(observer, evt) {
let serverCount = 0;
for (const key in o.farmServers) {
// noinspection JSUnfilteredForInLoop
if (o.farmServers[key].status === 'active') serverCount++;
}
if (serverCount <= 1) {
o.jobServer = true;
o.log('debug',
'Farm > [12] This stand-alone server has been toggled as the Primary Job Server',
{module: mod.name}, 'FARM_IS_PRIMARY_JOB_SERVER');
} else {
o.log('debug',
'Farm > [12] This server is part of a farm and ' +
'may be allocated the Primary Job Server role in the future',
{module: mod.name}, 'FARM_NOT_PRIMARY_JOB_SERVER');
}
observer.next(evt);
}, source);
};
/**
* (Internal > Stream Methods [13]) Check & Vote On Job Server Roles
*
* @private
* @memberof Server.Modules.Farm
* @function checkAndVoteOnJobServerRoles
* @ignore
* @param {observable} source - Source Observable
* @return {observable} destination - Destination Observable
*
* @description
* This method sets up a job that only runs where there is > 1 server in the farm
* (ie; not for stand-alone servers). When run, the job checks the farm-wide
* properties - "PrimaryJobServer" and "SecondaryJobServer". If either of
* these servers are inactive or the properties are empty, then this server will publish
* a vote event to the farm that votes for two servers from the farm to fill these roles.
* Server selection is random. Upon all votes being submitted, this and all other servers
* in the farm will tally up the votes. If an even number of votes were submitted,
* the last vote is discarded. The servers that receive the most votes will automatically
* assign themselves the corresponding role and will update the farm-wide property for
* the role that they were allocated with a value of their server IP + port.
*
* @example
* Tbc...
*/
pipelines.init.checkAndVoteOnJobServerRoles = function FarmIPLCheckAndVoteOnJobServerRoles(source) {
// noinspection JSUnresolvedFunction
return core.lib.rxOperator(function FarmIPLCheckAndVoteOnJobServerRolesOp(observer, evt) {
observer.next(evt);
}, source);
};
/**
* (Internal > Utilities) Checks if a port is already taken or in use
*
* @private
* @memberof Server.Modules.Farm
* @function utils.isPortTaken
* @ignore
* @param {number} port - The port number to check
* @param {function} cb - Callback function
*
* @description
* Tbc...
*
* @example
* Tbc...
*/
o.utils.isPortTaken = function FarmUtilsIsPortTaken(port, cb) {
const tester = require('net').createServer()
.once('error', function FarmUtilsIsPortTakenOnErr(err) {
if (err.code !== 'EADDRINUSE') return cb(err);
cb(null, true);
})
.once('listening', function FarmUtilsIsPortTakenOnListen() {
tester.once('close', function FarmUtilsIsPortTakenOnClose() {
cb(null, false);
}).close();
})
.listen(port);
};
}();