In Pt1 of this blog post I looked at a SQL query and data set to run in Hadoop, and in Pt2 wrote the Map function to extract the relevant fields from the data set to satisfy the query. At this point, however, we still have not implemented any of the aggregate functions and still have a large intermediate data set of keys and values. The only data eliminated so far has been the lines where the date was not less than or equal to 11-AUG-98. On the test data set, out of the initial 600,037,902 lines of data we now have 586,996,074 lines remaining. To complete the query we now need to write the Reduce phase.

Our Reduce implementation will extend the Reducer class. It needs to accept the intermediate key/value pairs output by the Mapper, and therefore receives as input the key, which is fields 9 and 10 concatenated, and the DoubleArrayWritable containing the values. For every key we need to iterate through the values and calculate the totals required for the SUM(), AVG() and COUNT() functions. Once these have been calculated we can format the output as text to be written to a file, giving us exactly the same result as if the query had been processed by a relational database. This Reduce phase will look something like the following, simply adding all of the values in the array for the SUM() functions and then dividing by the COUNT() value to calculate the result of the AVG() functions.

        for (DoubleArrayWritable val : values) {
            x = (DoubleWritable[]) val.toArray();
            sum_qty += x[0].get();
            sum_base_price += x[1].get();
            sum_discount += x[2].get();
            count_star += x[3].get();
            sum_disc_price += x[4].get();
            sum_charge += x[5].get();
        }
        avg_qty = sum_qty / count_star;
        avg_price = sum_base_price / count_star;
        avg_disc = sum_discount / count_star;
        /* Format and collect the output */
        Text tpchq1redval = new Text(" " + sum_qty + " " + sum_base_price + " " + sum_disc_price + " " + sum_charge + " " + avg_qty + " " + avg_price + " " + avg_disc + " " + count_star);
        context.write(key, tpchq1redval);
    }   /* end of reduce() */
}       /* end of the Reduce class */

Coupled with the Map phase and a Job Control section (this will be covered in the next post on running the job), this job is now ready to run. However, as we have noted previously, just for our 100GB data set the Map phase will output over 586 million lines of data, which will involve a lot of network traffic and disk writes. We can make this more efficient by writing a Combiner.

The Combiner also extends the Reducer and in simple cases, but not all (as we will cover in a moment), can be exactly the same as the Reducer. The aim of the Combiner is to perform a Reducer-type operation on the subset of data produced by each Mapper, which minimises the amount of data that needs to be transferred across the cluster from Map to Reduce. The single most important thing about the Combiner is that there is no certainty that it will run. It is available as an optimization, but for a particular Map output it might not run at all and there is no way to force it to run. From a development perspective this has important consequences: you should be able to comment out the line in the Job Control section that sets the Combiner and the result produced by the MapReduce job should stay exactly the same. Additionally, the input for the Combiner must be exactly the same as the Reducer expects when operating on the Map output, and the Combiner output must also correspond to the input expected by the Reducer. A sketch of how these matching signatures can be expressed follows below.
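To make the type constraints concrete, here is a minimal sketch of how the two class declarations might look with the org.apache.hadoop.mapreduce API. The class names, the enclosing job class and the placeholder DoubleArrayWritable are my own illustrations rather than the code used in this series; the real DoubleArrayWritable is the ArrayWritable-based value class used for the Map output.

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TPCHQ1Skeleton {

    /* Minimal placeholder for the DoubleArrayWritable carrying the Map output values;
       the version built earlier in this series may differ in detail. */
    public static class DoubleArrayWritable extends ArrayWritable {
        public DoubleArrayWritable() {
            super(DoubleWritable.class);
        }
    }

    /* The Combiner consumes the Map output types and must emit those same types,
       because whatever it produces is fed straight into the Reducer. */
    public static class TPCHQ1Combine
            extends Reducer<Text, DoubleArrayWritable, Text, DoubleArrayWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleArrayWritable> values, Context context)
                throws IOException, InterruptedException {
            /* Partial SUM() and COUNT() totals only -- the body is shown later in this post. */
        }
    }

    /* The Reducer consumes the same (Text, DoubleArrayWritable) pairs, whether they arrive
       directly from the Map phase or via the Combiner, and emits the final formatted row. */
    public static class TPCHQ1Reduce
            extends Reducer<Text, DoubleArrayWritable, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<DoubleArrayWritable> values, Context context)
                throws IOException, InterruptedException {
            /* SUM(), AVG() and COUNT() calculations -- the body is shown above. */
        }
    }
}

Note that because our Reducer emits formatted Text rather than DoubleArrayWritable, the Combiner here has to be a separate class; it cannot simply reuse the Reducer.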
If your Combiner does not adhere to these restrictions, your job may still compile and run without any error; however, your results may then change from run to run, influenced by factors such as a change in the input block size. Finally, the Combiner operation must be both commutative and associative. In other words, the Combiner operation must ensure that neither changing the order of the operands nor changing the grouping of the operations performed changes the result. In our example the SUM() function is both commutative and associative: the numbers can be summed in any order, and we can perform the sum operation on different groups, and the result will always remain the same. AVG(), on the other hand, is commutative but not associative. We can calculate the average with the input data in any order, however we cannot take the averages of smaller groups of values and then take the average of that intermediate data and expect the result to be the same (a small standalone example at the end of this post makes the difference concrete). For this reason the Combiner can perform the SUM() operation but not the AVG(), and can look as follows, producing the intermediate sum values only for the Reducer.

        for (DoubleArrayWritable val : values) {
            x = (DoubleWritable[]) val.toArray();
            sum_qty += x[0].get();
            sum_base_price += x[1].get();
            sum_discount += x[2].get();
            count_star += x[3].get();
            sum_disc_price += x[4].get();
            sum_charge += x[5].get();
        }
        outArray[0] = new DoubleWritable(sum_qty);
        outArray[1] = new DoubleWritable(sum_base_price);
        outArray[2] = new DoubleWritable(sum_discount);
        outArray[3] = new DoubleWritable(count_star);
        outArray[4] = new DoubleWritable(sum_disc_price);
        outArray[5] = new DoubleWritable(sum_charge);
        DoubleArrayWritable da = new DoubleArrayWritable();
        da.set(outArray);
        context.write(key, da);
    }   /* end of reduce() in the Combiner */

At this stage we have written the Mapper, Reducer and Combiner, and in Pt4 we will look at adding the Job Control section to produce the completed MapReduce job. We will then consider compiling and running the job and tuning for performance.
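Finally, as promised above, here is a small standalone sketch, with made-up numbers and not part of the MapReduce job itself, contrasting an average computed from combined partial SUM() and COUNT() totals with an average of partial averages.

public class CombinerAssociativityDemo {
    public static void main(String[] args) {
        /* Pretend two Mappers produced these value groups for the same key. */
        double[] groupA = {1, 2, 3};
        double[] groupB = {4, 5};

        double sumA = 0, sumB = 0;
        for (double v : groupA) sumA += v;   /* partial SUM() = 6 */
        for (double v : groupB) sumB += v;   /* partial SUM() = 9 */

        /* SUM() and COUNT() combine safely: dividing the combined sum by the
           combined count in the Reducer still gives the true average. */
        double trueAvg = (sumA + sumB) / (groupA.length + groupB.length);   /* 15 / 5 = 3.0 */

        /* AVG() is not associative: averaging the partial averages gives a different answer. */
        double avgOfAverages = ((sumA / groupA.length) + (sumB / groupB.length)) / 2;   /* (2.0 + 4.5) / 2 = 3.25 */

        System.out.println("AVG from combined SUM()/COUNT(): " + trueAvg);         /* 3.0  */
        System.out.println("AVG of the partial AVG()s:       " + avgOfAverages);   /* 3.25 */
    }
}

This is exactly why the Combiner above forwards only the partial sums and the partial COUNT(), leaving the single division for the AVG() results to the Reducer.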