# -*- coding: utf-8 -*- from mrjob.job import MRJob class HitCounts(MRJob): def extract_url(self, line): """Extract the generated url from the log line.""" pass def extract_year_month(self, line): """Return the year and month portions of the timestamp.""" pass def mapper(self, _, line): """Parse each log line, extract and transform relevant lines. Emit key value pairs of the form: (2016-01, url0), 1 (2016-01, url0), 1 (2016-01, url1), 1 """ url = self.extract_url(line) period = self.extract_year_month(line) yield (period, url), 1 def reducer(self, key, values): """Sum values for each key. (2016-01, url0), 2 (2016-01, url1), 1 """ yield key, sum(values) def steps(self): """Run the map and reduce steps.""" return [ self.mr(mapper=self.mapper, reducer=self.reducer) ] if __name__ == '__main__': HitCounts.run()