歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Python 寫的Hadoop小程序

Python 寫的Hadoop小程序

日期：2017/3/1 10:15:22　編輯：Linux編程

該程序是在 Python 2.3 上完成的。Python 各版本之間存在差異（例如 Python 3 中 print 由語句改為函數），在其他版本上運行前請先確認兼容性。

Mapper:

import sys

# Running totals, updated by the stat functions through `global` statements.
line_number = tab_number = pv_number = clk_number = 0
# Format detection: whether the first record has been inspected yet, and
# whether that record contained a tab (tab-separated layout).
if_compressed_tested = if_compressed = 0

#functions:
def compressed_stat(line):
    """Accumulate counters for one record in the tab-separated format.

    The record looks like ``key<TAB>pv clk<TAB>pv clk...``.  Field 0 is
    skipped.  Every later tab-separated field must start with two
    whitespace-separated integers, pv then clk.  Any further tokens in a
    field are ignored.

    For every record, ``line_number`` grows by 1 and ``tab_number`` by the
    number of tabs.  ``pv_number`` and ``clk_number`` grow only when every
    field parses.  A malformed record is echoed to stdout with an ERROR
    marker appended and contributes no pv/clk counts.
    """
    global line_number
    global tab_number
    global pv_number
    global clk_number
    line_number += 1
    fields = line.split("\t")
    tab_number += len(fields) - 1

    # Parse into locals first so a bad field cannot leave the pv/clk totals
    # half-updated.
    pv_sum = 0
    clk_sum = 0
    try:
        for field in fields[1:]:
            # split() with no separator tolerates runs of spaces/tabs.
            pv_clk_list = field.split()
            pv_sum += int(pv_clk_list[0])
            clk_sum += int(pv_clk_list[1])
    except (ValueError, IndexError):
        # ValueError: a non-integer token.
        # IndexError: a field with fewer than two numbers.
        sys.stdout.write("%s \tERROR\n" % line)
        return
    pv_number += pv_sum
    clk_number += clk_sum


def before_compress_stat(line):
    """Accumulate counters for one record in the plain (untabbed) format.

    The record must start with two whitespace-separated integers, pv then
    clk.  Further tokens are ignored.  ``line_number`` is incremented for
    every record.  ``pv_number`` and ``clk_number`` are incremented only
    when both numbers parse.  A malformed record is echoed to stdout with
    an ERROR marker appended.
    """
    global line_number
    global pv_number
    global clk_number
    line_number += 1
    line = line.strip()
    tokens = line.split()
    try:
        pv = int(tokens[0])
        clk = int(tokens[1])
    except (ValueError, IndexError):
        # IndexError: fewer than two numbers on the line.
        sys.stdout.write("%s \tERROR\n" % line)
        return
    # Both values parsed: commit them together.
    pv_number += pv
    clk_number += clk
#end functions

# Main loop: read records from stdin, decide the input format from the first
# non-blank record, and accumulate the counters with the matching function.
for line in sys.stdin:
    line = line.strip()
    if not line:
        # Blank lines carry no counts.  Skipping them also stops a leading
        # blank line from forcing the plain format on tab-separated input.
        continue
    if if_compressed_tested == 0:
        # A tab in the first record selects the tab-separated format for the
        # whole input; otherwise every record is parsed as plain "pv clk".
        if_compressed_tested = 1
        if "\t" in line:
            if_compressed = 1
    try:
        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except (ValueError, IndexError):
        # Safety net: report a record the stat functions could not handle
        # instead of silently dropping it or crashing the script.
        sys.stdout.write("%s \tERROR\n" % line)

# Emit this mapper's partial totals as one space-separated line.
if if_compressed == 1:
    sys.stdout.write("%d %d %d %d\n" % (line_number, tab_number, pv_number, clk_number))
else:
    sys.stdout.write("%d %d %d\n" % (line_number, pv_number, clk_number))

Reducer:
import sys


# Running grand totals, summed by the stat functions via `global` statements.
line_number = tab_number = pv_number = clk_number = 0
# Format detection: whether a record has been classified yet, and whether
# the records carry four fields (including a tab count) rather than three.
if_compressed_tested = if_compressed = 0

def compressed_stat(line):
    """Add one 4-field mapper summary line to the grand totals.

    *line* must be exactly four single-space-separated integers, in the
    order line count, tab count, pv total, clk total.  This matches the
    mapper's output for tab-separated input.  A line with a different field
    count is echoed to stdout with an ERROR marker and ignored.

    Raises:
        ValueError: if a field is not an integer.  The totals are left
            unchanged in that case.
    """
    global line_number
    global tab_number
    global pv_number
    global clk_number
    pv_clk_list = line.split(" ")
    if len(pv_clk_list) != 4:
        sys.stdout.write("%s \tERROR\n" % line)
        return
    # Convert every field before touching the totals, so a bad value (a
    # ValueError, reported by the main loop) cannot leave them half-updated.
    lines, tabs, pvs, clks = [int(field) for field in pv_clk_list]
    line_number += lines
    tab_number += tabs
    pv_number += pvs
    clk_number += clks

def before_compress_stat(line):
    """Add one 3-field mapper summary line to the grand totals.

    *line* must be exactly three single-space-separated integers, in the
    order line count, pv total, clk total.  This matches the mapper's output
    for plain input.  A line with a different field count is echoed to
    stdout with an ERROR marker and ignored.

    Raises:
        ValueError: if a field is not an integer.  The totals are left
            unchanged in that case.
    """
    global line_number
    global pv_number
    global clk_number
    pv_clk_list = line.split(" ")
    if len(pv_clk_list) != 3:
        sys.stdout.write("%s \tERROR\n" % line)
        return
    # Convert first, commit second: no partial update on a bad field.
    lines, pvs, clks = [int(field) for field in pv_clk_list]
    line_number += lines
    pv_number += pvs
    clk_number += clks

# Main loop: forward mapper ERROR records, classify the summary format from
# the first well-formed record, and sum the partial totals.
for line in sys.stdin:
    line = line.strip()
    if not line:
        # Blank lines carry no totals.
        continue
    if "ERROR" in line:
        # Records already flagged upstream are passed through unchanged.
        sys.stdout.write(line + "\n")
        continue

    if if_compressed_tested == 0:
        field_count = len(line.split(" "))
        if field_count == 4:
            if_compressed = 1
        elif field_count == 3:
            if_compressed = 0
        else:
            # Do not lock in a format from a malformed record; the next
            # record gets another chance to decide it.
            sys.stdout.write("%s \tERROR\n" % line)
            continue
        if_compressed_tested = 1

    try:
        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except ValueError:
        # A non-integer field: report the record instead of counting it.
        sys.stdout.write("%s \tERROR\n" % line)

# Final report; every label is followed by a colon.
if if_compressed == 0:
    sys.stdout.write("LINE_NUMBER: %d TOTAL_PV_NUMBER: %d TOTAL_CLK_NUMBER: %d\n"
                     % (line_number, pv_number, clk_number))
else:
    sys.stdout.write("LINE_NUMBER: %d TAB_NUMBER: %d TOTAL_PV_NUMBER: %d TOTAL_CLK_NUMBER: %d\n"
                     % (line_number, tab_number, pv_number, clk_number))

Copyright © Linux教程網 All Rights Reserved