Azure Databricks大數據構建營 小試牛刀,大數據基礎組件Azure Databricks大數據構建營 小試牛刀看過美劇《絕命毒師》(Breaking Bad)的童鞋,恐怕都會對「老白」的化學知識和運用這些知識的能力留下深刻印象。畢竟,生活中一些常見的物品,通過對它們的成分進行一定組合往往能制造出很「不......
看過美劇《絕命毒師》(Breaking Bad)的童鞋,恐怕都會對「老白」的化學知識和運用這些知識的能力留下深刻印象。畢竟,生活中一些常見的物品,通過對它們的成分進行一定組合往往能制造出很「不一般」的東西。
所以接下來你面對了一個新需求,老板要你實現這樣的功能:一家連鎖藥店,希望通過抽樣的方式檢查止咳糖漿的銷量,如果發現有人一次性購買10瓶以上就進行實時告警。你準備怎么做?
基于Azure Databricks的流計算就可以輕松實現。一起來看看吧。
本次《Azure Databricks大數據構建營》系列文章,將主要圍繞Azure Databricks以及其他配套服務,通過一系列實戰教程告訴大家如何基于Azure云打造完全運行在云端的閉環大數據平臺。
內容涵蓋:Azure Databricks的基礎知識,以及如何借助Azure Databricks實現流計算(Structure Streaming)、批處理(Spark SQL)、安全訪問控制以及機器學習能力等。
本系列的第一篇文章概括介紹了有關Azure Databricks的基礎知識,錯過的童鞋可以點擊這里回看。而本篇,將帶領大家小試牛刀,基于Azure Databricks開發上文提到的那種實時告警系統。準備好,我們這就開始。
流計算架構說明
本篇涉及到的服務全都部署在中國區Azure上。根據模擬場景的描述,首先我們可以設計出如下所示的架構:
該解決方案將用到如下的產品和服務:
Azure虛擬機:作為數據源,扮演Producer生產者,通過Python代碼模擬客戶購買行為,生成示例數據并通過SDK發快遞數據到Azure EventHub。
Azure EventHub:消息隊列,做上下游生產者消費者服務的解耦,Entity ingestion負責接收Producer發快遞的數據,Entity alerting接收經過Databricks實時計算后的數據。
Azure Databricks:訂閱EventHub Entity ingestion作為數據源,通過Structure Streaming對數據進行實時處理后發快遞給Entity alerting。
Azure LogicApp:訂閱EventHub Entity alerting并做郵件實時告警。
完整過程中的數據流是這樣的:
1.Producer生產者發快遞數據
2.EventHub Entity ingestion(Ingestion實體)
3.Databricks Structured Streaming(流計算框架)
4.EventHub Entity alerting(Alerting實體)
5.Logic App
Azure Databrick Structure Streaming的實現
1.Terraform自動化部署
通過Terraform部署的服務組件包括Azure虛擬機、Azure Databricks、EventHub、Logic App,具體的tf文件和變量可參見這里。每項服務Terraform Azure Provider都由Resource支持,具體可參考官方文檔。部署完成后的資源清單如下圖所示,所有資源都部署在中國北二區域。
2.Producer代碼發布
模擬的生產者代碼通過虛擬機發布,通過調用Azure EventHub的SDK將消息寫入,具體代碼可見這里,幾個重要配置簡單說明下:
azure.eventhub:Azure EventHub SDK包,需要通過pip3 install azure.eventhub來指定安裝。
create_batch():通過該方法批量發快遞數據,本次示例以1條消息為1個批次發國際快遞EventHub Entity ingestion。
CONNECTION_STR:Azure EventHub Endpoint,該連接字符串可以在門戶上Shared access policies的Connection string–primary key中查看。
EVENTHUB_NAME:寫入的EventHub Entity Name。
3.Azure Databricks集群配置及Structure Streaming Notebook的集成
Azure Databricks的創建過程是:首先在Azure上創建一個Databricks實體,然后在此基礎上在實體內部創建Workspace以及Cluster,再提交Job。每個Databricks資源都有唯一的ID和Endpoint與之對應,以便能夠進行Restful API調用,集群通過Databricks門戶創建即可。
本示例將創建1個Driver和2個Worker,共計3個節點的Standard Cluster,Databricks版本為6.4(包含Apache Spark 2.4.5、Scala 2.11)。如果需要做機器學習相關計算,可啟用集成GPU/ML框架的版本,詳細說明見官方文檔,這里不做贅述。
集群狀態變為Running就意味著就緒可以使用了。不過在導入Python Notebook之前,需要通過Maven Central安裝com.microsoft.azure:azureeventhubsspark庫文件,以便安裝Spark連接Azure EventHub Connector,需要注意庫文件的版本要匹配。
Notebook可以直接在門戶里新建寫入,也可以在VS Code等IDE中編寫完之后發布。本文采用第二種模式,原因是IDE豐富的插件可以提高效率。具體的Notebook本文不做展示,放在這里,有需要的童鞋可以自行查看。通過import導入后,附上導入后的截圖并做幾點說明:
整個Notebook分為三個階段:
第一階段:從EventHub Entity ingestion讀取Producer寫入的數據,通過Streaming DataFrames的spark.readStream()創建。
第二階段:通過DataFrame豐富的函數做字段篩選,篩選出來我們需要的字段。
第三階段:回寫EventHub Entity alerting,通過Streaming DataFrames的spark.writeStream()流式寫入,注意利用Checkpoint方便任務終止再運行。
當Producer運行起來后,EventHub就會不斷有數據寫入,所以能看到Spark的Input Records圖像。對于每一個Job,都能看到對于該任務分配的資源和Spark參數配置項。
4.Logic APP配置郵件告警
經過Azure Databricks的數據篩選后,篩選出來的Messages都寫入了EventHub Entity alerting中,此時通過LogicApp來定義一個自動化的工作流來進行郵件告警。具體創建過程選擇Blank,然后自己創建Step即可,當然Azure門戶上的示例模板也可以用來參考,如下圖所示:
第一步訂閱EventHub Entity alerting,第二步集成Outlook郵件接口發快遞告警郵件。所以當目標消息被篩選出來之后,LogicApp就按照定義的郵件內容(本文是消息內容和時間戳)來發快遞郵件,發快遞郵件的截圖如下:
總結
總體上,一個通過消息隊列Azure EventHub以及Databricks做流計算處理的示例就完成了。
如果消息生產者Producer不斷產生消息,那么整個任務就會一直運行下去,當出現目標消息的時候就會不斷的持續告警。在Spark推出Structure Streaming后,也解決了Spark Streaming micro batch的局限性。
本文開頭提及的需求已經順利實現!
有關如何借助Azure Databricks實現流計算(Structure Streaming)的內容就是這些了。隨后的內容中,我們還將介紹如何實現批處理(Spark SQL)、安全訪問控制和機器學習能力。通過這些真實場景中的最佳實踐分享,也能幫助大家更清楚地認識到Azure Databricks的價值。敬請期待!
特別聲明:以上文章內容僅代表作者本人觀點,不代表ESG跨境電商觀點或立場。如有關于作品內容、版權或其它問題請于作品發表后的30日內與ESG跨境電商聯系。
二維碼加載中...
使用微信掃一掃登錄
使用賬號密碼登錄
平臺顧問
微信掃一掃
馬上聯系在線顧問
小程序
ESG跨境小程序
手機入駐更便捷
返回頂部