
In MongoDB mapreduce, how can I flatten the values object?

I'm trying to use MongoDB to analyse Apache log files. I've created a receipts collection from the Apache access logs. Here's an abridged summary of what my models look like:

db.receipts.findOne()
{
    "_id" : ObjectId("4e57908c7a044a30dc03a888"),
    "path" : "/videos/1/show_invisibles.m4v",
    "issued_at" : ISODate("2011-04-08T00:00:00Z"),
    "status" : "200"
}

I've written a MapReduce function that groups all data by the issued_at date field. It summarizes the total number of requests, and provides a breakdown of the number of requests for each unique path. Here's an example of what the output looks like:

db.daily_hits_by_path.findOne()
{
    "_id" : ISODate("2011-04-08T00:00:00Z"),
    "value" : {
        "count" : 6,
        "paths" : {
            "/videos/1/show_invisibles.m4v" : {
                "count" : 2
            },
            "/videos/1/show_invisibles.ogv" : {
                "count"开发者_如何学Python : 3
            },
            "/videos/6/buffers_listed_and_hidden.ogv" : {
                "count" : 1
            }
        }
    }
}
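
For reference, map/reduce functions along these lines would produce that shape (a reconstruction from the output; the original functions aren't shown here):

var map = function() {
    var paths = {};
    paths[this.path] = { count: 1 };
    emit(this.issued_at, { count: 1, paths: paths });
};

var reduce = function(key, values) {
    var result = { count: 0, paths: {} };
    values.forEach(function(v) {
        result.count += v.count;
        for (var p in v.paths) {
            if (!result.paths[p]) result.paths[p] = { count: 0 };
            result.paths[p].count += v.paths[p].count;
        }
    });
    return result;
};

db.receipts.mapReduce(map, reduce, { out: "daily_hits_by_path" });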

How can I make the output look like this instead:

{
    "_id" : ISODate("2011-04-08T00:00:00Z"),
    "count" : 6,
    "paths" : {
        "/videos/1/show_invisibles.m4v" : {
            "count" : 2
        },
        "/videos/1/show_invisibles.ogv" : {
            "count" : 3
        },
        "/videos/6/buffers_listed_and_hidden.ogv" : {
            "count" : 1
        }
    }
}


It's not currently possible, but I would suggest voting for this issue: https://jira.mongodb.org/browse/SERVER-2517.


Taking the best from previous answers and comments:

db.items.find().hint({_id: 1}).forEach(function(item) {
    db.items.update({_id: item._id}, item.value);
});

From http://docs.mongodb.org/manual/core/update/#replace-existing-document-with-new-document
"If the update argument contains only field and value pairs, the update() method replaces the existing document with the document in the update argument, except for the _id field."

So you need neither to $unset value nor to list each field.

From https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#cursor-snapshot "MongoDB cursors can return the same document more than once in some situations. ... use a unique index on this field or these fields so that the query will return each document no more than once. Query with hint() to explicitly force the query to use that index."
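
For illustration, here's what one pass of that loop does to a single (hypothetical) document:

db.items.insert({_id: 1, value: {count: 6, paths: {"/a.m4v": {count: 2}}}});
db.items.update({_id: 1}, db.items.findOne({_id: 1}).value);
db.items.findOne({_id: 1});
// { "_id" : 1, "count" : 6, "paths" : { "/a.m4v" : { "count" : 2 } } }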


AFAIK, by design Mongo's map-reduce emits its results as {_id, value} documents ("value tuples"), and I haven't seen anything that will configure that output format. Maybe the finalize() method can be used, though it only reshapes the contents of value.
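
A minimal sketch of finalize (assuming the map/reduce functions reconstructed in the question); note the output documents are still wrapped in a value field:

var finalize = function(key, reducedValue) {
    // You can transform the reduced value here (add or rename fields),
    // but mapReduce still writes { _id: key, value: <returned value> }.
    return reducedValue;
};

db.receipts.mapReduce(map, reduce, { out: "daily_hits_by_path", finalize: finalize });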

You could try running a post-process that will reshape the data using

db.results.find({}).forEach( function(result) {
    db.results.update({_id: result._id}, {count: result.value.count, paths: result.value.paths});
});

Yep, that looks ugly. I know.


You can do Dan's code with a collection reference:

    function clean(collection) {
        collection.find().forEach(function(result) {
            var value = result.value;
            delete value._id;
            // The full-document replace below also removes the old "value"
            // field, so the trailing $unset is just a safety net.
            collection.update({_id: result._id}, value);
            collection.update({_id: result._id}, {$unset: {value: 1}});
        });
    }
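
Then call it on the map-reduce output collection, e.g. clean(db.daily_hits_by_path).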


A similar approach to that of @ljonas but no need to hardcode document fields:

db.results.find().forEach( function(result) {
    var value = result.value;
    delete value._id;
    db.results.update({_id: result._id}, value);
    db.results.update({_id: result._id}, {$unset: {value: 1}} )
} );


All the proposed solutions are far from optimal. The fastest option so far is something like this:

var flattenMRCollection = function(dbName, collectionName) {
    var collection = db.getSiblingDB(dbName)[collectionName];

    var i = 0;
    var bulk = collection.initializeUnorderedBulkOp();
    // addOption(16) is DBQuery.Option.noTimeout, so the cursor doesn't
    // time out during a long pass over the collection.
    collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
        print(++i);
        bulk.find({_id: result._id}).replaceOne(result.value);

        if(i % 1000 == 0)
        {
            print("Executing bulk...");
            bulk.execute();
            bulk = collection.initializeUnorderedBulkOp();
        }
    });
    // Flush the remainder; skip when the last batch was already executed,
    // since executing an empty bulk op throws an error.
    if(i % 1000 != 0)
        bulk.execute();
};

Then call it: flattenMRCollection("MyDB","MyMRCollection")

This is WAY faster than doing sequential updates.
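
On recent shell versions the same batching idea can be expressed with db.collection.bulkWrite; a sketch along the same lines (flattenWithBulkWrite is an illustrative name, and this isn't benchmarked against the version above):

var flattenWithBulkWrite = function(dbName, collectionName) {
    var collection = db.getSiblingDB(dbName)[collectionName];
    var ops = [];
    collection.find({ value: { $exists: true } }).forEach(function(result) {
        // Queue a full-document replacement that promotes "value" to the top level.
        ops.push({ replaceOne: { filter: { _id: result._id }, replacement: result.value } });
        if (ops.length === 1000) {
            collection.bulkWrite(ops, { ordered: false });
            ops = [];
        }
    });
    if (ops.length > 0)
        collection.bulkWrite(ops, { ordered: false });
};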


While experimenting with Vincent's answer, I found a couple of problems. Basically, if you perform updates within a forEach loop, the update can move the document, and the cursor will then return that document again. This can be circumvented if $snapshot is used. Hence, I am providing a Java example below.

// Requires the MongoDB Java driver 3.x, e.g.:
// import com.mongodb.Block;
// import com.mongodb.client.model.ReplaceOneModel;
// import com.mongodb.client.model.WriteModel;
// import org.bson.Document;
// import java.util.ArrayList;
// import java.util.List;
final List<WriteModel<Document>> bulkUpdate = new ArrayList<>();

// You should enable $snapshot when performing updates within forEach
collection.find(new Document().append("$query", new Document()).append("$snapshot", true)).forEach(new Block<Document>() {
    @Override
    public void apply(final Document document) {
        // Note that I used incrementing long values for '_id'. Change to String if
        // you used string '_id's
        long docId = document.getLong("_id");
        Document subDoc = (Document)document.get("value");
        WriteModel<Document> m = new ReplaceOneModel<>(new Document().append("_id", docId), subDoc);
        bulkUpdate.add(m);

        // If you used non-incrementing '_id's, then you need to use a final object with a counter.
        if(docId % 1000 == 0 && !bulkUpdate.isEmpty()) {
            collection.bulkWrite(bulkUpdate);
        bulkUpdate.clear();
        }
    }
});
// Flush any remaining queued replacements (fixing the bug in Vincent's answer).
if(!bulkUpdate.isEmpty()) {
    collection.bulkWrite(bulkUpdate);
    bulkUpdate.clear();
}

Note: This snippet takes an average of 7.4 seconds to execute on my machine with 100k records and 14 attributes (IMDB dataset). Without batching, it takes an average of 25.2 seconds.
