with open(filename) as f: input = f.read() output = do_something(input) with open(filename, 'w') as f: f.write(output)看起來很簡單吧?可能看起來並不像乍一看這麼簡單。我在產品服務器中調試應用,經常會出現奇怪的行為。
下面沒有什麼新的內容。本文的目的是為在系統編程方面缺少經驗的Python開發者提供常見的方法和技術。我將會提供代碼例子,使得開發者可以很容易的將這些方法應用到自己的代碼中。
“可靠性”意味著什麼?
廣義的講,可靠性意味著在所有規定的條件下操作都能執行它所需的函數。至於文件的操作,這個函數就是創建,替換或者追加文件的內容的問題。這裡可以從數據庫理論上獲得靈感。經典的事務模型的ACID性質作為指導來提高可靠性。
開始之前,讓我們先看看我們的例子怎樣和ACID4個性質扯上關系:
文章本該在這裡就結束的,但是還有一些有根有據的原因,就是不使用數據。它們通常是文件格式或者文件位置約束。這兩個在數據庫系統中都不好控制。理由如下:
with open(filename, 'r') as f: model.read(f) model.process() with open(filename, 'w') as f: model.write(f)此模式的一個變種以讀寫模式打開文件(Python中的“加”模式),尋找到開始的位置,顯式調用truncate(),重寫文件內容。
with open(filename, 'a+') as f: f.seek(0) model.input(f.read()) model.compute() f.seek(0) f.truncate() f.write(model.output())該變種的優勢是只打開文件一次,始終保持文件打開。舉例來說,這樣可以簡化加鎖。
with tempfile.NamedTemporaryFile( 'w', dir=os.path.dirname(filename), delete=False) as tf: tf.write(model.output()) tempname = tf.name os.rename(tempname, filename)該方法與截斷-寫方法相比對錯誤更具有魯棒性。請看下面對原子性和一致性的討論。很多應用使用該方法。
with open(filename, 'a') as f: f.write(model.output())這個模式用來寫日志文件和其它累積處理數據的任務。從技術上講,它的顯著特點就是極其簡單。一個有趣的擴展應用就是常規操作中只通過追加操作更新,然後定期重新整理文件,使之更緊湊。
with open(unique_filename(), 'w') as f: f.write(model.output())該模式與附加模式一樣具有累積的特點。一個巨大的優勢是我們可以在文件名中放入少量元數據。舉例來說,這可以用於傳達處理狀態的信息。spooldir模式的一個特別巧妙的實現是maildir格式。maildirs使用附加子目錄的命名方案,以可靠的、無鎖的方式執行更新操作。md和gocept.filestore庫為maildir操作提供了方便的封裝。
fd = os.open(filename, os.O_WRONLY | os.O_CREAT| os.O_EXCL, 0o666) with os.fdopen(fd, 'w') as f: f.write(...)在以O_EXCL方式打開文件後,我們用os.fdopen將原始的文件描述符轉化為普通的Python文件對象。
with open(logfile, 'ab') as f: for i in range(3): measure = {'timestamp': time.time(), 'value': random.random()} record = json.dumps(measure).encode() checksum = '{:8x}'.format(zlib.crc32(record)).encode() f.write(record + b' ' + checksum + b'\n')
該例子代碼通過每次創建隨機值模擬測量。
$ cat log {"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a {"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22 {"timestamp": 1373396987.258291, "value": 0.232021160265939} d229d937
想要處理這個日志文件,我們每次讀一行記錄,分離校驗和,與讀到的記錄比較。
with open(logfile, 'rb') as f: for line in f: record, checksum = line.strip().rsplit(b' ', 1) if checksum.decode() == '{:8x}'.format(zlib.crc32(record)): print('read measure: {}'.format(json.loads(record.decode()))) else: print('checksum error for record {}'.format(record))
現在我們通過截斷最後一行模擬被截斷的寫操作:
$ cat log {"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a {"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22 {"timestamp": 1373396987.258291, "value": 0.23202
當讀日志的時候,最後不完整的一行被拒絕:
$ read_checksummed_log.py log read measure: {'timestamp': 1373396987.258189, 'value': 0.9360123151217828} read measure: {'timestamp': 1373396987.25825, 'value': 0.40429005476999424} checksum error for record b'{"timestamp": 1373396987.258291, "value":'
添加校驗和到日志記錄的方法被用於大量應用,包括很多數據庫系統。
spooldir中的單個文件也可以在每個文件中添加校驗和。另外一個可能更簡單的方法是借用寫-替換模式:首先將文件寫到一邊,然後移到最終的位置。設計一個保護正在被消費者處理的文件的命名方案。在下面的例子中,所有以.tmp結尾的文件都會被讀取程序忽略,因此在寫操作的時候可以安全的使用。
newfile = generate_id() with open(newfile + '.tmp', 'w') as f: f.write(model.output()) os.rename(newfile + '.tmp', newfile)最後,截斷-寫是非原子性的。很遺憾我不能提供滿足原子性的變種。在執行完截取操作後,文件是空的,還沒有新內容寫入。如果並發的程序現在讀文件或者有異常發生,程序中止,我們既看不久的版本也看不到新的版本。
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile: fcntl.flock(lockfile, fcntl.LOCK_EX) model.update(dirname)讀程序的例子:
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile: fcntl.flock(lockfile, fcntl.LOCK_SH) model.readall(dirname)該方法只有控制所有讀程序才生效。因為每次只有一個寫程序活動(獨占鎖阻塞所有共享鎖),所有該方法的可擴展性有限。
mirror |-- 483 | |-- a.tgz | |-- b.tgz | `-- index.json |-- 484 | |-- a.tgz | |-- b.tgz | |-- c.tgz | `-- index.json `-- current -> 483新的生成484正在被更新的過程中。當所有壓縮包准備好,索引文件更新後,我們可以用一次原子調用os.symlink()來切換current符號鏈接。其它應用總是或者看到完全舊的或者完全新的生成。讀程序需要使用os.chdir()進入current目錄,很重要的是不要用完整路徑名指定文件。否在當讀程序打開current/index.json,然後打開current/a.tgz,但是同時符號鏈接已經改變時就會出現競爭條件。
def update(): with open(filename, 'r+') as f: fcntl.flock(f, fcntl.LOCK_EX) n = int(f.read()) n += 1 f.seek(0) f.truncate() f.write('{}\n'.format(n))使用 寫-替換模式加鎖更新就有點兒技巧。像 截斷-寫那樣使用鎖可能導致更新沖突。某個幼稚的實現可能看起來像這樣:
def update(): with open(filename) as f: fcntl.flock(f, fcntl.LOCK_EX) n = int(f.read()) n += 1 with tempfile.NamedTemporaryFile( 'w', dir=os.path.dirname(filename), delete=False) as tf: tf.write('{}\n'.format(n)) tempname = tf.name os.rename(tempname, filename)這段代碼有什麼問題呢?設想兩個進程競爭更新某個文件。第一個進程運行在前面,但是第二個進程阻塞在fcntl.flock()調用。當第一個進程替換了文件,釋放了鎖,現在在第二個進程中打開的文件描述符指向了一個包含舊內容的“幽靈”文件(任意路徑名都不可達)。想要避免這個沖突,我們必須檢查打開的文件是否與fcntl.flock()返回的相同。所以我寫了一個新的LockedOpen上下文管理器來替換內建的open上下文。來確保我們實際打開了正確的文件:
class LockedOpen(object): def __init__(self, filename, *args, **kwargs): self.filename = filename self.open_args = args self.open_kwargs = kwargs self.fileobj = None def __enter__(self): f = open(self.filename, *self.open_args, **self.open_kwargs) while True: fcntl.flock(f, fcntl.LOCK_EX) fnew = open(self.filename, *self.open_args, **self.open_kwargs) if os.path.sameopenfile(f.fileno(), fnew.fileno()): fnew.close() break else: f.close() f = fnew self.fileobj = f return f def __exit__(self, _exc_type, _exc_value, _traceback): self.fileobj.close()給追加更新上鎖如同給截斷-寫更新上鎖一樣簡單:需要一個排他鎖,然後追加就完成了。需要長期運行的會將文件長久的打開的進程,可以在更新時釋放鎖,讓其它進入。
with open(filename, 'w') as f: model.write(f) f.flush() os.fdatasync(f)要不,你也可以帶參數-u調用Python,以此為所有的文件I/O獲得未緩沖的寫。
os.rename(tempname, filename) dirfd = os.open(os.path.dirname(filename), os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd)我們調用底層的os.open()來打開目錄(Python自帶的open()方法不支持打開目錄),然後在目錄文件描述符上執行os.fsync()。