新用戶登錄后自動創建賬號
登錄大數據的批處理和實時處理模式已經存在了。但是,還沒有一種模式可以允許我們實時批處理非獨立數據。合作伙伴一旦收到操作指導,Expedia的營銷團隊需要分析相互依賴的數據集。現存的系統運行于一個本地Hadoop集群中,但是整個團隊一直在努力達到內部的SLA(服務等級協議)。這些信息也是有時效的,更快地獲取數據意味著給合作伙伴更好的操作指導。
從事于Expedia研究的Pariveda小組參與AWS的Solutions Architects(解決方案架構)研究來應對三個明顯的挑戰:如何在源數據可用后盡快傳遞分析結果;如何處理相互依賴但又在不同時間產生的數據集;如何管理不同時間達到的數據集之間的從屬關系。
在本文中,我將會描述Expedia,Pariveda和AWS團隊是如何以 AWS Lambda,Amazon DynamoDB,Amazon EMR和Amazon S3為組成部分,找出獨特方法來實時處理數據的。你將會學到在不管理任何基礎設施的情況下如何實施一個相似的傳遞途徑。
解除數據集間的相互依賴關系
我們需要的解決的一個問題是數據集間的相互依賴關系。我們的目標是為另一個系統提供統一,清潔的數據輸入源以便進行更加詳細的分析。為了創建這些數據輸入源,每天來自多個合作伙伴和內部系統的不同種類的成百上千的數據集必須經過接收,聚合和查詢。每一種數據,每一個合作伙伴的數據達到的時間都是不同的。這意味著數據處理過程需要持續,直到一個數據輸入源所需的所有數據都到達。
下面概括的解決方案就是我們的成果。我們使用從屬于一個S3桶的AWS Lambda來更新DynamoDB中定義的任務。任務定義包括名稱,非獨立文件的列表,它們的狀態(已到達還是未到達)和在EMR中運行該任務所需的參數。一旦一個特定任務所需的所有文件都到達了,lambda函數就會更新任務隊列,在EMR中啟動一個集群。EMR將結果再推回到S3以便使用S3的應用程序在需要該結果時可以提取到它們。
配置任務
系統的核心時任務。任務對象保存了所有的信息,這些信息是確定數據依賴關系,依賴關系狀態和待發生的處理結果所必需的。通過定義任務,你可以配置所有需要發生的work。從任務表中,可以輕松地看到所有的數據依賴關系。
建立S3事件與任務之間的映射
當我們從S3獲取事件時,AWS Lambda中引發的事件只有關于S3中被修改的對象的上下文。從S3中直接獲取的數據沒有關于這個任務的任何信息。但在Node.js這條只給出了一行代碼的信息中,我們確實得到了一條很珍貴的信息,那就是S3對象的新密鑰。
var srcKey = event.Records[0].s3.object.key;
從這個新密鑰,我們需要一種方式來獲取任務信息。為了達到這個目的,我們創建了FileUnit表。這一表實際上完全改變了這個任務,將S3密鑰作為打開表的范圍密鑰,而任務密鑰作為數據負載。這使我們獲取了源密鑰,通過一次DynamoDB查詢就可以算出我們擁有的任務。
從這里,我們可以更新任務,確定是否所有的依賴數據都已經到達,并啟動亞馬遜EMR。
產生的流程圖
創建DynamoDB表
我們在DynamoDB中創建如下三個表:
Task表
針對Task表,我們使用 HashKey/RangeKey Primary Key配置,以日期作為hash密鑰,以TaskKey字符串作為范圍密鑰。這個表可以使用任何名稱,只要在你所有的任務中,該名稱是唯一的即可。而針對Expedia項目,從hash密鑰的角度看,Date并不遵守時間系列數據的準則,所以你可以創建另一個可預測的hash密鑰。但是如果你的任務是以天為基礎重復的,那么Date是一個很好的hash密鑰的選擇,因為它便于后面的搜索。
這里只是一個樣例條目:
在創建表格時,我們只需關心TaskKey和Date參數。但是輸入文件(要注意這些文件在S3上的路徑)和ScriptParameters對整個系統的運行是必需的。該任務在控制臺創建。在實際操作中,這些配置信息應該在數據文件被加載前以設定的頻率從某個文件中加載。
FileUnit表
FileUnit是對使用S3路徑的Task表格的一個引用。它有三個屬性:
Date(作為hash密鑰)
Filename(作為范圍密鑰)–指明Filename文件在S3上的路徑
Task–待引用的任務
在所有的實踐中,Date并不是FileUnit表格的必須屬性。實際上,如果你能避免這一屬性而且不影響任務的運行,那會是更好的選擇,但是這一屬性卻能更好地支持我們的描述。如果你的任務名并不是以天為基礎重復,使用S3路徑作為hash密鑰,使用任務名稱作為范圍密鑰會是更好的選擇。這可以使你在根據hash密鑰查詢的同時,管理那些在多個任務之間相互依賴的數據集變得更簡單。
Batch表
應該創建Batch表格,并以Date屬性作為hash密鑰,以Task屬性為范圍密鑰。Task的值將會與Task表格中的TaskKey的值相同。為了便于查詢,我們也為Batch表格添加了全局備用索引,并以日期為hash密鑰,以ProcessingState為范圍密鑰。這幫助我們輕松地查詢未處理的項目。
測試數據
為了測試,在Task表格中創建一個與上面類似的條目。確保使用屬性名稱所指定的輸入路徑,將它們的值設為NULL。下一步,獲取這些輸入路徑,在FileUnit表格中創建條目。路徑名稱必須完全匹配Filename列的值(包括大小寫)。FileUnit表中Task的值必須匹配任務的TaskKey值。要使用上面的Task樣例,你將創建如下三個FileUnit條目:
表創建完畢,測試數據加載完畢后,我們可以實施AWS Lambda函數。
寫AWS Lambda函數
代碼框架
代碼框架與上面展示的流程圖很相似。
我們將會粉飾代碼,將已完成的項目添加到Batch表中因為該代碼與上面的updateTask函數非常相似。但我們確實是使用putItem函數而非updateItem函數。
啟動EMR
在EMR中啟動任務是很簡單的。我們啟動一個EMR集群,然后添加一個工作流程步驟。有很多的配置代碼,但是實質是簡單的。你必須安裝合適的應用來完成你的工作,在本案例中,你必須安裝Hive,所以你在啟動EMR時會看到工作流程步驟被添加到EMR中。
從這里,我們通過使用腳本參數調用addJobFlowSteps函數,將處理任務添加到工作流程中。有一個很小的轉換步驟需要在這里進行。你可以在GitHub知識庫中找到該轉換代碼。
部署AWS Lambda函數
為了在AWS Lambda函數中部署該應用,你需要:
1.從GitHub下載源代碼。
2.使用 npminstall工具來安 裝async所依賴的部件。
3.在FunctionConstants.js文件中更新logsPath的值,使其指向某一個桶,并在你希望EMR放置日志文件的路徑前加前綴。
4.將函數打包,并部署該函數,如本例或演練步驟中所示。
5.確保你的Lambda Execution IAM角色有以下權限:
a.在DynamoDB中–調用getItem,updateItem和putItem函數的權限
b.在EMR中–調用startJobFlow和addJobFlowItem函數的權限
在控制臺上進行快速測試
確保已正確配置了所有參數,你可以在AWS控制臺上打開AWS Lambda函數的Edit/Test頁面,模擬添加到S3的新文件:
使用S3樣本事件,修改s3部分和object部分的參數值,觸發模擬任務中文件的事件。
在Execution結果窗口你應該可以看到消息Files processed successfully(消息處理成功)。
將S3桶事件發布到AWS Lambda函數
從控制臺中,從Actions菜單中選擇Add event source選項,將S3桶中的Object Created事件添加到AWS Lambda函數中,將關于事件源的信息設置為S3桶:
端對端測試
既然一切都已準備就緒,你可以在S3桶中創建新文件了。你應該看到與如下截屏所類似的信息:
如果EMR集群還未啟動,你可以查看該功能所產生的CloudWatch日志,確定問題所在。你也可以通過控制臺模擬所有的文件達到情況來確定功能收到錯誤信息的部位。
祝賀你,你已經建立了一個工作系統!
優化
有若干方法可以優化該系統,取決于你的使用場景。下面是一些優化案例。
一個文件對多個任務
如果多個任務以來同一個文件,你必須適當地調整FileUnit表,然后調整任務查詢相關參數來處理結構的變化。你可以使用上面描述的格式(文件名作hash密鑰)或者你可以保留格式但是將Task表中條目的值設為一組值而非一個值。
清掃任務
如果你的任務很小,你預期數據以某一頻率到達,你可以調整批處理的任務大小到大于1 (該值是在配置文件中配置的)。如果你這樣設置了,你可能想要添加一個清掃功能,使其以定時器規定的時間來清除可能還未運行的任務。以這種方式,你獲取了效率,待處理的任務在運行前不用等待太長時間來滿足批處理的個數限制