# -*- coding: utf-8 -*-

from mrjob.job import MRJob
from mrjob.step import MRStep


class SalesRanker(MRJob):
    """Two-step MapReduce job that totals product sales and orders them.

    Step 1 sums the quantity sold per (category, product_id) for log lines
    accepted by ``within_past_week``.  Step 2 re-keys each total as
    (category, quantity) so the shuffle/sort phase orders products within
    each category by quantity sold.
    """

    def within_past_week(self, timestamp):
        """Return True if timestamp is within past week, False otherwise.

        NOTE(review): this is still a placeholder.  It returns None (falsy),
        so ``mapper`` currently emits nothing.  The timestamp format is not
        defined in this file -- implement once it is known.
        """
        ...

    def mapper(self, _, line):
        """Parse each log line, extract and transform relevant lines.

        Each line is expected to hold four tab-separated fields:
        timestamp, product_id, category, quantity.

        Emit key value pairs of the form:

        (foo, p1), 2
        (bar, p1), 2
        (bar, p1), 1
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        timestamp, product_id, category, quantity = line.split('\t')
        if self.within_past_week(timestamp):
            # Convert to int here: split() returns strings, and the reducer
            # sums these values numerically.
            yield (category, product_id), int(quantity)

    def reducer(self, key, values):
        """Sum values for each key.

        (foo, p1), 2
        (bar, p1), 3
        (foo, p2), 3
        (bar, p3), 10
        (foo, p4), 1
        """
        yield key, sum(values)

    def mapper_sort(self, key, value):
        """Construct key to ensure proper sorting.

        Transform key and value to the form:

        (foo, 2), p1
        (bar, 3), p1
        (foo, 3), p2
        (bar, 10), p3
        (foo, 1), p4

        The shuffle/sort step of MapReduce will then do a distributed sort
        on the keys, with the intended result:

        (bar, 3), p1
        (bar, 10), p3
        (foo, 1), p4
        (foo, 2), p1
        (foo, 3), p2

        NOTE(review): this ordering assumes the quantity part of the key is
        compared numerically.  Confirm that holds for the runner/protocol in
        use; if keys are compared as serialized text, 10 would sort before 3.
        """
        category, product_id = key
        quantity = value
        yield (category, quantity), product_id

    def reducer_identity(self, key, values):
        """Pass every (key, value) pair through unchanged.

        The reducer receives all values grouped under ``key`` as one
        iterable, so each value is re-emitted as its own pair.
        """
        for value in values:
            yield key, value

    def steps(self):
        """Run the map and reduce steps."""
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(mapper=self.mapper_sort,
                   reducer=self.reducer_identity),
        ]


if __name__ == '__main__':
    SalesRanker.run()