歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> 使用Python進行穩定可靠的文件操作

使用Python進行穩定可靠的文件操作

日期:2017/2/27 15:58:31   编辑:Linux教程
程序需要更新文件。雖然大部分程序員知道在執行I/O的時候會發生不可預期的事情,但是我經常看到一些異常幼稚的代碼。在本文中,我想要分享一些如何在Python代碼中改善I/O可靠性的見解。

考慮下述Python代碼片段。對文件中的數據進行某些操作,然後將結果保存回文件中:
with open(filename) as f:
   input = f.read()
output = do_something(input)
with open(filename, 'w') as f:
   f.write(output)
看起來很簡單吧?可能看起來並不像乍一看這麼簡單。我在產品服務器中調試應用,經常會出現奇怪的行為。

這是我看過的失效模式的例子:
  • 失控的服務器進程溢出大量日志,磁盤被填滿。write()在截斷文件之後拋出異常,文件將會變成空的。
  • 應用的幾個實例並行執行。在各個實例結束之後,因為混合了多個實例的輸出,文件內容最終變成了天書。
  • 在完成了寫操作之後,應用會觸發一些後續操作。幾秒鐘後斷電。在我們重啟了服務器之後,我們再一次看到了舊的文件內容。已經傳遞給其它應用的數據與我們在文件中看到的不再一致。

下面沒有什麼新的內容。本文的目的是為在系統編程方面缺少經驗的Python開發者提供常見的方法和技術。我將會提供代碼例子,使得開發者可以很容易的將這些方法應用到自己的代碼中。

“可靠性”意味著什麼?
廣義的講,可靠性意味著在所有規定的條件下操作都能執行它所需的函數。至於文件的操作,這個函數就是創建,替換或者追加文件的內容的問題。這裡可以從數據庫理論上獲得靈感。經典的事務模型的ACID性質作為指導來提高可靠性。
開始之前,讓我們先看看我們的例子怎樣和ACID4個性質扯上關系:

  • 原子性(Atomicity)要求這個事務要麼完全成功,要麼完全失敗。在上面的實例中,磁盤滿了可能導致部分內容寫入文件。另外,如果正當在寫入內容時其它程序又在讀取文件,它們可能獲得是部分完成的版本,甚至會導致寫錯誤
  • 一致性(Consistency) 表示操作必須從系統的一個狀態到另一個狀態。一致性可以分為兩部分:內部和外部一致性。內部一致性是指文件的數據結構是一致的。外部一致性是指文件的內容與它相關的數據是相符合的。在這個例子中,因為我們不了解這個應用,所以很難推斷是否符合一致性。但是因為一致性需要原子性,我們至少可以說沒有保證內部一致性。
  • 隔離性(Isolation)如果在並發的執行事務中,多個相同的事務導致了不同的結果,就違反了隔離性。很明顯上面的代碼對操作失敗或者其它隔離性失敗都沒有保護。
  • 持久性(Durability)意味著改變是持久不變的。在我們告訴用戶成功之前,我們必須確保我們的數據存儲是可靠的並且不只是一個寫緩存。上面的代碼已經成功寫入數據的前提是假設我們調用write()函數,磁盤I/O就立即執行。但是POSIX標准是不保證這個假設的。

盡可能使用數據庫系統
如果我們能夠獲得ACID 四個性質,那麼我們增加可靠性方面取得了長遠發展。但是這需要很大的編碼功勞。為什麼重復發明輪子?大多數數據庫系統已經有ACID事務了。

可靠性數據存儲已經是一個已解決的問題。如果你需要可靠性存儲,請使用數據庫。很可能,沒有幾十年的功夫,你自己解決這方面的能力沒有那些已經專注這方面好些年的人好。如果你不想安裝一個大數據庫服務器,那麼你可以使用sqlite,它具有ACID事務,很小,免費的,而且它包含在Python的標准庫中。

文章本該在這裡就結束的,但是還有一些有根有據的原因,就是不使用數據。它們通常是文件格式或者文件位置約束。這兩個在數據庫系統中都不好控制。理由如下:

  • 我們必須處理其它應用產生的固定格式或者在固定位置的文件,
  • 我們必須為了其它應用的消耗而寫文件(和應用了同樣的限制條件)
  • 我們的文件必須方便人閱讀或者修改。
如果我們自己動手實現可靠的文件更新,那麼這裡有一些編程技術供參考。下面我將展示四種常見的操作文件更新模式。在那之後,我會討論采取哪些步驟在每個文件更新模式下滿足ACID性質。

文件更新模式
文件可以以多種方式更新,但是我認為至少有四種常見的模式。這四種模式將做為本文剩余部分的基礎。
截斷-寫
這可能是最基本的模式。在下述例子中,假設的域模型代碼讀數據,執行一些計算,然後以寫模式重新打開存在的文件:
with open(filename, 'r') as f:
   model.read(f)
model.process()
with open(filename, 'w') as f:
   model.write(f)
此模式的一個變種以讀寫模式打開文件(Python中的“加”模式),尋找到開始的位置,顯式調用truncate(),重寫文件內容。
with open(filename, 'a+') as f:
   f.seek(0)
   model.input(f.read())
   model.compute()
   f.seek(0)
   f.truncate()
   f.write(model.output())
該變種的優勢是只打開文件一次,始終保持文件打開。舉例來說,這樣可以簡化加鎖。

寫-替換
另外一種廣泛使用的模式是將新內容寫到臨時文件,之後替換原始文件:
with tempfile.NamedTemporaryFile(
      'w', dir=os.path.dirname(filename), delete=False) as tf:
   tf.write(model.output())
   tempname = tf.name
os.rename(tempname, filename)
該方法與截斷-寫方法相比對錯誤更具有魯棒性。請看下面對原子性和一致性的討論。很多應用使用該方法。

這兩個模式很常見,以至於linux內核中的ext4文件系統甚至可以自動檢測到這些模式,自動修復一些可靠性缺陷。但是不要依賴這一特性:你並不是總是使用ext4,而且管理員可能會關掉這一特性。

追加
第三種模式就是追加新數據到已存在的文件:
with open(filename, 'a') as f:
   f.write(model.output())
這個模式用來寫日志文件和其它累積處理數據的任務。從技術上講,它的顯著特點就是極其簡單。一個有趣的擴展應用就是常規操作中只通過追加操作更新,然後定期重新整理文件,使之更緊湊。

Spooldir
這裡我們將目錄做為邏輯數據存儲,為每條記錄創建新的唯一命名的文件:
with open(unique_filename(), 'w') as f:
   f.write(model.output())
該模式與附加模式一樣具有累積的特點。一個巨大的優勢是我們可以在文件名中放入少量元數據。舉例來說,這可以用於傳達處理狀態的信息。spooldir模式的一個特別巧妙的實現是maildir格式。maildirs使用附加子目錄的命名方案,以可靠的、無鎖的方式執行更新操作。md和gocept.filestore庫為maildir操作提供了方便的封裝。
如果你的文件名生成不能保證唯一的結果,甚至有可能要求文件必須實際上是新的。那麼調用具有合適標志的低等級os.open():
fd = os.open(filename, os.O_WRONLY | os.O_CREAT| os.O_EXCL, 0o666)
with os.fdopen(fd, 'w') as f:
   f.write(...)
在以O_EXCL方式打開文件後,我們用os.fdopen將原始的文件描述符轉化為普通的Python文件對象。

應用ACID屬性到文件更新
下面,我將嘗試加強文件更新模式。反過來讓我們看看可以做些什麼來滿足ACID屬性。我將會盡可能保持簡單,因為我們並不是要寫一個完整的數據庫系統。請注意本節的材料並不徹底,但是可以為你自己的實驗提供一個好的起點。

原子性
寫-替換
模式提供了原子性,因為底層的os.rename()是原子性的。這意味著在任意給定時間點,進程或者看到舊的文件,或者看到新的文件。該模式對寫錯誤具有天然的魯棒性:如果寫操作觸發異常,重命名操作就不會被執行,所有就沒有用損壞的新文件覆蓋正確的舊文件的風險。

附加模式並不是原子性的,因為有附加不完整記錄的風險。但是有個技巧可以使更新具有原子性:為每個寫操作標注校驗和。之後讀日志的時候,忽略所有沒有有效校驗和的記錄。以這種方式,只有完整的記錄才會被處理。在下面的例子中,應用做周期性的測量,每次在日志中附加一行JSON記錄。我們計算記錄的字節表示形式的CRC32校驗和,然後附加到同一行:
with open(logfile, 'ab') as f:
    for i in range(3):
        measure = {'timestamp': time.time(), 'value': random.random()}
        record = json.dumps(measure).encode()
        checksum = '{:8x}'.format(zlib.crc32(record)).encode()
        f.write(record + b' ' + checksum + b'\n')

該例子代碼通過每次創建隨機值模擬測量。

$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.232021160265939} d229d937

想要處理這個日志文件,我們每次讀一行記錄,分離校驗和,與讀到的記錄比較。

with open(logfile, 'rb') as f:
    for line in f:
        record, checksum = line.strip().rsplit(b' ', 1)
        if checksum.decode() == '{:8x}'.format(zlib.crc32(record)):
            print('read measure: {}'.format(json.loads(record.decode())))
        else:
            print('checksum error for record {}'.format(record))

現在我們通過截斷最後一行模擬被截斷的寫操作:

$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.23202

當讀日志的時候,最後不完整的一行被拒絕:

$ read_checksummed_log.py log
read measure: {'timestamp': 1373396987.258189, 'value': 0.9360123151217828}
read measure: {'timestamp': 1373396987.25825, 'value': 0.40429005476999424}
checksum error for record b'{"timestamp": 1373396987.258291, "value":'

添加校驗和到日志記錄的方法被用於大量應用,包括很多數據庫系統。
spooldir中的單個文件也可以在每個文件中添加校驗和。另外一個可能更簡單的方法是借用寫-替換模式:首先將文件寫到一邊,然後移到最終的位置。設計一個保護正在被消費者處理的文件的命名方案。在下面的例子中,所有以.tmp結尾的文件都會被讀取程序忽略,因此在寫操作的時候可以安全的使用。

newfile = generate_id()
with open(newfile + '.tmp', 'w') as f:
   f.write(model.output())
os.rename(newfile + '.tmp', newfile)
最後,截斷-寫是非原子性的。很遺憾我不能提供滿足原子性的變種。在執行完截取操作後,文件是空的,還沒有新內容寫入。如果並發的程序現在讀文件或者有異常發生,程序中止,我們既看不久的版本也看不到新的版本。

一致性
我談論的關於原子性的大部分內容也可以應用到一致性。實際上,原子性更新是內部一致性的前提條件。外部一致性意味著同步更新幾個文件。這不容易做到,鎖文件可以用來確保讀寫訪問互不干涉。考慮某目錄下的文件需要互相保持一致。常用的模式是指定鎖文件,用來控制對整個目錄的訪問。
寫程序的例子:
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
   fcntl.flock(lockfile, fcntl.LOCK_EX)
   model.update(dirname)
讀程序的例子:
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
   fcntl.flock(lockfile, fcntl.LOCK_SH)
   model.readall(dirname)
該方法只有控制所有讀程序才生效。因為每次只有一個寫程序活動(獨占鎖阻塞所有共享鎖),所有該方法的可擴展性有限。

更進一步,我們可以對整個目錄應用寫-替換模式。這涉及為每次更新創建新的目錄,更新完成後改變符合鏈接。舉例來說,鏡像應用維護一個包含壓縮包和列出了文件名、文件大小和校驗和的索引文件的目錄。當上流的鏡像更新,僅僅隔離地對壓縮包和索引文件進項原子性更新是不夠的。相反,我們需要同時提供壓縮包和索引文件以免校驗和不匹配。為了解決這個問題,我們為每次生成維護一個子目錄,然後改變符號鏈接激活該次生成。
mirror
|-- 483
|   |-- a.tgz
|   |-- b.tgz
|   `-- index.json
|-- 484
|   |-- a.tgz
|   |-- b.tgz
|   |-- c.tgz
|   `-- index.json
`-- current -> 483
新的生成484正在被更新的過程中。當所有壓縮包准備好,索引文件更新後,我們可以用一次原子調用os.symlink()來切換current符號鏈接。其它應用總是或者看到完全舊的或者完全新的生成。讀程序需要使用os.chdir()進入current目錄,很重要的是不要用完整路徑名指定文件。否在當讀程序打開current/index.json,然後打開current/a.tgz,但是同時符號鏈接已經改變時就會出現競爭條件。

隔離性
隔離性意味著對同一文件的並發更新是可串行化的——存在一個串行調度使得實際執行的並行調度返回相同的結果。“真實的”數據庫系統使用像MVCC這種高級技術維護可串行性,同時允許高等級的可並行性。回到我們的場景,我們最後使用加鎖來串行文件更新。
截斷-寫更新進行加鎖是容易的。僅僅在所有文件操作前獲取一個獨占鎖就可以。下面的例子代碼從文件中讀取一個整數,然後遞增,最後更新文件:
def update():
   with open(filename, 'r+') as f:
      fcntl.flock(f, fcntl.LOCK_EX)
      n = int(f.read())
      n += 1
      f.seek(0)
      f.truncate()
      f.write('{}\n'.format(n))
使用 寫-替換模式加鎖更新就有點兒技巧。像 截斷-寫那樣使用鎖可能導致更新沖突。某個幼稚的實現可能看起來像這樣:
def update():
   with open(filename) as f:
      fcntl.flock(f, fcntl.LOCK_EX)
      n = int(f.read())
      n += 1
      with tempfile.NamedTemporaryFile(
            'w', dir=os.path.dirname(filename), delete=False) as tf:
         tf.write('{}\n'.format(n))
         tempname = tf.name
      os.rename(tempname, filename)
這段代碼有什麼問題呢?設想兩個進程競爭更新某個文件。第一個進程運行在前面,但是第二個進程阻塞在fcntl.flock()調用。當第一個進程替換了文件,釋放了鎖,現在在第二個進程中打開的文件描述符指向了一個包含舊內容的“幽靈”文件(任意路徑名都不可達)。想要避免這個沖突,我們必須檢查打開的文件是否與fcntl.flock()返回的相同。所以我寫了一個新的LockedOpen上下文管理器來替換內建的open上下文。來確保我們實際打開了正確的文件:
class LockedOpen(object):

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.fileobj.close()
追加更新上鎖如同給截斷-寫更新上鎖一樣簡單:需要一個排他鎖,然後追加就完成了。需要長期運行的會將文件長久的打開的進程,可以在更新時釋放鎖,讓其它進入。

spooldir模式有個很優美的性質就是它不需要任何鎖。此外,你建立在使用靈活的命名模式和一個健壯的文件名分代。郵件目錄規范就是一個spooldir模式的好例子。它可以很容易的適應其它情況,不僅僅是處理郵件。

持久性
持久性有點特殊,因為它不僅依賴於應用,也與OS和硬件配置有關。理論上來說,我們可以假定,如果數據沒有到達持久存儲,os.fsync()或 os.fdatasync()調用就沒有返回結果。在實際情況中,我們有可能會遇到幾個問題:我們可能會面對不完整的fsync實現,或者糟糕的磁盤控制 器配置,它們都無法提供任何持久化的保證。有一個來自 MySQL 開發者 的討論對哪裡會發生錯誤進行了詳盡的討論。有些像PostgreSQL 之類的數據庫系統,甚至提供了持久化機制的選擇 ,以便管理員在運行時刻選擇最佳的一個。然而不走運的人只能使用os.fsync(),並期待它可以被正確的實現。

通過截斷-寫模式,在結束寫操作以後關閉文件以前,我們需要發送一個同步信號。注意通常這還牽涉到另一個層次的寫緩存。glibc 緩存 甚至會在寫操作傳遞到內核以前,在進程內部攔住它。同樣為了得到空的glibc緩存,我們需要在同步以前對它flush():
with open(filename, 'w') as f:
   model.write(f)
   f.flush()
   os.fdatasync(f)
要不,你也可以帶參數-u調用Python,以此為所有的文件I/O獲得未緩沖的寫。
大多數時候相較os.fsync()我更喜歡os.fdatasync(),以此避免同步元數據的更新(所有權、大小、mtime…)。元數據的更新可最終導致磁盤I/O搜索操作,這會使整個過程慢不少。

對寫-替換風格更新使用同樣的技巧只是成功了一半。我們得確保在代替舊文件之前,新寫入文件的內容已經寫入了非易失性存儲器上了,但是替換操作怎麼辦?我們不能保證那個目錄更新是否執行的剛剛好。在網絡上有很多關於怎麼讓同步目錄更新的長篇大論。但是在我們這種情況,舊文件和新文件都在同一個目錄下,我們可以使用簡單的解決方案來逃避這個這題。
os.rename(tempname, filename)
dirfd = os.open(os.path.dirname(filename), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
我們調用底層的os.open()來打開目錄(Python自帶的open()方法不支持打開目錄),然後在目錄文件描述符上執行os.fsync()。

對待追加更新和我以及說過的截斷-寫是相似的。

spooldir模式與寫-替換模式同樣的目錄同步問題。幸運地是,可以使用同樣的解決方案:第一步同步文件,然後同步目錄。

總結
這使可靠的更新文件成為可能。我已經演示了滿足ACID的四大性質。這些展示的實例代碼充當一個工具箱。掌握這編程技術最大的滿足你的需求。有時,你並不需要滿足所有的ACID性質,可能僅僅需要一到兩個。我希望這篇文章可以幫助你去做已充分了解的決定,什麼該去實現以及什麼該捨棄。
from:oschina
Copyright © Linux教程網 All Rights Reserved