MongoDB® Performance: Running MongoDB Map-Reduce Operations On Secondaries

6 min read
MongoDB® Performance: Running MongoDB Map-Reduce Operations On Secondaries

SHARE THIS ARTICLE

Map-reduce is perhaps the most versatile of the aggregation operations that MongoDB supports.

Map-Reduce is a popular programming model that originated at Google for processing and aggregating large volumes of data in parallel.  A detailed discussion on Map-Reduce is out of the scope of this article but essentially it is a multi-step aggregation process. The most important two steps are the map stage (process each document and emit results) and the reduce stage (collates results emitted during the map stage).

MongoDB supports three kinds of aggregation operations: Map-Reduce, aggregation pipeline and single purpose aggregation commands. You can use this MongoDB comparison document to see which fits your needs.https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/

In my last post, we saw, with examples, how to run Aggregation pipelines on secondaries. In this post, we will walk through running Map-Reduce jobs on the MongoDB secondary replicas.

MongoDB Map-Reduce

MongoDB supports running Map-Reduce jobs on the database servers. This offers the flexibility to write complex aggregation tasks that aren’t as easily done via aggregation pipelines. MongoDB lets you write custom map and reduce functions in Javascript that can be passed to the database via Mongo shell or any other client. On large and constantly growing data sets, one can even consider running incremental Map-Reduce jobs to avoid processing older data every time.

Historically, the map and the reduce methods used to be executed in a single-threaded context. However, that limitation was removed in version 2.4.

Why run Map-Reduce jobs on the Secondary?

Like other aggregation jobs, Map-Reduce too is a resource intensive ‘batch’ job so it is a good fit for running on read-only replicas. The caveats in doing so are:

1) It should be ok to use slightly stale data. Or you can tweak the write concern to ensure replicas are always in sync with the primary. This second option assumes that taking a hit on the write performance is acceptable.

2) The output of the Map-Reduce job shouldn’t be written to another collection within the database but rather be returned to the application (i.e. no writes to the database).

Let’s look at how to do this via examples, both from the mongo shell and the Java driver.

Map-Reduce on Replica Sets

Data Set

For illustration, we will use a rather simple data set: A daily transaction record dump from a retailer. A sample entry looks like:

RS-replica-0:PRIMARY> use test
switched to db test
RS-replica-0:PRIMARY> show tables
txns
RS-replica-0:PRIMARY> db.txns.findOne()
{
    "_id" : ObjectId("584a3b71cdc1cb061957289b"),
    "custid" : "cust_66",
    "txnval" : 100,
    "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...],
...
}

In our examples, we will calculate the total expenditure of a given customer on that day. Thus, given our schema, the map and reduce methods will look like:

var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record
var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid

With our schema established, let’s look at Map-Reduce in action.

MongoDB Shell

In order to ensure that a Map-Reduce job is executed on the secondary, the read preference should be set to secondary. Like we said above, in order for a Map-Reduce to run on a secondary, the output of the result must be inline (In fact, that’s is the only out value allowed on secondaries). Let’s see how it works.

$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
MongoDB shell version: 3.2.10
connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test
2016-12-09T08:15:19.347+0000 I NETWORK  [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017
2016-12-09T08:15:19.349+0000 I NETWORK  [ReplicaSetMonitorWatcher] starting
RS-replica-0:PRIMARY> db.setSlaveOk()
RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary')
RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); }
RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); }
RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }})
{
    "results" : [
        {
            "_id" : "cust_0",
            "value" : 72734
        },
        {
            "_id" : "cust_1",
            "value" : 67737
        },
...
    ]
    "timeMillis" : 215,
    "counts" : {
        "input" : 10000,
        "emit" : 10000,
        "reduce" : 909,
        "output" : 101
    },
    "ok" : 1

}

A peek at the logs on the secondary confirms that the job indeed ran on the secondary.

...
2016-12-09T08:17:24.842+0000 D COMMAND  [conn344] mr ns: test.txns
2016-12-09T08:17:24.843+0000 I COMMAND  [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:17:24.865+0000 I COMMAND  [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:17:25.063+0000 I COMMAND  [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms
...

Java

Now let’s try to run a Map-Reduce job on the read replicas from a Java application. On the MongoDB Java driver, setting the read Preference does the trick. The output is inline by default so no additional parameters need to be passed. Here’s an example using driver version 3.2.2:

public class MapReduceExample {

    private static final String MONGO_END_POINT = "mongodb://admin:pwd@server-1.servers.example.com:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0";
    private static final String COL_NAME = "txns";
    private static final String DEF_DB = "test";

    public MapReduceExample() { }

    public static void main(String[] args) {
        MapReduceExample writer = new MapReduceExample();
        writer.mapReduce();
    }

    public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
    public static final String reducefunction = "function(key, values) { return Array.sum(values); }";

    private void mapReduce() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase database = client.getDatabase(DEF_DB);
        MongoCollection collection = database.getCollection(COL_NAME);
        MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default
        MongoCursor cursor = iterable.iterator();
        while (cursor.hasNext()) {
           Document result = cursor.next();
           printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value"));
        }
        printer("Done...");
    }
...
}

As evident from the logs, the job ran on the secondary:

...
2016-12-09T08:32:31.419+0000 D COMMAND  [conn371] mr ns: test.txns
2016-12-09T08:32:31.420+0000 I COMMAND  [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:32:31.444+0000 I COMMAND  [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:32:31.890+0000 I COMMAND  [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms
...

MongoDB Map-Reduce on Sharded clusters

MongoDB supports Map-Reduce on sharded clusters, both when a sharded collection is the input and when it is the output of a Map-Reduce job. However, MongoDB currently doesn’t support running map-reduce jobs on secondaries of a sharded cluster. So even if the out option is set to inline, Map-Reduce jobs will always run on the primaries of a sharded cluster. This issue is being tracked through this JIRA bug.

The syntax of executing a Map-Reduce job on a sharded cluster is same as that on a replica set. So the examples provided in the above section hold. If the above Java example is run on a sharded cluster, log messages appear on the primaries indicating that the command ran there.

...
2016-11-24T08:46:30.828+0000 I COMMAND  [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in
line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS
tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing:
 { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha
rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu
t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu
ireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND  [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
...

Please visit our MongoDB product page to find out about our extensive feature list.

For more information, please visit www.scalegrid.io. Connect with ScaleGrid on LinkedIn, X, Facebook, and YouTube.
Table of Contents

Stay Ahead with ScaleGrid Insights

Dive into the world of database management with our monthly newsletter. Get expert tips, in-depth articles, and the latest news, directly to your inbox.

Related Posts

blog-feature-img_whats-new-at-scalegrid

What’s New at ScaleGrid – September 2024

At ScaleGrid, we’re always pushing the boundaries to offer more flexibility and scalability to our customers. Over the past few...

Managing PostgreSQL® High Availability – Part I: PostgreSQL Automatic Failover

Managing High Availability (HA) in your PostgreSQL hosting is very important to ensuring your database deployment clusters maintain exceptional uptime...

RabbitMQ Security Compliance - ScaleGrid

RabbitMQ Security and Compliance

Follow fundamental procedures in authentication, encryption, and commitment to RabbitMQ security protocols to protect your RabbitMQ system and secure messages....

NEWS

Add Headline Here