Iterate twice on values (MapReduce)

I receive an iterator as an argument and I would like to iterate over the values twice.

public void reduce(Pair<String,String> key, Iterator<IntWritable> values,
                   Context context)

Is it possible? How? The signature is imposed by the framework I am using (namely Hadoop).

-- edit --

It turns out the real signature of the reduce method takes an Iterable. I was misled by this wiki page (which is actually the only non-deprecated, but wrong, WordCount example I found).


Unfortunately this is not possible without caching the values as in Andreas_D's answer.

Even using the new API, where the Reducer receives an Iterable rather than an Iterator, you cannot iterate twice. It's very tempting to try something like:

for (IntWritable value : values) {
    // first loop
}

for (IntWritable value : values) {
    // second loop
}

But this won't actually work. The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren't really backed by a Collection, so it's nontrivial to allow multiple iterations.

You can see this for yourself in the Reducer and ReduceContext code.
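The single-pass behaviour described above can be imitated in plain Java. This is only a sketch: SinglePassIterable and SinglePassDemo are made-up names, not Hadoop classes, and the assumption is simply that iterator() keeps handing back the same underlying iterator instead of a fresh one.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a streamed value source: every call to
// iterator() returns the SAME underlying iterator, so a second
// for-each loop starts where the first one stopped (at the end).
class SinglePassIterable<T> implements Iterable<T> {
    private final Iterator<T> source;

    SinglePassIterable(Iterator<T> source) {
        this.source = source;
    }

    @Override
    public Iterator<T> iterator() {
        return source; // no rewind, no fresh copy
    }
}

class SinglePassDemo {
    // Counts how many elements one for-each loop sees.
    static <T> int count(Iterable<T> values) {
        int n = 0;
        for (T ignored : values) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        Iterable<Integer> values = new SinglePassIterable<>(data.iterator());
        System.out.println("first pass:  " + count(values)); // 3
        System.out.println("second pass: " + count(values)); // 0
    }
}
```

The second loop compiles and runs without error; it just sees no values, which is why the problem is easy to miss.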

Caching the values in a Collection of some sort may be the easiest answer, but you can easily blow the heap if you are operating on large datasets. If you can give us more specifics on your problem, we may be able to help you find a solution that doesn't involve multiple iterations.


You have to cache the values from the iterator if you want to iterate again. At least you can combine the first iteration and the caching:

Iterator<IntWritable> it = getIterator();
List<IntWritable> cache = new ArrayList<IntWritable>();

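// first loop and caching
while (it.hasNext()) {
   IntWritable value = it.next();
   doSomethingWithValue(value);
   // cache a copy: a Hadoop reducer reuses the same instance for every value
   cache.add(new IntWritable(value.get()));
}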

// second loop
for(IntWritable value:cache) {
   doSomethingElseThatCantBeDoneInFirstLoop(value);
}

(just to add an answer with code, knowing that you mentioned this solution in your own comment ;) )


Why it's impossible without caching: Iterator is just an interface, and nothing requires an implementation to actually store its values. To iterate twice you would either have to reset the iterator (not possible) or clone it (again, not possible).

To give an example for an iterator where cloning/resetting wouldn't make any sense:

import java.util.Iterator;

public class Randoms implements Iterator<Double> {

  private int counter = 10;

  @Override
  public boolean hasNext() {
     return counter > 0;
  }

  @Override
  public Double next() {
     counter--;
     return Math.random();   // a new random value each call; nothing is stored
  }

  @Override
  public void remove() {
     throw new UnsupportedOperationException("delete not supported");
  }
}


Reusing the given iterator, no.

But you can save the values in an ArrayList while iterating through them the first time and then iterate over that ArrayList (or you can build the list up front and then iterate over it twice; it's a matter of taste).

Anyway, are you sure passing an Iterator is a good thing in the first place? Iterators are meant for a single linear scan through a collection, which is why they don't expose a "rewind" method.

You should pass something different, like a Collection<T> or an Iterable<T>, as already suggested in a different answer.


Iterators are one-traversal-only. Some iterator types are cloneable, and you might be able to clone it before traversing, but this isn't the general case.

You should make your function take an Iterable instead, if you can achieve that at all.
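When the signature is under your control and the Iterable is backed by a real collection, each for-each loop gets a fresh iterator, so two passes just work. A minimal sketch, assuming a List as the backing collection (TwoPassStats and its method are made-up names for illustration; this does not hold for the Iterable that Hadoop passes to reduce):

```java
import java.util.Arrays;
import java.util.List;

class TwoPassStats {
    // Sum of squared deviations from the mean, computed in two passes.
    static double sumSquaredDeviations(Iterable<Double> values) {
        double sum = 0;
        int n = 0;
        for (double v : values) {   // first pass: compute the mean
            sum += v;
            n++;
        }
        if (n == 0) {
            return 0;
        }
        double mean = sum / n;
        double result = 0;
        for (double v : values) {   // second pass: needs a fresh iterator
            result += (v - mean) * (v - mean);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> data = Arrays.asList(1.0, 2.0, 3.0, 4.0);
        System.out.println(sumSquaredDeviations(data)); // 5.0
    }
}
```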


If the method signature cannot be changed, I would suggest using Apache Commons IteratorUtils to convert the Iterator to a ListIterator. Consider this example method for iterating twice over the values:

void iterateTwice(Iterator<String> it) {
    ListIterator<?> lit = IteratorUtils.toListIterator(it);
    System.out.println("Using ListIterator 1st pass");
    while(lit.hasNext())
        System.out.println(lit.next());

    // move the list iterator back to start
    while(lit.hasPrevious())
        lit.previous();

    System.out.println("Using ListIterator 2nd pass");
    while(lit.hasNext())
        System.out.println(lit.next());
}

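Using code like the above I was able to iterate over the values twice without keeping a copy of the elements in my own code. Note that the ListIterator returned by IteratorUtils caches the values internally, so the memory cost is still there.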
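To make the hidden buffering visible, here is a JDK-only sketch of the same idea. It is not the Commons Collections implementation; BufferedTwice, buffer and twice are hypothetical names, and the copy into an ArrayList is spelled out on purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;

class BufferedTwice {
    // Hypothetical helper: drains the iterator into a list and returns
    // a ListIterator over that copy. All values end up in memory.
    static <T> ListIterator<T> buffer(Iterator<T> it) {
        List<T> copy = new ArrayList<>();
        while (it.hasNext()) {
            copy.add(it.next());
        }
        return copy.listIterator();
    }

    // Walks the values twice and returns everything seen, in order.
    static <T> List<T> twice(Iterator<T> it) {
        ListIterator<T> lit = buffer(it);
        List<T> seen = new ArrayList<>();
        while (lit.hasNext()) {          // first pass
            seen.add(lit.next());
        }
        while (lit.hasPrevious()) {      // move back to the start
            lit.previous();
        }
        while (lit.hasNext()) {          // second pass
            seen.add(lit.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(twice(Arrays.asList("a", "b").iterator())); // [a, b, a, b]
    }
}
```

Either way the whole value set is held in memory, so the heap concern raised for explicit caching applies here too.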


If we try to iterate twice in a Reducer like this:

ListIterator<DoubleWritable> lit = IteratorUtils.toListIterator(it);
System.out.println("Using ListIterator 1st pass");
while(lit.hasNext())
    System.out.println(lit.next());

// move the list iterator back to start
while(lit.hasPrevious())
    lit.previous();

System.out.println("Using ListIterator 2nd pass");
while(lit.hasNext())
    System.out.println(lit.next());

we get this output:

Using ListIterator 1st pass
5.3
4.9
5.3
4.6
4.6
Using ListIterator 2nd pass
5.3
5.3
5.3
5.3
5.3

To get the right result, copy each value into the cache while looping:

ArrayList<DoubleWritable> cache = new ArrayList<DoubleWritable>();
for (DoubleWritable aNum : values) {
    System.out.println("first iteration: " + aNum);
    // copy the value: the same DoubleWritable instance is reused for every element
    DoubleWritable writable = new DoubleWritable();
    writable.set(aNum.get());
    cache.add(writable);
}
for (DoubleWritable cached : cache) {
    System.out.println("second iteration: " + cached);
}

Output

first iteration: 5.3
first iteration: 4.9
first iteration: 5.3
first iteration: 4.6
first iteration: 4.6
second iteration: 5.3
second iteration: 4.9
second iteration: 5.3
second iteration: 4.6
second iteration: 4.6


You can do it with MarkableIterator:

MarkableIterator<Text> mitr = new MarkableIterator<Text>(values.iterator());
mitr.mark();
while (mitr.hasNext())
{
    Text value = mitr.next();
    // do your work
}
mitr.reset();
while (mitr.hasNext())
{
    Text value = mitr.next();
    // again do your work
}


Note: if you use a list to cache the items, you must clone each item before adding it to the cache. Otherwise you will find that every item in the cache is the same.

This is caused by a memory optimization in MapReduce: in the reduce method, the Iterable reuses the same item instance for every value.
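The effect of instance reuse can be reproduced without Hadoop. The sketch below is a simulation under stated assumptions: MutableDouble stands in for a Writable such as DoubleWritable, and ReusingIterator is a made-up iterator that overwrites and returns one shared object, which is the behaviour described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical mutable box, playing the role of a Writable.
class MutableDouble {
    private double value;

    MutableDouble() {}

    MutableDouble(double value) {
        this.value = value;
    }

    void set(double value) {
        this.value = value;
    }

    double get() {
        return value;
    }
}

// Hypothetical iterator that overwrites and returns ONE shared instance.
class ReusingIterator implements Iterator<MutableDouble> {
    private final double[] raw;
    private final MutableDouble shared = new MutableDouble();
    private int pos = 0;

    ReusingIterator(double... raw) {
        this.raw = raw;
    }

    @Override
    public boolean hasNext() {
        return pos < raw.length;
    }

    @Override
    public MutableDouble next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        shared.set(raw[pos++]);
        return shared;
    }
}

class ReuseDemo {
    // Wrong: stores the same reference again and again.
    static List<Double> cacheReferences(Iterator<MutableDouble> it) {
        List<MutableDouble> cache = new ArrayList<>();
        while (it.hasNext()) {
            cache.add(it.next());
        }
        return unwrap(cache);
    }

    // Right: stores a fresh copy of each value.
    static List<Double> cacheCopies(Iterator<MutableDouble> it) {
        List<MutableDouble> cache = new ArrayList<>();
        while (it.hasNext()) {
            cache.add(new MutableDouble(it.next().get()));
        }
        return unwrap(cache);
    }

    private static List<Double> unwrap(List<MutableDouble> cache) {
        List<Double> out = new ArrayList<>();
        for (MutableDouble d : cache) {
            out.add(d.get());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(cacheReferences(new ReusingIterator(5.3, 4.9, 4.6))); // [4.6, 4.6, 4.6]
        System.out.println(cacheCopies(new ReusingIterator(5.3, 4.9, 4.6)));     // [5.3, 4.9, 4.6]
    }
}
```

Caching references leaves the list full of pointers to one object holding the last value written; copying each value first keeps them distinct.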


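Try this (it needs a List, not just an Iterator):

    ListIterator<?> it = list.listIterator();

    // first pass
    while (it.hasNext()) {
        System.out.println("first " + it.next());
    }
    // rewind to the start
    while (it.hasPrevious()) {
        it.previous();
    }
    // second pass
    while (it.hasNext()) {
        System.out.println("second " + it.next());
    }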


If you want to change values as you go, I guess it's better to use a ListIterator and its set() method.

ListIterator lit = list.listIterator();
while(lit.hasNext()){
   String elem = (String) lit.next();
   System.out.println(elem);
   lit.set(elem+" modified");
}
lit = null; 
lit = list.listIterator();
while(lit.hasNext()){
   System.out.println(lit.next());
}

Instead of calling .previous(), I just get another ListIterator from the same list by calling .listIterator() again.


After a lot of searching and trial and error, I found a solution.

  1. Declare a new collection, say cache (a LinkedList, an ArrayList, or anything else).

  2. Inside the first iteration, add a copy of the current value to it, for example:

    cache.add(new Text(current));

  3. Iterate through cache for the second pass:

    for (Text count : counts) {
        // counts is an Iterable of type Text
        // use the copy constructor: getBytes() may return bytes beyond getLength()
        cache.add(new Text(count));
    }
    for (Text value : cache) {
        // your logic..
    }
    