用Snowflake将JSON插入和解析的数据工程项目2
目录
1. 介绍
2. 数据提取和插入
- 从PredictIt提取数据
- 将数据插入Snowflake
3. 数据预处理
4. 使用Snowflake对象
- 利用Snowflake中的Stage、Tasks和Integrations
- 创建Stage和Storage Integration
- 使用Tasks自动化任务
5. 自动化文件加载
6. 数据处理和插入
- 创建Raw Predicted Data表
- 解析Json文件中的数据
- 插入数据到Stage表
7. 设置任务依赖
- 设置Tasks之间的依赖性
- 重启任务以实现数据插入
8. 总结和下一步计划
介绍
首先,欢迎大家回到本视频,我是Ben Rogan,也被称为Seattle数据人。今天我们将继续进行数据工程项目的第二部分视频。如果你还记得,上一期视频中,我们从PredictIt提取了数据,展示了他们设立的各种市场。现在,我们将采取这些数据,并将其插入到Snowflake中。本视频主要关注数据预处理步骤,如果你还没有观看上一期视频,我建议你去看一下。
数据提取和插入
在我们深入进一步的代码之前,让我们先记住我们要做什么。假设我们提取的数据中存在两种不同的实体,即市场和合约。市场是父级信息,每个市场都有多个合约。我们希望将这些信息分解为市场和合约两个表。对于合约信息,我们实际上会跟踪时间的变化,因为我们想要了解每天的变化,这可能会变得非常有趣。为了做到这一点,我们将创建两个表格,一个市场表和一个合约表,并在之后的时间内对其进行分析。
为了实现这一目标,我们将使用Snowflake的一些独特对象,其中一些在Snowflake中非常独特。例如,我们将使用Stage、Tasks和Integrations来处理数据和安全性的问题。我们将引用集成,并通过引用存储集成来引用数据源。这种存储集成是处理安全性方面的一种方式。关于如何执行此操作的更多信息,请参阅下方的链接。
在这一步骤中,我们将使用Stage引用的S3文件夹。这些文件包含我们最近几天提取的特定文件。数据在这些文件中是非结构化的,这是我们之后要处理的任务。我们将解析这些信息并将其放入表格中,然后使用Tasks自动化这一过程。首先,让我们创建原始预测数据表,并将数据存储在其中。
Create TABLE raw_predicted_data AS
COPY INTO 's3://your-bucket-name/your-folder-name/'
FILE_FORMAT = (FORMAT_NAME = 'your-format-name')
PURGE = TRUE;
这将从S3存储桶中的指定路径加载数据到Raw Predicted Data表中。
然而,这个过程并不是自动化的。如果我们想要自动加载数据,有很多不同的方式可以实现。在本视频中,我们将使用Tasks来实现。Snowflake的Tasks非常易于使用,只需运行相同的COPY INTO语句,并为任务指定名称、仓库和运行时间。这将在每天的2点执行任务。
数据预处理
在我们深入代码之前,让我们回顾一下我们要做的目标。您可能记得,我们提取的数据包含两种实体:市场和合约。市场是父级信息,每个市场都有多个合约。我们的目标是将这些数据解析并存储在相关的表格中,以便我们可以对其进行进一步的分析。
我们将使用Snowflake的lateral flatten子句对Json数据进行解析。此子句将对Json数据进行展平,以便我们可以按行访问特定部分。在我们的情况下,我们将解析市场部分。通过将Json数据展平并分解为每个特定市场的行。这样,我们可以进一步解析每个市场的ID、名称、短名称和URL。
为了避免重复数据,我们只想向表格中插入那些与之前不同的市场信息。为了实现这一点,我们将使用LEFT JOIN到Stage表格,并且只选择那些在Stage表中不存在的数据。这样我们可以确保只插入新的市场信息而不会重复。
INSERT INTO market_table (market_id, market_name, market_short_name, market_url)
SELECT DISTINCT market_id, market_name, market_short_name, market_url
FROM raw_predicted_data
LEFT JOIN stage_table ON raw_predicted_data.market_id = stage_table.market_id
WHERE stage_table.market_id IS NULL;
通过运行上述语句,我们将从Raw Predicted Data表中选择并插入新的市场信息到Market表中。
使用Snowflake对象
在处理数据工程项目时,Snowflake提供了许多有用的对象和功能。在这一部分,我们将详细介绍一些对象,并说明如何使用它们。
首先,我们将使用Stage对象引用数据源。Stage是一种在Snowflake中引用数据源的方式。我们可以在Stage对象中引用S3存储桶中的特定文件夹。例如,我们可以创建一个Stage对象,指向存储桶中的特定文件夹。这样,我们可以轻松地将来自存储桶的数据加载到Snowflake中。
CREATE STAGE your_stage_name
URL = 's3://your-bucket-name/your-folder-name';
接下来,我们可以使用Tasks对象来自动化工作流程。Task是一个独立的工作单元,可以在Snowflake中执行。我们可以为任务指定名称、仓库和运行时间。例如,我们可以创建一个名为"your-task-name"的任务,指定仓库和每天运行时间。
CREATE TASK your_task_name
WAREHOUSE = your_warehouse_name
SCHEDULE = 'USING CRON 0 2 * * *';
最后,我们可以使用Integrations对象来管理存储集成。存储集成是用于处理数据安全性的一种方式。通过引用存储集成,在Snowflake中使用API密钥来处理安全性问题。例如,我们可以引用存储集成,并将其与Stage和Task对象一起使用。
CREATE STORAGE INTEGRATION your_integration_name
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'your_aws_role_arn';
通过上述步骤,我们可以利用Snowflake的Stage、Tasks和Integrations对象来处理数据和确保安全性。
自动化文件加载
在数据工程项目中,自动化文件加载是非常重要的一步。在这一部分,我们将详细介绍如何使用Task对象来自动加载数据。
首先,我们要创建和运行我们的Tasks。我们可以使用CREATE TASK语句来创建任务,并使用ALTER TASK语句来启动任务。例如,我们可以创建一个名为"load_file_task"的任务,指定仓库和运行时间。
CREATE TASK load_file_task
WAREHOUSE = your_warehouse_name
SCHEDULE = 'USING CRON 0 2 * * *';
为了启动任务,我们可以使用ALTER TASK语句,并指定任务名称和状态。
ALTER TASK load_file_task RESUME;
通过运行上述语句,我们可以创建和启动Tasks,自动加载文件数据。
数据处理和插入
在数据工程项目中,数据处理和插入是非常重要的一步。在这一部分,我们将详细介绍如何处理数据并将其插入到相关表格中。
首先,我们要创建一个Raw Predicted Data表,用于存储原始数据。我们可以使用CREATE TABLE语句创建表格,并使用COPY INTO语句将数据加载到表格中。
CREATE TABLE raw_predicted_data AS
COPY INTO 's3://your-bucket-name/your-folder-name/'
FILE_FORMAT = (FORMAT_NAME = 'your-format-name')
PURGE = TRUE;
通过运行上述语句,我们将数据加载到Raw Predicted Data表中。
接下来,我们将根据需要解析Json文件中的数据,并将其插入到Stage表中。我们可以使用INSERT INTO SELECT语句来选择特定的Json字段,并将其插入到Stage表中。
INSERT INTO stage_table (field1, field2, field3)
SELECT json_field1, json_field2, json_field3
FROM raw_predicted_data
WHERE condition1 = 'your-condition';
通过运行上述语句,我们可以将Json字段的数据解析并插入到Stage表中。
最后,我们可以使用Tasks对象来实现数据的自动加载和处理。我们可以创建一个新的任务,将其与前一个任务相关联,然后重新启动任务。
CREATE TASK process_data_task
WAREHOUSE = your_warehouse_name
SCHEDULE = 'USING RECURRING WITH START_TIME = YOUR_START_TIME'
AFTER LOAD_TASK;
通过运行上述语句,我们可以创建一个新的任务,并设置其与前一个任务的依赖关系。然后,我们可以重新启动任务以实现数据的插入和处理。
设置任务依赖
在数据工程项目中,任务之间的依赖关系非常重要。在这一部分,我们将详细介绍如何设置任务之间的依赖关系。
首先,我们要设置任务之间的依赖关系。我们可以使用ALTER TASK语句,并使用AFTER子句来设置任务之间的依赖关系。
ALTER TASK your_task_name
AFTER load_task_name;
通过运行上述语句,我们可以创建任务之间的依赖关系。
其次,我们要重新启动任务以实现数据的插入和处理。我们可以使用ALTER TASK语句,并使用RESUME子句来重新启动任务。
ALTER TASK your_task_name
RESUME;
通过运行上述语句,我们可以重新启动任务以实现数据的插入和处理。
总结和下一步计划
在本视频中,我们讨论了如何进行数据工程项目,并使用Snowflake来处理和插入数据。我们通过使用Stage、Tasks和Integrations对象,自动加载文件数据,并使用Task对象设置任务之间的依赖关系。接下来,我们将利用这些数据,并使用类似Tableau的工具创建基本仪表板。
感谢大家观看本视频,希望对数据工程项目有所帮助。如果您有任何问题,请随时向我咨询。下次再见,谢谢大家!