Functionality

ClusterX as a global system manages all clusters, can be extended with ServerX, ClientFramework, and PayloadHandler. Anything executed once, must be declared between `if(cs.clusterMaster)..` such as jobINIT and express.js, on else.. new cluster worker starts executing nextCluster((..)) script.

Try it out!

Try the demo system, and see what it can do for you. Visit

Example Implementation

  • `new CLUSTERX({ cache: 0.01 }, debug)` : when declaring new cluster server you can set the cache expiry, `debug:true` : gives friendly notices.
  • `jobINIT(payload[...],callback:null, jobRef)` : when job is executed, cluster will run full cycle with as many workers available. Each payload item[x] represents 1 slave worker, so check `cs.clusterSize` to know how many CPUs your machine supports, and chunk your payload to that size or less! You Can execute as many `jobINIT` as needed, make sure to give each a different `jobRef` or it will be overwritten! Callback for cache will be supported in next version.
  • `nextCluster((workerIndex, nextData, jobRef)=>))` : should be executed outside `if(cs.clusterMaster)..`, as its already happening inside the slave worker. Callback is made to our logic operator for each payload[x], can also accept promise:
    # `workerIndex` number of each worker being executed # `nextData`current data item being processed # `jobRef` which job does it belong to # `cs.nextQueData` can access same attribute data from ClusterX class within each callback
  • `onDone((jobData, jobIndex, jobRef, error))` : Final return for each job/s already called inside `clusterMaster` when work is done for each `jobRef`:
    # `jobData` unformatted output with {timestamp}, {pid}, {data}, {jobRef}, {index}` # `jobIndex` incrementing job process index, depends on how many you run # `jobRef` current jobINIT that completed # `error` notify you if any errors have accrued! # `completedIn` returns time it toke to complete each job!
  • `isDoneData([...jobRefs],userData=>{})` : Include your jobs into a batch assignment and they will be returned on callback. From here you can do any final processing.
    # `killWorker(pid)` : kill worker at anytime by specifying `pid`, will renew it automatically. # `killAllClusters()` : kill all cluster workers at one go. Will renew all automatically. # `status((pid, status, allCache))` : will let you know when new worker goes online, or offline, will return all available cache from previous jobs

    //// example cluster.server.js declaration
    // ClusterX (StandAlone)

    // cache => 1 = hour
    var cs = new CLUSTERX({ cache: 0.01 }, debug)
    if (cs.clusterMaster) {
    
        var payload = [{ custom: 'e' }, { custom: 'f' }, { custom: 'g', d: 999 }] // worker per array item
        cs.jobINIT(payload, null, 'payload-a-job-1') // initiate job
    
        var payload = [{ data: 'z' }, { data: 'a' }, { data: 'm', d: 555 }]
        cs.jobINIT(payload, null, 'payload-a-job-2') // initiate job   
    
        var payload = [{ clusters: cs.clusterSize, cache: cs.cache }]
        cs.jobINIT(payload, null, 'config-1')
    }
    
    // ON SLAVE,  your logic operator
    cs.nextCluster((workerIndex, nextData, jobRef) => {
        // payloads should be chunked to size of available workers or less
        // supports promise as well
        // {nextData} is item index from `payload[index]`
        // function attributes also available from `cs.nextQueData`
        if (jobRef === 'payload-a-job-1' && nextData) {
            /// ... 
            return { data: merge.apply(null, [nextData, []]), 
              total: cs.clusterSize /** or payload size*/ }
        }
    
        if (jobRef === 'payload-a-job-2' && nextData) {
            /// ... 
            return { data: merge.apply(null, [nextData, []]), 
                    total: cs.clusterSize /** or payload size*/ }
        }
    
        // can accept promise
        if (itemData) {
            var defer = q.defer()
            // {data} property cannot change
            // call to some api d = await apiData(...)
            defer.resolve({ data: merge.apply(null, [itemData, []]), 
                            total: cs.clusterSize /** or payload size*/ })
            return defer.promise
        } else {
            return { data: null, total: cs.clusterSize }
        }
    }).init()
    
    cs.onDone((jobData, jobIndex, jobRef, error) => {
    
        if (error) {
            // console.log(error)
        }
        
          cs.isDoneData(['payload-a-job-1', 'payload-a-job-2'], data => {
              console.log('payload-a-job-1 and payload-a-job-2 done', data)

              // data output >>
              // { 'payload-a-job-1':
              //     { data:
              //     [ { custom: 'e' }, { custom: 'f' }, { custom: 'g', d: 999 } ],
              //     pid: [ '8400', '9936', '10232' ],
              //     completedIn: 0.569 },
              // 'payload-a-job-2':
              //     { data: [ { data: 'z' }, { data: 'a' }, { data: 'm', d: 555 } ],
              //     pid: [ '8400', '9936', '10232' ],
              //     completedIn: 0.111 } } 
              // }
              
          })
    
        cs.isDoneData(['config-1'], data => {
            console.log('payload-a-job-1 and payload-a-job-2 done', data)
        })
    })
    
    // end

ClusterX Server Application (overview)

  • `ClientFramework` code snippet has been included below. For clarity you should label your jobs as `batch{Index}` where `index` corresponds to your route and controller name, and within each batch you declare your jobs, then `next()` method is called from `nextCluster` callback.
        /**
         * @batchIndex
         * * `payloadHandlerX.watch` eventName refers to route name, its where this call initiates from
         * so the watch will wait for `/index` route event and then fire to initiate your batch job
         * * you need to set to watch for these jobs in `next()`
         */
        batchIndex() {
            var ref = this.validRef('index')
            payloadHandlerX.watch(ref, (params) => {
                var p = params.data

                // `params` from http request
                //   var data = [{ custom: 'e' }, { custom: 'f' }, { custom: 'g', d: 999 }]
                //   clusterX.jobINIT(data, null, 'index-1') // initiate job

                /**
                * each `jobINIT` calls to available workers/ your CPU count
                * for example: 5-cpus > 5 workers > you should have array of 5 items or less in your payload
                * each item per worker will be executed simultaneously
                */
                var data1 = [{ test: 1 }, { test: 2 }, { test: 3 }]
                clusterX.jobINIT(data1, err => {
                    // on error all `jobINIT` calls are rest
                    payloadHandlerX.sendUpdate('index', err)
                }, 'index-1')

                var data2 = [{ id: 1 }, { id: p }, { id: p + 1 }]
                clusterX.jobINIT(data2, null, 'index-2')
                // clusterX.jobINIT(someData, null, 'index-3')
                // clusterX.jobINIT(someData, null, 'index-4') /// etc
            })
        }

        /**
        * @next
        * * cluster assignment is passed to `nextCluster` callback method
        * * when this method is called you are already in the worker environment, 
           so your logic and any async calls should be done here!
        * You have to return value via `clusterXFormat(...)`
        * Each `jobRef` watches for job initiated by your `batchIndex` or `batchSingle` 
          per example application
        */
       async next() {
           // `config` and `cache` routes are declared from `cluster.server.js`

           /// index
           if (this.nextQueData.jobRef === 'index-1') {
              // just forward without any changes
               return this.clusterXFormat(this.nextQueData.nextData, 3)
           }
           // NOTE multi worker process and logic operator
           if (this.nextQueData.jobRef === 'index-2') {
               var id = this.nextQueData.nextData.id
               try {
                   var d = await dataAPI(id)
                   if (!(d || {}).data.length) throw ('no data found')
                   // add up all larger and smaller
                   var larger = 0
                   var smaller = 0
                   for (var i = 0; i < d.data.length; i++) {
                       var item = d.data[i]
                       if (item.larger) larger += item.size
                       else smaller += item.size
                   }
                   return this.clusterXFormat({ larger, smaller }, 3)
               } catch (err) {
                   return this.clusterXFormat({ error: err }, 1)
               }
           }
           ///  index end  

           /**
           * @jobOnDone
           * * accept callback in raw data format for each `jobINIT` completed
           * * this call is made on comletion of each `jobINIT`, 
                its already in clusterMaster environment
           */
          jobOnDone(cb) {
              clusterX.onDone((jobData, jobIndex, jobRef, error) => {
                  if (error) {
                      notify.ulog({ message: 'last job received due to error', jobIndex }, true)
                      cb({ error })
                  }
  
                  clusterX.isDoneData(['single-1'], data => {
                      payloadHandlerX.sendUpdate('single', data)
                  })

                  // waiting until 2 jobs are completed, and then the event is trigered
                  clusterX.isDoneData(['index-1', 'index-2'], data => {
                      // do final data processing before sending to GET /index response
                      payloadHandlerX.sendUpdate('index', data)
                  })

           ...     

           /**
           // cluster.server.js file
           clusterX.onDone((jobData, jobIndex, jobRef, error) => {
            // {jobData} raw data output per job completion     
            // { '1':
            // { timestamp: 1569293898957,
            //   jobRef: 'index-2',
            //   resp: { data: { larger: 8265232, smaller: 4148819 } },
            //   pid: '4180' },
            // '2':
            // { timestamp: 1569293898957,
            //   jobRef: 'index-2',
            //   resp: { data: { larger: 8339097, smaller: 4294688 } },
            //   pid: '7820' },
            // '3':
            // { timestamp: 1569293898958,
            //   jobRef: 'index-2',
            //   resp: { data: { larger: 8313370, smaller: 4163415 } },
            //   pid: '10076' } }
            
            // })

           **/

  • `PayloadhandlerX` - event processing middleware, extends functionality of LiveEvent with 4 methods: watch(), dispatch(), sendUpdate(), delete().

    Its clever a middleware to send data across nested applications, if you want 2-way data flow, `dispatch`<>`watch`, use `sendUpdate`, then add callback to `dispatch`. Order of declaration does not matter, but only for `dispatch` and `watch`. Look at readme.md for instructions.

    Basic use:

    
    //// example declaration
    // payloadHandlerX is an extension of LiveEvent

    const payloadHandlerX = new PayloadHandlerX({ debug: true })
    const eventName = "testJob-1"
     
    // callback will only work if sendUpdate was called 
    // at any time before delete event triggered
    payloadHandlerX.dispatch(eventName, { data: ... }, update => { 
        // waiting for update from sendUpdate
    })
   
    payloadHandlerX.watch(eventName,newData=>{
      // waiting for dispatch data to arrive
    })
   
    // this method is optional.
    payloadHandlerX.sendUpdate(eventName,data) 
    
    // will delete the event all together
    payloadHandlerX.delete(eventName)

   // end
  • `SimpleCacheX` : Save to file cache lives INSIDE your app, and has expire option. This is just an example to show how you can use caching inside application. I would also recommend something like `Redis` or `database` etc... Check out Github repo
  • `ServerX` : Extended Express.js, works together with `PayloadHandlerX` to exchange 2 way data handling.
    # `routes` : After declaring route in `ClientFramework`, you set new controller in `controllers.js`, add the route in `server.routes.js` (same label name), and add it to `allowedRoutes.js` # `./serverX/allowedRoutes.js` : Include all your route names inside of this file, and make sure to follow naming strategy: controller/route/eventName to have same name label's. This script is declared in `Authentication.js` and `client.app.js` for validation purposes. # `./clientFramework/client.app.js` : An example client application integration with ClusterX with ServerX # `./cluster.server.js` everything is initiated from this file

    code snippet:


  #Controller.js snippet

  /**
  * @index
  * * After get /index request is made it will send the `dispatch` 
      data to your application to start your jobs, and it will wait on 
      callback until the job is completed. In case the request was canceled, 
      `onAbort` will cancel the job and exit
  */
  async index(req, res) {
    var token = req.session.accessToken

    if (!token) {
        return res.status(400).json({ error: true, message: 'your session expired' })
    }
    if (token) {
        var access = await this.verifyAccess(req, token)
        if (access) {
            this.onAbort(req, res, 'index')
            var routeName = ServerXControllers.prototype.index.name

            // `data` attribute must be set
            var exampleData = { data: req.params.something || 2 } 
            payloadHandlerX.dispatch(routeName, exampleData, update => {
                this.routeStatus['index'] = 'done'
                this.cleanEnd(res, update)
            })
        } else {
            req.session.accessToken = null
            console.log('that is the wrong token', token)
            return res.status(400).json({ error: true, 
                                          message: 'your session expired' })
        }
    }
  }

  // end

Business enquiries

If you are interested in ClusterX, please contact us at: eaglex.net and submit your enquiry. Thanks!