本文主要講述如何使用XMemcached客戶端與Memcached服務端進行交互。通過XMemcached的API調用與Memcached的set/get命令對比及跟蹤XMemcached源碼,使大家對XMemcached的API有更深層次的理解,能夠從底層上去了解其工作原理,從而能在項目中進行一些針對性的接口封閉及優化工作。
網上有種說法是:Memcache是這個項目的名稱,而memcached是它服務器端的主程序文件名。我又查了Memcache的官網http://memcached.org/,home頁一直引用的是Memcached。姑且不論該叫什麼名稱合適,在這裡統一稱呼為Memcached,僅代表我的個人習慣。
言歸正題,Memcached是分布式高性能內存級別的對象緩存系統,並且是開源免費項目。它的所有key-value數據全部放在內存中,這是其高效的一個原因,同時也意味著系統關閉時,全部數據就會丟失。利用Memcached作用緩存系統,可以減少動態網站數據庫查詢次數,提升網站性能,常作為web2.0網站緩存解決方案。Memcached客戶端提供多種語言API支持,像C/C++、Perl、PHP、Java、C#、Ruby等。
Memcached的Java客戶端目前有3個
前兩個客戶端的使用,這裡不做詳述。
本人是用Maven構建的項目,為了使用XMemcached,需要在pom.xml中加入
<dependency> <groupId>com.googlecode.xmemcached</groupId> <artifactId>xmemcached</artifactId> <version>1.4.3</version> </dependency>
XMemcached使用示例Demo如下
public static void main(String[] args) throws IOException {
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("127.0.0.1:11211"));
MemcachedClient memcachedClient = builder.build();
try {
memcachedClient.set("key", 0, "Hello World!");
String value = memcachedClient.get("key");
System.out.println("key值:" + value);
}
catch (Exception e) {
e.printStackTrace();
}
try {
memcachedClient.shutdown();
}
catch (IOException e) {
e.printStackTrace();
}
}
接下來詳細追蹤下這兩個方法的源碼
大家可以用過debug模式,一步步追蹤set及get過程,具體過程不演示了,先直接列出set方法大概的源碼調用過程如下(中間可能省略了某些方法調用)
XMemcachedClient.set()
XMemcachedClient.sendCommand()
MemcachedConnector.send()
AbstractSession.write()
MemcachedTCPSession.wrapMessage()
TextStoreCommand.encode()
TextStoreCommand.encodeValue()
SerializingTranscoder.encode()
BaseSerializingTranscoder.serialize()
先是調用XMemcacheClient.set(final String key, final int exp, final Object value)方法,key形參對應字符串“key”,exp形參對應整數0(表達緩存永不過期),value形參對應字符串“Hello World!”。經過上述一系列方法調用,最終調用到SerializingTranscoder.encode(Object o)方法,此時形參o接收到的實參值就是set的字符串“Hello World!”,該方法體代碼如下:
public final CachedData encode(Object o) {
byte[] b = null;
int flags = 0;
if (o instanceof String) {
b = encodeString((String) o);
} else if (o instanceof Long) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeLong((Long) o);
}
flags |= SPECIAL_LONG;
} else if (o instanceof Integer) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeInt((Integer) o);
}
flags |= SPECIAL_INT;
} else if (o instanceof Boolean) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeBoolean((Boolean) o);
}
flags |= SPECIAL_BOOLEAN;
} else if (o instanceof Date) {
b = this.transcoderUtils.encodeLong(((Date) o).getTime());
flags |= SPECIAL_DATE;
} else if (o instanceof Byte) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeByte((Byte) o);
}
flags |= SPECIAL_BYTE;
} else if (o instanceof Float) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeInt(Float
.floatToRawIntBits((Float) o));
}
flags |= SPECIAL_FLOAT;
} else if (o instanceof Double) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeLong(Double
.doubleToRawLongBits((Double) o));
}
flags |= SPECIAL_DOUBLE;
} else if (o instanceof byte[]) {
b = (byte[]) o;
flags |= SPECIAL_BYTEARRAY;
} else {
b = serialize(o);
flags |= SERIALIZED;
}
assert b != null;
if (this.primitiveAsString) {
// It is not be SERIALIZED,so change it to string type
if ((flags & SERIALIZED) == 0) {
flags = 0;
}
}
if (b.length > this.compressionThreshold) {
byte[] compressed = compress(b);
if (compressed.length < b.length) {
if (log.isDebugEnabled()) {
log.debug("Compressed " + o.getClass().getName() + " from "
+ b.length + " to " + compressed.length);
}
b = compressed;
flags |= COMPRESSED;
} else {
if (log.isDebugEnabled()) {
log.debug("Compression increased the size of "
+ o.getClass().getName() + " from " + b.length
+ " to " + compressed.length);
}
}
}
return new CachedData(flags, b, this.maxSize, -1);
}
先是申明了局部變量b(用來存儲需要放入memcached服務器的字節數組)及flags(用來存儲標志信息)。然後依次判斷對象o是否字符串類型、長整型類型等,並將對象o編碼成相應的字節數組存放在局部變量b中。
特別注意第57行,當o的類型不是字符串、基本類型的包裝類型及byte[]數組時,會調用BaseSerializingTranscoder.serialize()方法,該方法源代碼如下:
protected byte[] serialize(Object o) {
if (o == null) {
throw new NullPointerException("Can't serialize null");
}
byte[] rv = null;
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(bos);
os.writeObject(o);
os.close();
bos.close();
rv = bos.toByteArray();
} catch (IOException e) {
throw new IllegalArgumentException("Non-serializable object", e);
}
return rv;
}
很明顯,該方法就是進行對象序列化,將Java對象轉化成byte數組並返回。相信大家看到這裡,應該明白了為什麼自定義對象需要實現Serializable接口才能保存進Memcached中。如果數據對象沒有實現Serializable接口,那麼在進行對象序列化時,將會拋出IOException,最終拋出IllegalArgumentException,並提示Non-serializable object。
另著重說明下CachedData類的作用,該類封裝了cas值(該值用來實現原子更新,即客戶端每次發出更新請求時,請求信息中都會附帶該cas值,memcached服務端在收到請求後,會將該cas值與服務器中存儲數據的cas值對比,如果相等,則用新的數據覆蓋老的數據;否則,更新失敗。在並發環境下特別有用)、data數據(即要緩存的數據值或者獲取到的緩存數據,以byte[]數組形式存儲),flag信息(標識byte[]數組額外數據類型信息及byte[]數組是否進行過壓縮等信息,用一個int類型存儲)及其它信息。
set源碼分析到這裡,下面說下get源碼。
同樣的,先列出get方法大概的源碼調用過程如下:
XMemcachedClient.get()
XMemcachedClient.fetch0()
XMemcachedClient.sendCommand()
MemcachedConnector.send()
AbstractSession.write()
MemcachedTCPSession.wrapMessage()
TextGetCommand.encode()
SerializingTranscoder.decode()
SerializingTranscoder.decode0()
BaseSerializingTranscoder.deserialize()
先是調用XMemcacheClient.get(final String key)方法,key形參對應字符串“key"。從該方法一直到TextGetCommand.encode()調用,可以看作是組裝get命令並發送到服務器過程,在收到服務器響應消息後,將響應消息組裝成CachedData,並調用SerializingTranscoder.decode(CachedData d)方法,即進行字節流解碼工作。該方法代碼如下:
public final Object decode(CachedData d) {
byte[] data = d.getData();
int flags = d.getFlag();
if ((flags & COMPRESSED) != 0) {
data = decompress(d.getData());
}
flags = flags & SPECIAL_MASK;
return decode0(d,data, flags);
}
先是獲取字節數組及標志信息,根據標志位決定是否要解壓縮字節數組。最後調用decode0(CachedData cachedData,byte[] data, int flags)方法,代碼如下:
protected final Object decode0(CachedData cachedData,byte[] data, int flags) {
Object rv = null;
if ((cachedData.getFlag() & SERIALIZED) != 0 && data != null) {
rv = deserialize(data);
} else {
if (this.primitiveAsString) {
if (flags == 0) {
return decodeString(data);
}
}
if (flags != 0 && data != null) {
switch (flags) {
case SPECIAL_BOOLEAN:
rv = Boolean.valueOf(this.transcoderUtils
.decodeBoolean(data));
break;
case SPECIAL_INT:
rv = Integer.valueOf(this.transcoderUtils.decodeInt(data));
break;
case SPECIAL_LONG:
rv = Long.valueOf(this.transcoderUtils.decodeLong(data));
break;
case SPECIAL_BYTE:
rv = Byte.valueOf(this.transcoderUtils.decodeByte(data));
break;
case SPECIAL_FLOAT:
rv = new Float(Float.intBitsToFloat(this.transcoderUtils
.decodeInt(data)));
break;
case SPECIAL_DOUBLE:
rv = new Double(Double
.longBitsToDouble(this.transcoderUtils
.decodeLong(data)));
break;
case SPECIAL_DATE:
rv = new Date(this.transcoderUtils.decodeLong(data));
break;
case SPECIAL_BYTEARRAY:
rv = data;
break;
default:
log
.warn(String.format("Undecodeable with flags %x",
flags));
}
} else {
rv = decodeString(data);
}
}
return rv;
}
上面方法實際上就是encode(Object o)方法的逆向實現,即將字節數組轉化成Object對象。注意第4行調用了deserialize(byte[] in)方法,該方法代碼如下(省略了catch、finally部分):
protected Object deserialize(byte[] in) {
Object rv = null;
ByteArrayInputStream bis = null;
ObjectInputStream is = null;
try {
if (in != null) {
bis = new ByteArrayInputStream(in);
is = new ObjectInputStream(bis) {
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
try {
//When class is not found,try to load it from context class loader.
return super.resolveClass(desc);
} catch (ClassNotFoundException e) {
return Thread.currentThread().getContextClassLoader().loadClass(desc.getName());
}
}
};
rv = is.readObject();
}
}
...
return rv;
}
上述代碼就是反序列化對象並返回。每次反序列化操作,得到的都是一個全新對象,對該新對象進行的任何操作並不會影響memcached中存儲的值。
更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2015-02/113101p2.htm