歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> MapReduce的一對多連接操作

MapReduce的一對多連接操作

日期:2017/3/1 9:53:28   编辑:Linux編程

問題描述:
一個trade table表
product1"trade1
product2"trade2
product3"trade3
一個pay table表
product1"pay1
product2"pay2
product2"pay3
product1"pay4
product3"pay5
product3"pay6

建立兩個表之間的連接,該兩表是一對多關系的
如下:
trade1pay1
trade1pay4
trade2pay2
...

思路:

為了將兩個表整合到一起,由於有相同的第一列,且第一個表與第二個表是一對多關系的。
這裡依然采用分組,以及組內排序,只要保證一方最先到達reduce端,則就可以進行迭代處理了。
為了保證第一個表先到達reduce端,可以為定義一個組合鍵,包含兩個值,第一個值為product,第二個值為0或者1,來分別代表第一個表和第二個表,只要按照組內升序排列即可。

具體代碼:

自定義組合鍵策略

package whut.onetomany;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.Hadoop.io.WritableComparable;
public class TextIntPair implements WritableComparable{
//product1 0/1
private String firstKey;//product1
private int secondKey;//0,1;0代表是trade表,1代表是pay表
//只需要保證trade表在pay表前面就行,則只需要對組順序排列

public String getFirstKey() {
return firstKey;
}
public void setFirstKey(String firstKey) {
this.firstKey = firstKey;
}
public int getSecondKey() {
return secondKey;
}
public void setSecondKey(int secondKey) {
this.secondKey = secondKey;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(firstKey);
out.writeInt(secondKey);
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
firstKey=in.readUTF();
secondKey=in.readInt();
}

@Override
public int compareTo(Object o) {
// TODO Auto-generated method stub
TextIntPair tip=(TextIntPair)o;
return this.getFirstKey().compareTo(tip.getFirstKey());
}
}

分組策略

package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TextComparator extends WritableComparator{
protected TextComparator() {
super(TextIntPair.class,true);//注冊比較器
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
TextIntPair tip1=(TextIntPair)a;
TextIntPair tip2=(TextIntPair)b;
return tip1.getFirstKey().compareTo(tip2.getFirstKey());
}
}

組內排序策略:目的是保證第一個表比第二個表先到達
package whut.onetomany;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
//分組內部進行排序,按照第二個字段進行排序
public class TextIntComparator extends WritableComparator {
public TextIntComparator()
{
super(TextIntPair.class,true);
}
//這裡可以進行排序的方式管理
//必須保證是同一個分組的
//a與b進行比較
//如果a在前b在後,則會產生升序
//如果a在後b在前,則會產生降序
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
TextIntPair ti1=(TextIntPair)a;
TextIntPair ti2=(TextIntPair)b;
//首先要保證是同一個組內,同一個組的標識就是第一個字段相同
if(!ti1.getFirstKey().equals(ti2.getFirstKey()))
return ti1.getFirstKey().compareTo(ti2.getFirstKey());
else
return ti1.getSecondKey()-ti2.getSecondKey();//0,-1,1
}

}

分區策略:

package whut.onetomany;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionByText extends Partitioner<TextIntPair, Text> {
@Override
public int getPartition(TextIntPair key, Text value, int numPartitions) {
// TODO Auto-generated method stub
return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
}
}

Copyright © Linux教程網 All Rights Reserved