Map-reduce is perhaps the most versatile of the aggregation operations that MongoDB supports.
Map-Reduce is a popular programming model that originated at Google for processing and aggregating large volumes of data in parallel. A detailed discussion of Map-Reduce is outside the scope of this article, but essentially it is a multi-step aggregation process. The two most important steps are the map stage (process each document and emit results) and the reduce stage (collate the results emitted during the map stage).
MongoDB supports three kinds of aggregation operations: Map-Reduce, aggregation pipeline and single purpose aggregation commands. You can use this MongoDB comparison document to see which fits your needs.
In my last post, we saw, with examples, how to run Aggregation pipelines on secondaries. In this post, we will walk through running Map-Reduce jobs on the MongoDB secondary replicas.
MongoDB Map-Reduce
MongoDB supports running Map-Reduce jobs on the database servers. This offers the flexibility to write complex aggregation tasks that aren’t as easily done via aggregation pipelines. MongoDB lets you write custom map and reduce functions in Javascript that can be passed to the database via Mongo shell or any other client. On large and constantly growing data sets, one can even consider running incremental Map-Reduce jobs to avoid processing older data every time.
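As a rough illustration of the incremental approach, the options below restrict the input to documents newer than the last run and fold the new results into an existing output collection. The `ts` field and the `cust_totals` collection name are assumptions made for this sketch; they do not appear in the data set used in this post. Note that `out: { reduce: ... }` writes to a collection, so a job like this has to run on the primary.

```javascript
// Build mapReduce options for an incremental run.
// Assumptions: each document carries a hypothetical `ts` date field, and
// `outCollection` is a hypothetical collection that holds the running totals.
function incrementalOptions(lastRun, outCollection) {
  return {
    // Only process documents added since the previous run.
    query: { ts: { $gt: lastRun } },
    // Merge new results into the existing ones by re-applying the reduce function.
    out: { reduce: outCollection }
  };
}

var opts = incrementalOptions(new Date("2016-12-08T00:00:00Z"), "cust_totals");
console.log(JSON.stringify(opts));

// In the mongo shell the options would be passed along with your own
// map and reduce functions, for example:
//   db.txns.mapReduce(mapFunction, reduceFunction, opts)
```

For the merge to be correct, the reduce function has to give the same answer when it is applied to previously reduced values, which holds for a simple sum.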
Historically, the map and the reduce methods used to be executed in a single-threaded context. However, that limitation was removed in version 2.4.
Why run Map-Reduce jobs on the Secondary?
Like other aggregation jobs, Map-Reduce is a resource-intensive ‘batch’ job, so it is a good fit for running on read-only replicas. The caveats in doing so are:
1) It should be ok to use slightly stale data. Or you can tweak the write concern to ensure replicas are always in sync with the primary. This second option assumes that taking a hit on the write performance is acceptable.
2) The output of the Map-Reduce job shouldn’t be written to another collection within the database but rather be returned to the application (i.e. no writes to the database).
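For the first caveat, a write that waits for every replica set member to acknowledge it might look like the sketch below. The helper name, the member count and the 5-second timeout are assumptions for illustration; the right values depend on your deployment, and waiting on every member slows down each write.

```javascript
// Insert a document and wait until `memberCount` replica set members
// have acknowledged the write.
// `insertAcknowledgedByAll` is a hypothetical helper; `collection` is expected
// to be a mongo shell collection object such as db.txns.
function insertAcknowledgedByAll(collection, doc, memberCount) {
  return collection.insert(doc, {
    // wtimeout (milliseconds) stops the write from blocking forever
    // if a member is unreachable; 5000 is an arbitrary choice.
    writeConcern: { w: memberCount, wtimeout: 5000 }
  });
}

// mongo shell usage, assuming a three-member replica set:
//   insertAcknowledgedByAll(db.txns, { custid: "cust_66", txnval: 100 }, 3);
```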
Let’s look at how to do this via examples, both from the mongo shell and the Java driver.
Map-Reduce on Replica Sets
Data Set
For illustration, we will use a rather simple data set: A daily transaction record dump from a retailer. A sample entry looks like:
RS-replica-0:PRIMARY> use test
switched to db test
RS-replica-0:PRIMARY> show tables
txns
RS-replica-0:PRIMARY> db.txns.findOne()
{
    "_id" : ObjectId("584a3b71cdc1cb061957289b"),
    "custid" : "cust_66",
    "txnval" : 100,
    "items" : [{"sku": "sku1", "qty": 1, "pr": 100}, ...],
    ...
}
In our examples, we will calculate the total expenditure of a given customer on that day. Thus, given our schema, the map and reduce methods will look like:
// Emit the custid and txn value from each record
var mapFunction = function() { emit(this.custid, this.txnval); }
// Sum all the txn values for a given custid
var reduceFunction = function(key, values) { return Array.sum(values); }
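To see what these two functions do with a handful of records, here is a plain JavaScript emulation of the map, group and reduce flow that can be run outside MongoDB. It is only a sketch: `runMapReduce` and `sampleTxns` are made up for illustration, `emit` and `Array.sum` are supplied by hand because they normally come from the server and the mongo shell, and the real implementation is considerably more involved.

```javascript
// Array.sum is a mongo shell helper; define an equivalent when running elsewhere.
if (typeof Array.sum !== "function") {
  Array.sum = function (values) {
    return values.reduce(function (acc, v) { return acc + v; }, 0);
  };
}

// Same map and reduce functions as above.
var mapFunction = function () { emit(this.custid, this.txnval); };
var reduceFunction = function (key, values) { return Array.sum(values); };

// Hypothetical emulation: run mapFn on every document, group the emitted
// values by key, then reduce each group to a single value.
function runMapReduce(docs, mapFn, reduceFn) {
  var groups = {}; // key -> list of emitted values
  // Expose a global emit() for the duration of the map stage.
  globalThis.emit = function (key, value) {
    (groups[key] = groups[key] || []).push(value);
  };
  docs.forEach(function (doc) { mapFn.call(doc); }); // `this` is the document
  delete globalThis.emit;

  return Object.keys(groups).map(function (key) {
    var values = groups[key];
    // MongoDB does not call reduce for a key that has only one value.
    return { _id: key, value: values.length === 1 ? values[0] : reduceFn(key, values) };
  });
}

// Made-up sample records using the custid and txnval fields from the schema.
var sampleTxns = [
  { custid: "cust_66", txnval: 100 },
  { custid: "cust_1", txnval: 40 },
  { custid: "cust_66", txnval: 25 }
];

var totals = runMapReduce(sampleTxns, mapFunction, reduceFunction);
console.log(totals);
```

Each customer ends up with one entry whose value is the sum of that customer's transaction values, which is the shape of the `results` array MongoDB returns for inline output.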
With our schema established, let’s look at Map-Reduce in action.
MongoDB Shell
To ensure that a Map-Reduce job is executed on the secondary, the read preference should be set to secondary. As noted above, for a Map-Reduce job to run on a secondary, its output must be inline (in fact, that is the only out value allowed on secondaries). Let’s see how it works.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
MongoDB shell version: 3.2.10
connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test
2016-12-09T08:15:19.347+0000 I NETWORK [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017
2016-12-09T08:15:19.349+0000 I NETWORK [ReplicaSetMonitorWatcher] starting
RS-replica-0:PRIMARY> db.setSlaveOk()
RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary')
RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); }
RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); }
RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }})
{
    "results" : [
        { "_id" : "cust_0", "value" : 72734 },
        { "_id" : "cust_1", "value" : 67737 },
        ...
    ],
    "timeMillis" : 215,
    "counts" : { "input" : 10000, "emit" : 10000, "reduce" : 909, "output" : 101 },
    "ok" : 1
}
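Because the output is inline, the application receives the totals in the `results` array of the returned document rather than in a collection. A small sketch of turning that array into a lookup by customer follows; `toLookup` is a hypothetical helper name, and the sample object only mirrors the two entries visible in the shell output above.

```javascript
// Convert an inline mapReduce result into a { custid: total } lookup object.
// `toLookup` is a hypothetical helper, not part of the mongo shell API.
function toLookup(mrResult) {
  var lookup = {};
  mrResult.results.forEach(function (entry) {
    lookup[entry._id] = entry.value;
  });
  return lookup;
}

// Sample shaped like the shell output above (only the entries shown there).
var sample = {
  results: [
    { _id: "cust_0", value: 72734 },
    { _id: "cust_1", value: 67737 }
  ],
  ok: 1
};

var totalsByCustomer = toLookup(sample);
console.log(totalsByCustomer.cust_1);

// In the mongo shell the same helper could be applied directly:
//   toLookup(db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }}))
```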
A peek at the logs on the secondary confirms that the job indeed ran on the secondary.
...
2016-12-09T08:17:24.842+0000 D COMMAND [conn344] mr ns: test.txns
2016-12-09T08:17:24.843+0000 I COMMAND [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:17:24.865+0000 I COMMAND [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:17:25.063+0000 I COMMAND [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms
...
Java
Now let’s try to run a Map-Reduce job on the read replicas from a Java application. With the MongoDB Java driver, setting the read preference does the trick. The output is inline by default, so no additional parameters need to be passed. Here’s an example using driver version 3.2.2:
import com.mongodb.MongoClient;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientOptions.Builder;
import com.mongodb.MongoClientURI;
import com.mongodb.ReadPreference;
import com.mongodb.client.MapReduceIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MapReduceExample {

    private static final String MONGO_END_POINT = "mongodb://admin:pwd@server-1.servers.example.com:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0";
    private static final String COL_NAME = "txns";
    private static final String DEF_DB = "test";

    public MapReduceExample() { }

    public static void main(String[] args) {
        MapReduceExample writer = new MapReduceExample();
        writer.mapReduce();
    }

    // Same map and reduce functions as in the shell example, passed as strings
    public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
    public static final String reducefunction = "function(key, values) { return Array.sum(values); }";

    private void mapReduce() {
        printer("Initializing...");
        // Route reads, including the Map-Reduce command, to a secondary
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase database = client.getDatabase(DEF_DB);
        MongoCollection<Document> collection = database.getCollection(COL_NAME);
        // inline by default
        MapReduceIterable<Document> iterable = collection.mapReduce(mapfunction, reducefunction);
        MongoCursor<Document> cursor = iterable.iterator();
        while (cursor.hasNext()) {
            Document result = cursor.next();
            printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value"));
        }
        printer("Done...");
    }
    ...
}
As evident from the logs, the job ran on the secondary:
...
2016-12-09T08:32:31.419+0000 D COMMAND [conn371] mr ns: test.txns
2016-12-09T08:32:31.420+0000 I COMMAND [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:32:31.444+0000 I COMMAND [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:32:31.890+0000 I COMMAND [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms
...
MongoDB Map-Reduce on Sharded clusters
MongoDB supports Map-Reduce on sharded clusters, both when a sharded collection is the input and when it is the output of a Map-Reduce job. However, MongoDB currently doesn’t support running map-reduce jobs on secondaries of a sharded cluster. So even if the out option is set to inline, Map-Reduce jobs will always run on the primaries of a sharded cluster. This issue is being tracked through this JIRA bug.
The syntax for executing a Map-Reduce job on a sharded cluster is the same as on a replica set, so the examples in the section above still apply. If the Java example above is run on a sharded cluster, log messages appear on the primaries, indicating that the command ran there.
...
2016-11-24T08:46:30.828+0000 I COMMAND [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing: { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { input: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
...