用Snowflake将JSON插入和解析的数据工程项目2

Find AI Tools
No difficulty
No complicated process
Find ai tools

用Snowflake将JSON插入和解析的数据工程项目2

目录

1. 介绍

  • 什么是数据工程项目
  • 本视频的主题

2. 数据提取和插入

  • 从PredictIt提取数据
  • 将数据插入Snowflake

3. 数据预处理

  • Json文件的基本解析
  • 分析市场和合约信息

4. 使用Snowflake对象

  • 利用Snowflake中的Stage、Tasks和Integrations
  • 创建Stage和Storage Integration
  • 使用Tasks自动化任务

5. 自动化文件加载

  • 创建并运行Tasks
  • 检查Tasks运行状态

6. 数据处理和插入

  • 创建Raw Predicted Data表
  • 解析Json文件中的数据
  • 插入数据到Stage表

7. 设置任务依赖

  • 设置Tasks之间的依赖性
  • 重启任务以实现数据插入

8. 总结和下一步计划

  • 对已插入的数据进行分析和查询
  • 创建基本仪表板

介绍

首先,欢迎大家回到本视频,我是Ben Rogan,也被称为Seattle数据人。今天我们将继续进行数据工程项目的第二部分视频。如果你还记得,上一期视频中,我们从PredictIt提取了数据,展示了他们设立的各种市场。现在,我们将采取这些数据,并将其插入到Snowflake中。本视频主要关注数据预处理步骤,如果你还没有观看上一期视频,我建议你去看一下。

数据提取和插入

在我们深入进一步的代码之前,让我们先记住我们要做什么。假设我们提取的数据中存在两种不同的实体,即市场和合约。市场是父级信息,每个市场都有多个合约。我们希望将这些信息分解为市场和合约两个表。对于合约信息,我们实际上会跟踪时间的变化,因为我们想要了解每天的变化,这可能会变得非常有趣。为了做到这一点,我们将创建两个表格,一个市场表和一个合约表,并在之后的时间内对其进行分析。

为了实现这一目标,我们将使用Snowflake的一些独特对象,其中一些在Snowflake中非常独特。例如,我们将使用Stage、Tasks和Integrations来处理数据和安全性的问题。我们将引用集成,并通过引用存储集成来引用数据源。这种存储集成是处理安全性方面的一种方式。关于如何执行此操作的更多信息,请参阅下方的链接。

在这一步骤中,我们将使用Stage引用的S3文件夹。这些文件包含我们最近几天提取的特定文件。数据在这些文件中是非结构化的,这是我们之后要处理的任务。我们将解析这些信息并将其放入表格中,然后使用Tasks自动化这一过程。首先,让我们创建原始预测数据表,并将数据存储在其中。

Create TABLE raw_predicted_data AS
  COPY INTO 's3://your-bucket-name/your-folder-name/'
  FILE_FORMAT = (FORMAT_NAME = 'your-format-name')
  PURGE = TRUE;

这将从S3存储桶中的指定路径加载数据到Raw Predicted Data表中。

然而,这个过程并不是自动化的。如果我们想要自动加载数据,有很多不同的方式可以实现。在本视频中,我们将使用Tasks来实现。Snowflake的Tasks非常易于使用,只需运行相同的COPY INTO语句,并为任务指定名称、仓库和运行时间。这将在每天的2点执行任务。

数据预处理

在我们深入代码之前,让我们回顾一下我们要做的目标。您可能记得,我们提取的数据包含两种实体:市场和合约。市场是父级信息,每个市场都有多个合约。我们的目标是将这些数据解析并存储在相关的表格中,以便我们可以对其进行进一步的分析。

我们将使用Snowflake的lateral flatten子句对Json数据进行解析。此子句将对Json数据进行展平,以便我们可以按行访问特定部分。在我们的情况下,我们将解析市场部分。通过将Json数据展平并分解为每个特定市场的行。这样,我们可以进一步解析每个市场的ID、名称、短名称和URL。

为了避免重复数据,我们只想向表格中插入那些与之前不同的市场信息。为了实现这一点,我们将使用LEFT JOIN到Stage表格,并且只选择那些在Stage表中不存在的数据。这样我们可以确保只插入新的市场信息而不会重复。

INSERT INTO market_table (market_id, market_name, market_short_name, market_url)
SELECT DISTINCT market_id, market_name, market_short_name, market_url
FROM raw_predicted_data
LEFT JOIN stage_table ON raw_predicted_data.market_id = stage_table.market_id
WHERE stage_table.market_id IS NULL;

通过运行上述语句,我们将从Raw Predicted Data表中选择并插入新的市场信息到Market表中。

使用Snowflake对象

在处理数据工程项目时,Snowflake提供了许多有用的对象和功能。在这一部分,我们将详细介绍一些对象,并说明如何使用它们。

首先,我们将使用Stage对象引用数据源。Stage是一种在Snowflake中引用数据源的方式。我们可以在Stage对象中引用S3存储桶中的特定文件夹。例如,我们可以创建一个Stage对象,指向存储桶中的特定文件夹。这样,我们可以轻松地将来自存储桶的数据加载到Snowflake中。

CREATE STAGE your_stage_name
URL = 's3://your-bucket-name/your-folder-name';

接下来,我们可以使用Tasks对象来自动化工作流程。Task是一个独立的工作单元,可以在Snowflake中执行。我们可以为任务指定名称、仓库和运行时间。例如,我们可以创建一个名为"your-task-name"的任务,指定仓库和每天运行时间。

CREATE TASK your_task_name
WAREHOUSE = your_warehouse_name
SCHEDULE = 'USING CRON 0 2 * * *';

最后,我们可以使用Integrations对象来管理存储集成。存储集成是用于处理数据安全性的一种方式。通过引用存储集成,在Snowflake中使用API密钥来处理安全性问题。例如,我们可以引用存储集成,并将其与Stage和Task对象一起使用。

CREATE STORAGE INTEGRATION your_integration_name
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'your_aws_role_arn';

通过上述步骤,我们可以利用Snowflake的Stage、Tasks和Integrations对象来处理数据和确保安全性。

自动化文件加载

在数据工程项目中,自动化文件加载是非常重要的一步。在这一部分,我们将详细介绍如何使用Task对象来自动加载数据。

首先,我们要创建和运行我们的Tasks。我们可以使用CREATE TASK语句来创建任务,并使用ALTER TASK语句来启动任务。例如,我们可以创建一个名为"load_file_task"的任务,指定仓库和运行时间。

CREATE TASK load_file_task
WAREHOUSE = your_warehouse_name
SCHEDULE = 'USING CRON 0 2 * * *';

为了启动任务,我们可以使用ALTER TASK语句,并指定任务名称和状态。

ALTER TASK load_file_task RESUME;

通过运行上述语句,我们可以创建和启动Tasks,自动加载文件数据。

数据处理和插入

在数据工程项目中,数据处理和插入是非常重要的一步。在这一部分,我们将详细介绍如何处理数据并将其插入到相关表格中。

首先,我们要创建一个Raw Predicted Data表,用于存储原始数据。我们可以使用CREATE TABLE语句创建表格,并使用COPY INTO语句将数据加载到表格中。

CREATE TABLE raw_predicted_data AS
  COPY INTO 's3://your-bucket-name/your-folder-name/'
  FILE_FORMAT = (FORMAT_NAME = 'your-format-name')
  PURGE = TRUE;

通过运行上述语句,我们将数据加载到Raw Predicted Data表中。

接下来,我们将根据需要解析Json文件中的数据,并将其插入到Stage表中。我们可以使用INSERT INTO SELECT语句来选择特定的Json字段,并将其插入到Stage表中。

INSERT INTO stage_table (field1, field2, field3)
SELECT json_field1, json_field2, json_field3
FROM raw_predicted_data
WHERE condition1 = 'your-condition';

通过运行上述语句,我们可以将Json字段的数据解析并插入到Stage表中。

最后,我们可以使用Tasks对象来实现数据的自动加载和处理。我们可以创建一个新的任务,将其与前一个任务相关联,然后重新启动任务。

CREATE TASK process_data_task
WAREHOUSE = your_warehouse_name
SCHEDULE = 'USING RECURRING WITH START_TIME = YOUR_START_TIME'
AFTER LOAD_TASK;

通过运行上述语句,我们可以创建一个新的任务,并设置其与前一个任务的依赖关系。然后,我们可以重新启动任务以实现数据的插入和处理。

设置任务依赖

在数据工程项目中,任务之间的依赖关系非常重要。在这一部分,我们将详细介绍如何设置任务之间的依赖关系。

首先,我们要设置任务之间的依赖关系。我们可以使用ALTER TASK语句,并使用AFTER子句来设置任务之间的依赖关系。

ALTER TASK your_task_name
AFTER load_task_name;

通过运行上述语句,我们可以创建任务之间的依赖关系。

其次,我们要重新启动任务以实现数据的插入和处理。我们可以使用ALTER TASK语句,并使用RESUME子句来重新启动任务。

ALTER TASK your_task_name
RESUME;

通过运行上述语句,我们可以重新启动任务以实现数据的插入和处理。

总结和下一步计划

在本视频中,我们讨论了如何进行数据工程项目,并使用Snowflake来处理和插入数据。我们通过使用Stage、Tasks和Integrations对象,自动加载文件数据,并使用Task对象设置任务之间的依赖关系。接下来,我们将利用这些数据,并使用类似Tableau的工具创建基本仪表板。

感谢大家观看本视频,希望对数据工程项目有所帮助。如果您有任何问题,请随时向我咨询。下次再见,谢谢大家!

Most people like

Are you spending too much time looking for ai tools?
App rating
4.9
AI Tools
100k+
Trusted Users
5000+
WHY YOU SHOULD CHOOSE TOOLIFY

TOOLIFY is the best ai tool source.