The ServiceBroker is the main component of Moleculer. It handles services & events, calls actions and communicates with remote nodes. You need to create an instance of ServiceBroker for every node.
Create broker
Create broker with default settings:
const { ServiceBroker } = require("moleculer");
Create broker with custom settings:
const { ServiceBroker } = require("moleculer");
Create broker with NATS transporter:
const { ServiceBroker } = require("moleculer");
Create broker with cacher:
const { ServiceBroker } = require("moleculer");
Broker options
All available broker options:
{
Name | Type | Default | Description |
---|---|---|---|
namespace | String | "" | Namespace of nodes. It lets you segment your nodes on the same network. |
nodeID | String | hostname + PID | Unique node identifier. |
logger | Boolean or Object or Function | null | Logger class. During development you can set it to console or true. In production you can use an external logger, e.g. winston or pino. Read more. |
logLevel | String | info | Log level for the built-in console logger (trace, debug, info, warn, error, fatal). |
logFormatter | String or Function | "default" | Log formatting for the built-in console logger. Values: default, simple. It can also be a Function. |
logObjectPrinter | Function | null | Custom object & array printer for the built-in console logger. |
transporter | String or Object or Transporter | null | Transporter settings. Required if you have 2 or more nodes. Read more. |
requestTimeout | Number | 0 | Number of milliseconds to wait before rejecting a request with a RequestTimeoutError. Disable: 0 |
requestRetry | Number | 0 | Number of retries. If the request times out, the broker will try to call again. |
maxCallLevel | Number | 0 | Limit of call level. If the limit is reached, the broker throws a MaxCallLevelError. (Infinite loop protection) |
heartbeatInterval | Number | 5 | Number of seconds between heartbeat packets sent to other nodes. |
heartbeatTimeout | Number | 15 | Number of seconds to wait before setting a node to unavailable status. |
trackContext | Boolean | false | All services wait for all running contexts before shutting down. |
gracefulStopTimeout | Number | 2000 | Max time to wait for running contexts before shutting down. |
disableBalancer | Boolean | false | Disable the built-in request & emit balancer. Only if the transporter supports it. |
registry | Object | | Settings of Service Registry |
circuitBreaker | Object | | Settings of Circuit Breaker |
cacher | String or Object or Cacher | null | Cacher settings. Read more |
serializer | String or Serializer | JSONSerializer | Instance of serializer. Read more |
validation | Boolean | true | Enable parameters validation. |
validator | Validator | null | Custom Validator class for validation. |
metrics | Boolean | false | Enable metrics function. |
metricsRate | Number | 1 | Rate of metrics calls. 1 means to measure every request. |
statistics | Boolean | false | Enable broker statistics. Collect the request count & latencies. |
internalServices | Boolean | true | Register internal services for metrics & statistics functions. |
hotReload | Boolean | false | Watch the loaded services and hot reload them if they change. Read more. |
ServiceFactory | ServiceClass | null | Custom Service class. If not null, the broker will use it when creating services. |
ContextFactory | ContextClass | null | Custom Context class. If not null, the broker will use it when creating contexts. |
middlewares | Array<Function> | null | Register middlewares. Useful when you use Moleculer Runner. |
started | Function | null | Fired when the broker has started. Useful when you use Moleculer Runner. |
stopped | Function | null | Fired when the broker has stopped. Useful when you use Moleculer Runner. |
created | Function | null | Fired when the broker has been created. Useful when you use Moleculer Runner. |
transit.maxQueueSize | Number | 50000 | Protection against excessive memory usage when there are too many outgoing requests. If there are more live outgoing requests than this limit, new requests are rejected with a QueueIsFullError. |
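As a rough illustration of how several of these options combine, here is a sketch of an options object. All values are illustrative rather than recommended, and the string shorthands "NATS" and "Memory" assume the built-in transporter and cacher names.

```javascript
// Hypothetical broker options; every value here is illustrative only.
const brokerOptions = {
    namespace: "dev",
    nodeID: "node-1",
    logger: true,
    logLevel: "info",
    transporter: "NATS",          // assumed shorthand for the NATS transporter
    requestTimeout: 5 * 1000,     // milliseconds
    requestRetry: 2,
    maxCallLevel: 100,
    heartbeatInterval: 5,         // seconds
    heartbeatTimeout: 15,         // seconds
    cacher: "Memory",             // assumed shorthand for the memory cacher
    validation: true,
    transit: { maxQueueSize: 50 * 1000 }
};

// With Moleculer installed, the object would be passed to the constructor:
// const { ServiceBroker } = require("moleculer");
// const broker = new ServiceBroker(brokerOptions);
```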
Moleculer Runner
You don’t need to create a ServiceBroker manually in your project. You can use the Moleculer Runner to create and execute a broker and load services. Read more about Moleculer Runner.
Call services
To call a service, use the broker.call method. The broker looks for the service (and a node) that has the given action and calls it. The function returns a Promise.
Syntax
const res = await broker.call(actionName, params, opts);
The actionName is a dot-separated string. The first part is the service name, and the second part is the action name. So if you have a posts service with a create action, you can call it as posts.create.
The params is an object which is passed to the action as part of the Context. It is optional.
The opts is an object to set/override some request parameters, e.g.: timeout, retryCount. It is optional.
Available calling options:
Name | Type | Default | Description |
---|---|---|---|
timeout | Number | requestTimeout of broker | Timeout of the request in milliseconds. If the request times out and you don’t define fallbackResponse, the broker throws a RequestTimeoutError. To disable, set 0 or null. |
retryCount | Number | requestRetry of broker | Number of retries. If the request times out, the broker will try to call again. |
fallbackResponse | Any | null | Returned if the request has failed. More info |
nodeID | String | null | Target nodeID. If set, the broker makes a direct call to the given node. |
meta | Object | null | Metadata of the request. Access it via ctx.meta in handlers. It is transferred in sub-calls as well. |
Usage
// Call without params
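The three arguments of broker.call can be seen together in the following sketch. The posts.create action, the createPost helper and the recording stand-in broker are hypothetical; with a real ServiceBroker only the createPost function would be needed.

```javascript
// `broker` is any object exposing call(actionName, params, opts), e.g. a ServiceBroker.
function createPost(broker, title) {
    const params = { title };
    const opts = { timeout: 500, retryCount: 2, meta: { source: "docs-example" } };
    return broker.call("posts.create", params, opts);
}

// Hypothetical stand-in broker that only records what it receives.
const recorded = [];
const fakeBroker = {
    call(actionName, params, opts) {
        recorded.push({ actionName, params, opts });
        return Promise.resolve({ id: 1, title: params.title });
    }
};

createPost(fakeBroker, "Hello");
```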
Request timeout & fallback response
If the timeout option is defined and the request times out, the broker throws a RequestTimeoutError.
But if you set fallbackResponse in the options, the broker won’t throw an error. Instead, it returns the given value. The fallbackResponse can be an Object, Array, etc.
The fallbackResponse can also be a Function which returns a Promise. In this case, the broker passes the current Context & Error objects to this function as arguments.
broker.call("user.recommendation", { limit: 5 }, {
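The two forms of fallbackResponse described above might look like the following calling options. This is only a sketch: defaultRecommendation is a made-up value, and the commented call at the end assumes a running broker.

```javascript
// Static fallback: returned as-is if the request fails.
const staticFallbackOpts = { timeout: 500, fallbackResponse: [] };

// Function fallback: receives the Context and the Error, returns a Promise.
// `defaultRecommendation` is a hypothetical value used only for illustration.
const defaultRecommendation = [{ id: 0, title: "Most popular" }];
const dynamicFallbackOpts = {
    timeout: 500,
    fallbackResponse(ctx, err) {
        return Promise.resolve(defaultRecommendation);
    }
};

// Usage with a real broker:
// broker.call("user.recommendation", { limit: 5 }, dynamicFallbackOpts);
```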
Distributed timeouts
Moleculer uses distributed timeouts. In case of nested calls, the timeout value is decremented by the elapsed time. If the timeout value is less than or equal to 0, the next nested calls are skipped (RequestSkippedError) because the first call has already been rejected with a RequestTimeoutError.
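The arithmetic behind distributed timeouts can be sketched as below. This is not Moleculer's implementation; remainingTimeout is a hypothetical helper, and it assumes a positive (enabled) timeout.

```javascript
// Returns the timeout left for a nested call, or null if the call must be skipped.
function remainingTimeout(timeout, elapsed) {
    const remaining = timeout - elapsed;
    return remaining <= 0 ? null : remaining;
}

// The first call starts with 1000 ms; 300 ms pass before the nested call.
const first = remainingTimeout(1000, 300);   // 700
// Another 800 ms pass before the next nested call: nothing is left, so it is skipped.
const second = remainingTimeout(first, 800); // null
```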
Retries
If retryCount is defined in the calling options and the request returns a MoleculerRetryableError, the broker calls the action again with the same parameters as long as retryCount is greater than 0.
broker.call("user.list", { limit: 5 }, { timeout: 500, retryCount: 3 })
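The retry rule can be pictured with a small loop. This is a sketch of the idea, not the broker's own code: RetryableError stands in for MoleculerRetryableError, and callWithRetry and flakyAction are hypothetical.

```javascript
// Hypothetical marker for retryable failures (stands in for MoleculerRetryableError).
class RetryableError extends Error {}

// Calls `action` again with the same params while retries are left.
async function callWithRetry(action, params, retryCount) {
    for (;;) {
        try {
            return await action(params);
        } catch (err) {
            // Non-retryable errors and exhausted retries are passed on to the caller.
            if (!(err instanceof RetryableError) || retryCount <= 0) throw err;
            retryCount--;
        }
    }
}

// Fails twice, then succeeds on the third attempt.
let attempts = 0;
const flakyAction = async () => {
    attempts++;
    if (attempts < 3) throw new RetryableError("temporary failure");
    return "ok";
};

const result = callWithRetry(flakyAction, { limit: 5 }, 3);
```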
Metadata
With meta you can send meta information to services. Access it via ctx.meta in action handlers. Please note that in nested calls the meta is merged.
broker.createService({
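The merging of meta in nested calls can be approximated with a shallow merge. The sketch below assumes that values passed with the nested call override the values inherited from the parent call; mergeMeta is a hypothetical helper, not part of the broker API.

```javascript
// Assumption: values passed with the nested call override the parent's values.
function mergeMeta(parentMeta, callMeta) {
    return Object.assign({}, parentMeta, callMeta);
}

const parentMeta = { user: "john", locale: "en" };
const nestedMeta = mergeMeta(parentMeta, { locale: "hu", traceLevel: 2 });
```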
Since v0.12, ctx.meta is sent back to the caller service. You can use it to send extra meta information back to the caller, e.g. send response headers back to the API gateway or set the resolved logged-in user in the metadata.
broker.createService({
Emit events
Broker has a built-in balanced event bus to support Event-driven architecture. You can send events to the local and remote services.
Balanced events with grouping
You can send balanced events with the emit function. In this case, only one instance per service receives the event.
Example: you have 2 main services: users & payments. Both subscribe to the user.created event. You start 3 instances of the users service and 2 instances of the payments service. When you emit the user.created event, only one users and one payments service instance will receive the event.
The first parameter is the name of the event, the second parameter is the payload. If you want to send multiple values, you should wrap them in an object.
// The `user` will be serialized for transportation.
With the third parameter, you can specify which groups/services receive the event:
// Only the `mail` & `payments` services receive it
Broadcast event
With the broker.broadcast method you can send events to all local and remote services. It is not balanced; all service instances receive it.
broker.broadcast("config.changed", config);
With the third parameter, you can specify which groups/services receive the event:
// Send to all "mail" service instances
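The difference between balanced and broadcast delivery can be simulated without a broker. In this sketch the subscribers registry and both helper functions are hypothetical, and the balanced variant simply picks the first instance where a real balancer would apply its own strategy.

```javascript
// Hypothetical registry: service name -> instances subscribed to "user.created".
const subscribers = {
    users: ["users-1", "users-2", "users-3"],
    payments: ["payments-1", "payments-2"]
};

// Balanced (emit): one instance per service, optionally limited to some groups.
function emitTargets(registry, groups) {
    return Object.keys(registry)
        .filter(name => !groups || groups.includes(name))
        .map(name => registry[name][0]);
}

// Broadcast: every instance of every matching service.
function broadcastTargets(registry, groups) {
    return Object.keys(registry)
        .filter(name => !groups || groups.includes(name))
        .reduce((all, name) => all.concat(registry[name]), []);
}

const balanced = emitTargets(subscribers);
const everyone = broadcastTargets(subscribers);
const onlyPayments = emitTargets(subscribers, ["payments"]);
```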
Local events
Every local event must start with $ (dollar sign), e.g. $node.connected.
Please note that if you publish these events with emit or broadcast, only local services receive them; they are not transferred to remote nodes.
Subscribe to events
You can subscribe to events in the events property of services.
Wildcards are available in event names.
module.exports = {
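A service schema with event subscriptions could look roughly like this. The mail service, its event names and the handler argument order (payload, sender, eventName) are assumptions made for illustration; the handlers are called directly here to show what the broker would deliver.

```javascript
const received = [];

// Hypothetical service schema; the handler argument order
// (payload, sender, eventName) is an assumption.
const mailService = {
    name: "mail",
    events: {
        // Exact event name
        "user.created"(payload, sender, eventName) {
            received.push(`welcome mail for ${payload.name}`);
        },
        // Wildcard subscription, intended to match events such as "order.paid"
        "order.*"(payload, sender, eventName) {
            received.push(`${eventName} from ${sender}`);
        }
    }
};

// Calling the handlers directly, in place of the broker delivering events.
mailService.events["user.created"]({ name: "John" }, "node-1", "user.created");
mailService.events["order.*"]({ id: 5 }, "node-2", "order.paid");
```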
Start & Stop
Starting logic
The broker starts connecting the transporter but doesn’t publish the local service list to remote nodes yet. When the connection is established, it starts all services (calls the service started handlers). Once all services have started successfully, the broker publishes the local service list to remote nodes. Hence other nodes send requests only after all local services have started properly.
Avoid deadlocks
You can create deadlocks when two services wait for each other. E.g.: the users service has dependencies: [posts] and the posts service has dependencies: [users]. To avoid it, remove the concerned service from dependencies and use the waitForServices method out of the started handler instead.
Stopping logic
When you call broker.stop or stop the process, the broker starts stopping all local services. Afterwards, the transporter disconnects.
Middlewares
Broker supports middlewares. You can add your custom middlewares, and they will be called before/after every local request. A middleware is a Function that returns a wrapped action handler.
Example middleware from the validator module
return function validatorMiddleware(handler, action) {
The handler is the action handler, which is defined in the Service schema. The action is the action definition object from the Service schema. The middleware should return either the original handler or a new wrapped handler. As you can see above, we check whether the action has a params property. If it does, we return a wrapped handler which calls the validator module before calling the original handler.
If the params property is not defined, we return the original handler (wrapping is skipped).
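The wrap-or-skip decision described above can be sketched as follows. This is not the validator module's code: createValidatorMiddleware, the check function and requireKeys are hypothetical stand-ins for a real validator.

```javascript
// `check(params, schema)` is a hypothetical validator returning true or an error message.
function createValidatorMiddleware(check) {
    return function validatorMiddleware(handler, action) {
        // No params schema on the action: skip wrapping.
        if (!action.params) return handler;

        // Otherwise validate first, then call the original handler.
        return function validatedHandler(ctx) {
            const result = check(ctx.params, action.params);
            if (result !== true) return Promise.reject(new Error(result));
            return handler(ctx);
        };
    };
}

// Toy check: every key in the schema must be present in params.
const requireKeys = (params, schema) => {
    const missing = Object.keys(schema).filter(key => !(key in params));
    return missing.length === 0 ? true : `Missing: ${missing.join(", ")}`;
};

const middleware = createValidatorMiddleware(requireKeys);
const handler = ctx => Promise.resolve(`Hello ${ctx.params.name}`);
const plain = middleware(handler, { name: "greeter.ping" });
const wrapped = middleware(handler, { name: "greeter.hello", params: { name: "string" } });
```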
If you don’t call the original handler in the middleware, it will break the request. You can use this in cachers. For example, if the cacher finds the requested data in the cache, it returns the cached data instead of calling the handler.
Example code from cacher middleware
return (handler, action) => {
The handler always returns a Promise. It means that you can access responses and manipulate them.
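Because the handler returns a Promise, a middleware can both short-circuit a request and post-process its response. The sketch below shows this with a toy in-memory cache; cacheMiddleware, the Map-based cache, the key format and the action.cache flag are assumptions, not the built-in cacher.

```javascript
// Hypothetical in-memory cache keyed by action name and params.
const cache = new Map();
let handlerCalls = 0;

function cacheMiddleware(handler, action) {
    // Actions without the (assumed) `cache` flag are left untouched.
    if (!action.cache) return handler;

    return function cachedHandler(ctx) {
        const key = `${action.name}:${JSON.stringify(ctx.params)}`;
        // Cache hit: answer without calling the original handler.
        if (cache.has(key)) return Promise.resolve(cache.get(key));
        // Cache miss: call the handler, then store its response.
        return handler(ctx).then(res => {
            cache.set(key, res);
            return res;
        });
    };
}

const listUsers = ctx => {
    handlerCalls++;
    return Promise.resolve(["alice", "bob"].slice(0, ctx.params.limit));
};
const cachedList = cacheMiddleware(listUsers, { name: "users.list", cache: true });

// The second call with the same params is served from the cache.
const done = cachedList({ params: { limit: 1 } })
    .then(() => cachedList({ params: { limit: 1 } }));
```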
Wait for services
To wait for services, use the waitForServices method. It returns a Promise which is resolved when all services are available.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
services | String or Array | - | List of services to wait for |
timeout | Number | 0 | Waiting timeout. 0 means no timeout. If reached, the Promise is rejected with a MoleculerServerError. |
interval | Number | 1000 | Frequency of checks in milliseconds |
Example
broker.waitForServices(["posts", "users"]).then(() => {
Set timeout & interval
broker.waitForServices("accounts", 10 * 1000, 500).then(() => {
Versioned services
broker.waitForServices([
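How the timeout and interval parameters interact can be pictured with a simple polling loop. This is a sketch of the idea only, not the broker's implementation: waitFor and the isAvailable lookup are hypothetical, and a plain Error stands in for MoleculerServerError.

```javascript
// `isAvailable(name)` is a hypothetical lookup into the service registry.
function waitFor(services, isAvailable, timeout = 0, interval = 1000) {
    const names = Array.isArray(services) ? services : [services];
    const startedAt = Date.now();

    return new Promise((resolve, reject) => {
        const check = () => {
            if (names.every(name => isAvailable(name))) return resolve();
            // A timeout of 0 means waiting forever.
            if (timeout > 0 && Date.now() - startedAt > timeout)
                return reject(new Error("Waiting for services timed out"));
            setTimeout(check, interval);
        };
        check();
    });
}

// "posts" becomes available on the second check.
let checks = 0;
const ready = waitFor("posts", () => ++checks >= 2, 1000, 10);
```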
Internal services
The broker contains some internal services to check the node health or get broker statistics. You can disable loading them with the internalServices: false broker option.
List of nodes
It lists all known nodes (including local node).
broker.call("$node.list").then(res => console.log(res));
List of services
It lists all registered services (local & remote).
broker.call("$node.services").then(res => console.log(res));
It has some options which you can declare within params.
Options
Name | Type | Default | Description |
---|---|---|---|
onlyLocal | Boolean | false | List only local services. |
skipInternal | Boolean | false | Skip the internal services ($node). |
withActions | Boolean | false | List with actions. |
onlyAvailable | Boolean | false | List only services that are available. |
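The options above are passed as the params of the call. A minimal sketch, in which listLocalServices and the recording stand-in broker are hypothetical (with a real ServiceBroker only the function would be needed):

```javascript
// `broker` is any object exposing call(actionName, params), e.g. a ServiceBroker.
function listLocalServices(broker) {
    return broker.call("$node.services", {
        onlyLocal: true,
        skipInternal: true,
        withActions: true
    });
}

// Hypothetical stand-in broker that only records what it receives.
const calls = [];
const stubBroker = {
    call(actionName, params) {
        calls.push({ actionName, params });
        return Promise.resolve([]);
    }
};

listLocalServices(stubBroker);
```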
List of actions
It lists all registered actions (local & remote).
broker.call("$node.actions").then(res => console.log(res));
It has some options which you can declare within params.
Options
Name | Type | Default | Description |
---|---|---|---|
onlyLocal | Boolean | false | List only local actions. |
skipInternal | Boolean | false | Skip the internal actions ($node). |
withEndpoints | Boolean | false | List with endpoints (nodes). |
onlyAvailable | Boolean | false | List only actions that are available. |
List of events
It lists all event subscriptions.
broker.call("$node.events").then(res => console.log(res));
It has some options which you can declare within params.
Options
Name | Type | Default | Description |
---|---|---|---|
onlyLocal | Boolean | false | List only local subscriptions. |
skipInternal | Boolean | false | Skip the internal event subscriptions ($). |
withEndpoints | Boolean | false | List with endpoints (nodes). |
onlyAvailable | Boolean | false | List only subscriptions that are available. |
Health of node
It returns the health info of local node (including process & OS information).
broker.call("$node.health").then(res => console.log(res));
Example health info:
{
Statistics
It returns the request statistics if statistics is enabled in the broker options.
broker.call("$node.stats").then(res => console.log(res));
Example statistics:
{
Internal events
The broker broadcasts some local events.
$services.changed
The broker sends this event if some node loads or destroys services after broker.start.
Payload
Name | Type | Description |
---|---|---|
localService | Boolean | True if a local service changed. |
$circuit-breaker.opened
The broker sends this event when the circuit breaker module changes its state to open.
Payload
Name | Type | Description |
---|---|---|
nodeID | String | Node ID |
action | String | Action name |
failures | Number | Count of failures |
$circuit-breaker.half-opened
The broker sends this event when the circuit breaker module changes its state to half-open.
Payload
Name | Type | Description |
---|---|---|
nodeID | String | Node ID |
action | String | Action name |
$circuit-breaker.closed
The broker sends this event when the circuit breaker module changes its state to closed.
Payload
Name | Type | Description |
---|---|---|
nodeID | String | Node ID |
action | String | Action name |
$node.connected
The broker sends this event when a node connects or reconnects.
Payload
Name | Type | Description |
---|---|---|
node | Node | Node info object |
reconnected | Boolean | Is reconnected? |
$node.updated
The broker sends this event when it has received an INFO message from a node, i.e. the service list on the node has changed.
Payload
Name | Type | Description |
---|---|---|
node | Node | Node info object |
$node.disconnected
The broker sends this event when a node disconnects.
Payload
Name | Type | Description |
---|---|---|
node | Node | Node info object |
unexpected | Boolean | true - Not received heartbeat, false - Received DISCONNECT message from node. |
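A service could react to these node events roughly as sketched below. The node-monitor service is hypothetical, and both the assumption that handlers receive the payload as their first argument and the id field on the node info object are illustrative guesses; the handlers are called directly in place of the broker.

```javascript
const log = [];

// Hypothetical service schema; handlers are assumed to receive the payload first.
const monitorService = {
    name: "node-monitor",
    events: {
        "$node.connected"({ node, reconnected }) {
            log.push(`${node.id} ${reconnected ? "reconnected" : "connected"}`);
        },
        "$node.disconnected"({ node, unexpected }) {
            // unexpected: true = no heartbeat received, false = DISCONNECT message received
            log.push(`${node.id} ${unexpected ? "lost (no heartbeat)" : "left (DISCONNECT)"}`);
        }
    }
};

// Calling the handlers directly with payloads shaped like the tables above.
monitorService.events["$node.connected"]({ node: { id: "node-2" }, reconnected: false });
monitorService.events["$node.disconnected"]({ node: { id: "node-2" }, unexpected: true });
```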
$broker.started
The broker sends this event once broker.start() is called and all local services are started.
$broker.stopped
The broker sends this event once broker.stop() is called and all local services are stopped.