資料工程專案 - Snowflake 中插入和解析 JSON (Part 2)
目錄
- 引言
- 預處理數據
- 建立資料表
- 數據自動加載
- 摘要
- 常見問題解答
引言
大家好,歡迎回到本系列的第二部分。在之前的視頻中,我們從 PredictIt 中提取了數據,展示了他們設置的各種預測市場。現在,我們將把這些數據插入到 Snowflake 中,然後對其進行基本的 JSON 解析。最後,我們會實際處理這些數據。本視頻將主要集中在預處理步驟上,如果你還沒有觀看上一部分的視頻,我建議你回頭看一下。基本上,我們已經在 AWS 上設置了 Apache Airflow 的托管工作流,用於提取數據並將其插入到 S3 中。讓我們開始吧!
預處理數據
在深入編碼之前,讓我們先想想我們在這裡要做的事情。如果你記得,我們提取的數據中有兩個不同的實體:市場和合約。市場基本上是父信息,每個市場都有多個合約,這就是我們要將這些信息拆分的方式,市場作為一個表,合約則為另一個表。
對於合約信息,我們將跟蹤時間變化,因為我們希望看到每天的變化,這可能會變得更有趣。
為了做到這一點,我們將創建兩個表:一個市場表和一個合約表。然後,隨著時間的推移,我們將對其進行分析。
為了實現這一點,我們將使用幾個 Snowflake 對象,其中一些是專屬於 Snowflake 的,例如:stage, tasks 和 integrations 或 storage integrations。
要清楚起見,Snowflake stage 能夠通過存儲集成來引用數據源,因此存儲集成是解決安全性問題的其中一種方式。在這裡,我們參考了 storage integration。理論上,你還可以引用你的 API 密鑰,但根據我個人認為,這通常被認為是更好的做法,因此我提及了這個存儲集成。
我已經創建了這個 stage 和存儲集成,關於如何創建這些的更多信息,我不打算深入討論,我將在下面放置一個鏈接,因為這部分可能會變得沈悶,你需要瞭解在 AWS Azure 或其他系統中的設置一些政策。
現在,我們假設這部分已經完成,我們已經創建了這個 stage,現在我們將能夠從中提取信息。
正如前面提到的,這個 stage 實際上是參考這個 S3 文件夾,我已經創建了這個文件夾,在這裡我查詢了這些信息,你可以看到它有我過去幾天提取的特定文件,以及其中的數據。
這是非結構化數據,如果你記得的話,這只是我們從 PredictIt 中提取的 JSON,每個文件都是一個大的塊,所以我們需要解析出來,這就是我們將在這裡做的。我們將解析這些信息,將其放入表格中,然後我們將使用任務來自動執行此工作。
首先,讓我們創建一個表來存儲原始預測數據。運行這個。
Create TABLE raw_predicted_data AS (
SELECT $1 AS raw_json
FROM @my_stage/my_folder
);
好的,表已經創建,現在我們將使用 COPY INTO
陳述將相同的數據存儲到這個表中。運行這個。
COPY INTO raw_predicted_data
FROM (SELECT $1:raw_json FROM @my_stage/my_folder)
FILE_FORMAT = (TYPE = 'JSON');
好的,檔案已經被加載,但有一個很大的問題。如果我再次運行這個查詢,你會注意到,沒有任何文件被處理,原因是 Snowflake 已經意識到我們已經加載了這些文件,並且已經快速地列出了相關的文檔。這個功能在過去是需要手動來處理的,需要創建一個系統來追踪加載了多少行數,是否出錯,以及是否加載了此文件,但在 Snowflake 中,這一切都已經被處理好了,現在只需要 COPY INTO
就可以了。
但這不算是自動化的,這只是我手動運行的。有很多不同的方式可以自動化這個加載過程,但現在我們只使用了任務。我非常喜歡 Snowflake 任務,因為它非常簡單,只需運行相同的語句,只是在前面加上了 CREATE TASK
,然後在其中指定了許多配置選項,例如使用的倉庫以及運行時間等等。這與 cron 很相似,只是說明在早上 2 點運行這個任務。
這是我們所需的全部。
CREATE TASK load_raw_predicted_data
WAREHOUSE = my_warehouse
SCHEDULE = 'USING TIMESTAMP 0 CRON ''0 2 * * *'''
AS
COPY INTO raw_predicted_data
FROM (SELECT $1:raw_json FROM @my_stage/my_folder)
FILE_FORMAT = (TYPE = 'JSON');
好了,任務已經創建完成,但有另外一件事需要做。如果我們在 Show TASKS
中查看任務的狀態,你會發現它們都是 SUSPENDED
,這是因為它們並沒有在運行,如果你想要實際運行它們,你需要將它們重新開啟。
所以,現在我需要執行這個。
ALTER TASK load_raw_predicted_data RESUME;
好了,任務已經重新開始運行。如果我們再次運行 SHOW TASKS
,你將看到它們的狀態已經變成了 STARTED
。
這是我們整個任務流程中的第一個塊,我們首先要運行這個任務來加載文件,現在我們需要將這個文件處理為我們可以使用的實際表格,讓我們創建兩個表格。
-- 市場表
CREATE TABLE markets (
market_id NUMBER,
market_name STRING,
url STRING
);
-- 合約表
CREATE TABLE contracts (
market_id NUMBER,
contract_id NUMBER,
contract_name STRING,
url STRING
);
好的,表格已經創建,現在讓我們使用以下代碼來解析 JSON 數據。
-- 解析 JSON 數據到市場表
INSERT INTO markets (market_id, market_name, url)
SELECT
data:value:id::NUMBER,
data:value:name::STRING,
data:value:url::STRING
FROM raw_predicted_data,
LATERAL FLATTEN(raw_json:json_parse('$.markets')) data
WHERE data:value:id NOT IN (SELECT market_id FROM markets);
-- 解析 JSON 數據到合約表
INSERT INTO contracts (market_id, contract_id, contract_name, url)
SELECT
data:value:market_id::NUMBER,
c:id::NUMBER,
c:name::STRING,
c:url::STRING
FROM raw_predicted_data,
LATERAL FLATTEN(raw_json:json_parse('$.markets[*].contracts[*]')) c
LEFT JOIN markets ON markets.market_id = data:value:market_id
WHERE markets.market_id IS NULL;
好了,現在我們已經成功地將數據插入到這兩個表中了。現在讓我們再次運行下面的代碼,這次我們將使它依賴於 load_raw_predicted_data
任務,這樣當上一個任務運行時,它將自動運行。
-- 加載市場數據
CREATE OR REPLACE TASK load_market_data
WAREHOUSE = my_warehouse
AFTER load_raw_predicted_data
AS
INSERT INTO markets (market_id, market_name, url)
SELECT
data:value:id::NUMBER,
data:value:name::STRING,
data:value:url::STRING
FROM raw_predicted_data,
LATERAL FLATTEN(raw_json:json_parse('$.markets')) data
WHERE data:value:id NOT IN (SELECT market_id FROM markets);
-- 加載合約數據
CREATE OR REPLACE TASK load_contract_data
WAREHOUSE = my_warehouse
AFTER load_market_data
AS
INSERT INTO contracts (market_id, contract_id, contract_name, url)
SELECT
data:value:market_id::NUMBER,
c:id::NUMBER,
c:name::STRING,
c:url::STRING
FROM raw_predicted_data,
LATERAL FLATTEN(raw_json:json_parse('$.markets[*].contracts[*]')) c
LEFT JOIN markets ON markets.market_id = data:value:market_id
WHERE markets.market_id IS NULL;
好了,任務已經成功創建,但是它們目前都是暫停的狀態,所以現在我需要運行以下命令,來重新開始這兩個任務。
ALTER TASK load_market_data RESUME;
ALTER TASK load_contract_data RESUME;
非常好,現在我們的任務狀態已經變為 STARTED
,這將使它們開始運行。
這就是如何將數據從原始預測數據表加載到目標市場表和合約表的過程。
摘要
在這個視頻中,我們學習了如何預處理原始的 JSON 數據,並將數據插入到 Snowflake 中的目標表中。我們使用了 Snowflake 的 stage、tasks 和 integrations 等對象,以實現自動化加載數據的過程。通過這個步驟,我們可以在未來的分析中使用這些數據。
在下一部分的視頻中,我們將利用這些表格來回答一些問題,並可能使用像 Tableau 這樣的工具創建基本的儀表板。
感謝觀看這個視頻,如果你有任何問題,請隨時與我聯繫。下次見,謝謝!
常見問題解答
問:如何創建一個 Snowflake stage?
答:要創建一個 Snowflake stage,你需要在其後面引用存儲集成,然後選擇你想要引用的數據源。你可以通過創建集成、設置策略等方式來設置相應的安全性。
問:如何使用 Snowflake 任務來自動加載數據?
答:要使用 Snowflake 任務來自動加載數據,你只需使用 CREATE TASK
陳述,並指定任務的設置,例如所使用的倉庫和運行時間。然後,在任務的內容中提供相應的 SQL 代碼,用於從源數據源中提取數據並將其插入到目標表中。
問:我如何設置任務之間的依賴關係?
答:要設置任務之間的依賴關係,你只需在任務的 AFTER
子句中指定前置任務的名稱。這樣一來,當前置任務運行完成後,後續任務將自動開始運行。
問:如何在 Snowflake 中使用 JSON 函數來解析數據?
答:在 Snowflake 中,你可以使用 JSON 函數來解析 JSON 數據。這些函數可以讓你輕鬆地從 JSON 數據中提取所需的字段或數值。
問:可以在 Snowflake 中用 SQL 查詢任務的狀態嗎?
答:是的,你可以使用 SQL 查詢腳本來獲取 Snowflake 中任務的狀態。只需運行 SHOW TASKS
陳述,它將返回所有任務的相關信息,包括狀態、計劃等。
問:有沒有更複雜的方式來自動化數據加載過程?
答:是的,Snowflake 不僅提供了任務功能,還提供了更強大的的工作流程管理功能,例如 Apache Airflow。這些工具可讓你更靈活地管理任務的依賴性、重新運行等操作。如果你對更複雜的自動化有需求,我建議你考慮使用這些工具。
資料來源: