ClusterX is a global system that manages all clusters and can be extended with ServerX, ClientFramework, and PayloadHandler. Anything that should execute only once, such as `jobINIT` or an express.js server, must be declared inside the `if (cs.clusterMaster) ..` block; outside of it, every new cluster worker starts executing the `nextCluster((..))` script.
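A minimal sketch of that split, assuming `CLUSTERX` is already required into scope; the full cluster.server.js example further below shows the same structure in detail:
//// minimal sketch of the master/worker split
var cs = new CLUSTERX({ cache: 0.01 }, true)
if (cs.clusterMaster) {
    // runs once, on the master: declare jobs, start express.js, etc.
    cs.jobINIT([{ custom: 'e' }], null, 'example-job')
}
// runs on every slave worker: your logic operator
cs.nextCluster((workerIndex, nextData, jobRef) => {
    return { data: nextData, total: cs.clusterSize }
}).init()
// fires in the clusterMaster environment when each jobRef completes
cs.onDone((jobData, jobIndex, jobRef, error) => {
    if (error) console.log(error)
})
// end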
Try the demo system, and see what it can do for you. Visit
`new CLUSTERX({ cache: 0.01 }, debug)` : when declaring a new cluster server you can set the cache expiry; `debug: true` gives friendly notices.
`jobINIT(payload[...], callback: null, jobRef)` : when a job is executed, the cluster runs a full cycle with as many workers as are available. Each `payload[x]` item represents one slave worker, so check `cs.clusterSize` to know how many CPUs your machine supports, and chunk your payload to that size or less (a chunking sketch follows below). You can execute as many `jobINIT` calls as needed, but make sure to give each a different `jobRef` or it will be overwritten! A callback for cache will be supported in the next version.
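For instance, a payload larger than the worker count can be split into `cs.clusterSize`-sized chunks, each submitted as its own `jobINIT` with a distinct `jobRef`. This assumes `cs` is the ClusterX instance from the example below; the `chunk` helper is a hypothetical utility, not part of ClusterX:
//// example: chunking a large payload to cs.clusterSize
// `chunk` is a small hypothetical helper, not part of ClusterX
function chunk (arr, size) {
    var out = []
    for (var i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size))
    return out
}
var bigPayload = [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }, { id: 6 }]
chunk(bigPayload, cs.clusterSize).forEach((part, i) => {
    // each chunk has at most `cs.clusterSize` items, one per worker
    cs.jobINIT(part, null, 'big-payload-' + i)
})
// end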
`nextCluster((workerIndex, nextData, jobRef) => {..})` : should be executed outside `if (cs.clusterMaster) ..`, as it already runs inside the slave worker. The callback is your logic operator, called for each `payload[x]`; it can also return a promise:
# `workerIndex` : index of the worker being executed
# `nextData` : current data item being processed
# `jobRef` : the job this item belongs to
# `cs.nextQueData` : the same attribute data from the ClusterX class, accessible within each callback
`onDone((jobData, jobIndex, jobRef, error))` : final return for each job, already called inside `clusterMaster`, fired when work is done for each `jobRef`:
# `jobData` : unformatted output with {timestamp}, {pid}, {data}, {jobRef}, {index}
# `jobIndex` : incrementing job process index; depends on how many jobs you run
# `jobRef` : the `jobINIT` that just completed
# `error` : notifies you if any errors have occurred!
# `completedIn` : returns the time it took to complete each job!
`isDoneData([...jobRefs], userData => {})` : include your jobs in a batch assignment and they will be returned in the callback once all of them complete. From here you can do any final processing.
# `killWorker(pid)` : kill a worker at any time by specifying its `pid`; it will be renewed automatically.
# `killAllClusters()` : kill all cluster workers in one go. All will be renewed automatically.
# `status((pid, status, allCache))` : lets you know when a worker goes online or offline, and returns all available cache from previous jobs. A short usage sketch follows below.
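A minimal usage sketch of these worker-management calls, assuming they are invoked on the ClusterX instance (`cs`) like the other methods, and that `pid` comes from whatever `status` reports:
//// example: worker management
var lastPid = null
cs.status((pid, status, allCache) => {
    // fires when a worker goes online or offline; `allCache` holds cache from previous jobs
    console.log('worker', pid, 'is now', status)
    lastPid = pid
})
// kill one worker at any time; it is renewed automatically
// cs.killWorker(lastPid)
// or kill every worker in one go; all are renewed automatically
// cs.killAllClusters()
// end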
//// example cluster.server.js declaration
// ClusterX (StandAlone)
// cache => 1 = hour
var debug = true // friendly notices
var cs = new CLUSTERX({ cache: 0.01 }, debug)
if (cs.clusterMaster) {
    var payload = [{ custom: 'e' }, { custom: 'f' }, { custom: 'g', d: 999 }] // one worker per array item
    cs.jobINIT(payload, null, 'payload-a-job-1') // initiate job
    payload = [{ data: 'z' }, { data: 'a' }, { data: 'm', d: 555 }]
    cs.jobINIT(payload, null, 'payload-a-job-2') // initiate job
    payload = [{ clusters: cs.clusterSize, cache: cs.cache }]
    cs.jobINIT(payload, null, 'config-1')
}
// ON SLAVE, your logic operator
// `merge` below is assumed to be a deep-merge helper (e.g. var merge = require('lodash.merge'))
// `q` is assumed to be the `q` promise library (var q = require('q'))
cs.nextCluster((workerIndex, nextData, jobRef) => {
    // payloads should be chunked to the size of available workers or less
    // supports promises as well
    // {nextData} is the item at `payload[index]`
    // the callback arguments are also available from `cs.nextQueData`
    if (jobRef === 'payload-a-job-1' && nextData) {
        /// ...
        return { data: merge.apply(null, [nextData, []]),
            total: cs.clusterSize /** or payload size */ }
    }
    if (jobRef === 'payload-a-job-2' && nextData) {
        /// ...
        return { data: merge.apply(null, [nextData, []]),
            total: cs.clusterSize /** or payload size */ }
    }
    // can also return a promise
    if (nextData) {
        var defer = q.defer()
        // the {data} property name cannot change
        // e.g. call some api here, then resolve with its result
        defer.resolve({ data: merge.apply(null, [nextData, []]),
            total: cs.clusterSize /** or payload size */ })
        return defer.promise
    } else {
        return { data: null, total: cs.clusterSize }
    }
}).init()
cs.onDone((jobData, jobIndex, jobRef, error) => {
    if (error) {
        // console.log(error)
    }
    cs.isDoneData(['payload-a-job-1', 'payload-a-job-2'], data => {
        console.log('payload-a-job-1 and payload-a-job-2 done', data)
        // data output >>
        // { 'payload-a-job-1':
        //    { data:
        //       [ { custom: 'e' }, { custom: 'f' }, { custom: 'g', d: 999 } ],
        //      pid: [ '8400', '9936', '10232' ],
        //      completedIn: 0.569 },
        //   'payload-a-job-2':
        //    { data: [ { data: 'z' }, { data: 'a' }, { data: 'm', d: 555 } ],
        //      pid: [ '8400', '9936', '10232' ],
        //      completedIn: 0.111 } }
    })
    cs.isDoneData(['config-1'], data => {
        console.log('config-1 done', data)
    })
})
// end
A `ClientFramework` code snippet is included below. For clarity, label your jobs as `batch{Index}`, where `Index` corresponds to your route and controller name. Within each batch you declare your jobs, and the `next()` method is then called from the `nextCluster` callback.
/**
* @batchIndex
* * the `payloadHandlerX.watch` eventName refers to the route name; it is where this call initiates from,
* so the watch will wait for the `/index` route event and then fire to initiate your batch job
* * you need to watch for these jobs in `next()`
*/
batchIndex() {
    var ref = this.validRef('index')
    payloadHandlerX.watch(ref, (params) => {
        var p = params.data
        // `params` come from the http request
        // var data = [{ custom: 'e' }, { custom: 'f' }, { custom: 'g', d: 999 }]
        // clusterX.jobINIT(data, null, 'index-1') // initiate job
        /**
         * each `jobINIT` calls as many workers as are available (your CPU count)
         * for example: 5 cpus > 5 workers > your payload should be an array of 5 items or less
         * each item is executed simultaneously on its own worker
         */
        var data1 = [{ test: 1 }, { test: 2 }, { test: 3 }]
        clusterX.jobINIT(data1, err => {
            // on error, all `jobINIT` calls are reset
            payloadHandlerX.sendUpdate('index', err)
        }, 'index-1')
        var data2 = [{ id: 1 }, { id: p }, { id: p + 1 }]
        clusterX.jobINIT(data2, null, 'index-2')
        // clusterX.jobINIT(someData, null, 'index-3')
        // clusterX.jobINIT(someData, null, 'index-4') /// etc
    })
}
/**
* @next
* * the cluster assignment is passed to the `nextCluster` callback method
* * when this method is called you are already in the worker environment,
* so your logic and any async calls should be done here!
* * you have to return the value via `clusterXFormat(...)`
* * each `jobRef` watches for a job initiated by your `batchIndex` or `batchSingle`,
* per the example application
*/
async next() {
    // `config` and `cache` routes are declared in `cluster.server.js`
    /// index
    if (this.nextQueData.jobRef === 'index-1') {
        // just forward without any changes
        return this.clusterXFormat(this.nextQueData.nextData, 3)
    }
    // NOTE multi-worker process and logic operator
    if (this.nextQueData.jobRef === 'index-2') {
        var id = this.nextQueData.nextData.id
        try {
            var d = await dataAPI(id)
            if (!((d || {}).data || []).length) throw new Error('no data found')
            // add up all larger and smaller
            var larger = 0
            var smaller = 0
            for (var i = 0; i < d.data.length; i++) {
                var item = d.data[i]
                if (item.larger) larger += item.size
                else smaller += item.size
            }
            return this.clusterXFormat({ larger, smaller }, 3)
        } catch (err) {
            return this.clusterXFormat({ error: err }, 1)
        }
    }
    /// index end
}
/**
* @jobOnDone
* * accepts a callback with the raw data for each completed `jobINIT`
* * this call is made on completion of each `jobINIT`,
* and it already runs in the clusterMaster environment
*/
jobOnDone(cb) {
    clusterX.onDone((jobData, jobIndex, jobRef, error) => {
        if (error) {
            notify.ulog({ message: 'last job received due to error', jobIndex }, true)
            cb({ error })
        }
        clusterX.isDoneData(['single-1'], data => {
            payloadHandlerX.sendUpdate('single', data)
        })
        // waiting until both jobs are completed, and then the event is triggered
        clusterX.isDoneData(['index-1', 'index-2'], data => {
            // do final data processing before sending to the GET /index response
            payloadHandlerX.sendUpdate('index', data)
        })
        // ...
    })
}
/**
// cluster.server.js file
clusterX.onDone((jobData, jobIndex, jobRef, error) => {
// {jobData} raw data output per job completion
// { '1':
// { timestamp: 1569293898957,
// jobRef: 'index-2',
// resp: { data: { larger: 8265232, smaller: 4148819 } },
// pid: '4180' },
// '2':
// { timestamp: 1569293898957,
// jobRef: 'index-2',
// resp: { data: { larger: 8339097, smaller: 4294688 } },
// pid: '7820' },
// '3':
// { timestamp: 1569293898958,
// jobRef: 'index-2',
// resp: { data: { larger: 8313370, smaller: 4163415 } },
// pid: '10076' } }
// })
**/
`PayloadHandlerX` - event processing middleware; extends the functionality of LiveEvent with 4 methods: watch(), dispatch(), sendUpdate(), delete().
It is a clever middleware for sending data across nested applications. If you want 2-way data flow between `dispatch` <> `watch`, use `sendUpdate` and then add a callback to `dispatch`. The order of declaration does not matter, but only for `dispatch` and `watch`. Look at readme.md for instructions.
Basic use:
//// example declaration
// payloadHandlerX is an extension of LiveEvent
const payloadHandlerX = new PayloadHandlerX({ debug: true })
const eventName = "testJob-1"
// the callback will only work if sendUpdate was called
// at any time before the delete event is triggered
payloadHandlerX.dispatch(eventName, { data: ... }, update => {
    // waiting for an update from sendUpdate
})
payloadHandlerX.watch(eventName, newData => {
    // waiting for dispatch data to arrive
})
// this method is optional
payloadHandlerX.sendUpdate(eventName, data)
// will delete the event altogether
payloadHandlerX.delete(eventName)
// end
`SimpleCacheX` : save-to-file cache. It lives INSIDE your app and has an expiry option. This is just an example to show how you can use caching inside an application; for production I would also recommend something like `Redis` or a `database`, etc. Check out the Github repo.
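To illustrate the idea only (this is not SimpleCacheX's actual API), a minimal expiring file cache can be sketched with plain Node `fs`:
//// example: the file-cache-with-expiry idea, illustration only (not SimpleCacheX's API)
const fs = require('fs')
const path = require('path')
const CACHE_DIR = './cache'

function writeCache (name, data, expireHours) {
    if (!fs.existsSync(CACHE_DIR)) fs.mkdirSync(CACHE_DIR)
    const entry = { expiresAt: Date.now() + expireHours * 3600 * 1000, data }
    fs.writeFileSync(path.join(CACHE_DIR, name + '.json'), JSON.stringify(entry))
}

function readCache (name) {
    const file = path.join(CACHE_DIR, name + '.json')
    if (!fs.existsSync(file)) return null
    const entry = JSON.parse(fs.readFileSync(file, 'utf8'))
    // expired entries are treated as a cache miss
    return Date.now() > entry.expiresAt ? null : entry.data
}
// end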
`ServerX` : extended Express.js; works together with `PayloadHandlerX` to exchange data both ways.
# `routes` : after declaring a route in `ClientFramework`, you set a new controller in `controllers.js`, add the route in `server.routes.js` (same label name), and add it to `allowedRoutes.js`
# `./serverX/allowedRoutes.js` : include all of your route names in this file, and make sure to follow the naming strategy controller/route/eventName so they share the same name label (a sketch follows after this list). This script is declared in `Authentication.js` and `client.app.js` for validation purposes.
# `./clientFramework/client.app.js` : an example client application integrating ClusterX with ServerX
# `./cluster.server.js` : everything is initiated from this file
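A minimal sketch of what `allowedRoutes.js` could look like under that naming strategy; the export shape here is an assumption for illustration, so match it to the actual repo:
//// ./serverX/allowedRoutes.js (assumed shape, illustration only)
// each name must match its controller, route, and eventName label
module.exports = ['index', 'single', 'config', 'cache']
// end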
code snippet:
# Controller.js snippet
/**
* @index
* * After a GET /index request is made, it sends the `dispatch`
data to your application to start your jobs, and waits on the
callback until the job is completed. If the request is canceled,
`onAbort` will cancel the job and exit
*/
async index(req, res) {
    var token = req.session.accessToken
    if (!token) {
        return res.status(400).json({ error: true, message: 'your session expired' })
    }
    var access = await this.verifyAccess(req, token)
    if (access) {
        this.onAbort(req, res, 'index')
        var routeName = ServerXControllers.prototype.index.name
        // the `data` attribute must be set
        var exampleData = { data: req.params.something || 2 }
        payloadHandlerX.dispatch(routeName, exampleData, update => {
            this.routeStatus['index'] = 'done'
            this.cleanEnd(res, update)
        })
    } else {
        req.session.accessToken = null
        console.log('that is the wrong token', token)
        return res.status(400).json({ error: true,
            message: 'your session expired' })
    }
}
// end
If you are interested in ClusterX, please contact us at eaglex.net and submit your enquiry. Thanks!