데이터 엔지니어링 프로젝트 파트 2 - Snowflake에 JSON 삽입 및 구문 분석
목차
- 데이터 엔지니어링 프로젝트 개요
- 데이터 추출 및 적재
- 기본적인 Json 파일 파싱
- Snowflake를 활용한 데이터 처리
- 시장과 계약 정보 저장을 위한 테이블 구성
- 시간에 따른 데이터 추적
- Snowflake의 객체 활용
- Stage 사용하기
- Task 활용하기
- Storage Integration 이해하기
- 스테이지 테이블 구성과 데이터 파싱
- 초기 데이터 로드
- 데이터 파싱을 통한 테이블 구성
- 작업 자동화를 위한 Task 설정
- Snowflake Task 생성하기
- 의존성 설정과 자동 실행
- 마무리 및 추가 작업
- 미래 작업에 대한 준비
- 데이터 활용을 위한 대시보드 생성
📊 데이터 엔지니어링 프로젝트 개요
이번 비디오에서는 데이터 엔지니어링 프로젝트의 두 번째 부분으로, 이전 비디오에서 수집한 데이터를 임베딩하는 작업을 진행합니다. 이번 비디오는 주로 사전 처리에 초점을 맞추어 진행됩니다. 사전 비디오를 보지 않으셨다면 추천드리지만, 간단히 복습하면 이번 비디오를 이해하는 데 무리가 없을 것입니다. AWS에서 Apache Airflow용 관리 워크플로우(MWAA)를 설정하여 데이터를 추출하고, S3에 적재하는 과정을 마련했습니다. 그럼 시작해보겠습니다.
🚀 데이터 추출 및 적재
실제 코드에 들어가기 전에 이번 작업의 목표에 대해 다시 한번 상기해봅시다. 이번 작업에서 추출한 데이터는 시장과 계약이라는 두 가지 다른 개체로 나뉩니다. 시장은 상위 정보를 나타내고, 각 시장에는 여러 개의 계약이 있습니다. 이 정보를 정리하기 위해 두 개의 테이블을 생성할 계획입니다. 계약 정보는 시간에 따라 추적해야 하므로, 일일 기준으로 변경되는 내용을 확인할 수 있어야 합니다. 이를 위해 시장 테이블과 계약 테이블을 생성하고, 시간에 따라 분석할 수 있도록 준비합니다. 이 작업을 위해 Snowflake에는 특별한 객체들을 사용합니다. Snowflake 스테이지는 저장소 통합을 통해 데이터 소스를 참조할 수 있게 해주는 개체로, 보안 측면에서 중요한 역할을 합니다. 이러한 스테이지와 관련된 내용에 대해서는 나중에 자세히 다루겠습니다. 또한, 작업(Task)와 통합(Integration)을 활용하여 작업을 자동화할 것입니다.
🔍 기본적인 Json 파일 파싱
데이터의 구체적인 내용에 들어가기 전에, 데이터를 어떻게 처리할 것인지에 대해 다시 한번 생각해볼 필요가 있습니다. 데이터를 파싱하여 테이블에 넣을 것입니다. 이를 위해 먼저 원시 예측 데이터를 저장할 테이블을 생성하겠습니다. 다음 명령문을 실행하여 데이터를 테이블에 저장할 수 있습니다. Snowflake는 데이터를 적재할 때 상세한 정보를 제공해주는데, 이를 통해 어떤 파일이 로드되었는지, 에러가 있는지, 얼마나 많은 행이 로드되었는지 등을 확인할 수 있습니다.
🌨️ Snowflake를 활용한 데이터 처리
데이터 적재가 수동으로 이루어지는 것은 좋지 않습니다. 이를 자동화하기 위해 Snowflake Task를 활용할 수 있습니다. Snowflake Task는 간단하게 설정할 수 있으며, 주기적으로 실행되도록 설정할 수 있습니다. 예를 들어, "매일 오전 2시에 이 작업을 실행"과 같이 설정할 수 있습니다. 이를 설정하기 위해 별도의 작업 스케줄링 도구를 사용하지 않고 Snowflake Task를 활용하는 것이 간편합니다. 그러나 복잡한 의존성 및 재실행이 필요한 작업들과 같은 경우에는 다른 오케스트레이션 시스템을 고려해야 합니다.
📂 스테이지 테이블 구성과 데이터 파싱
먼저 파싱 작업을 위해 스테이지 테이블을 만들고, JSON 파일에서 필요한 정보를 추출할 것입니다. 이를 위해 JSON의 특정 부분을 파싱하여 시장 정보를 하나씩 추출할 것입니다. 이는 중첩된 JSON 파일을 처리하는 데 유용한 기능입니다. 시장의 각 항목에 대해 ID, 이름, 짧은 이름 및 URL과 같은 정보를 추출할 것입니다. 이 테이블에는 새로운 데이터만 적재해야 하므로, 스테이징 테이블과 LEFT JOIN하여 이미 적재된 데이터는 제외할 것입니다.
⚙️ 작업 자동화를 위한 Task 설정
Snowflake Task를 활용하여 데이터 적재 작업을 자동화할 수 있습니다. 기존 Task를 활용하여 다음 작업을 만들 수 있습니다. 작업 간의 의존성도 설정할 수 있으며, 이전 작업의 수행 완료 여부에 따라 다음 작업이 자동으로 실행됩니다. 의존성을 추가하여 간단한 작업 실행 흐름을 구성할 수 있습니다.
🔚 마무리 및 추가 작업
작업 자동화를 통해 데이터가 테이블에 적재되었습니다. 이제 이를 활용하여 추가적인 분석이나 대시보드 생성 작업을 진행할 수 있습니다. 추가 작업을 위한 준비를 마치고, Tableau와 같은 도구를 활용하여 대시보드를 생성할 수 있습니다.
👉 강조할 점
- 데이터 엔지니어링 프로젝트의 두 번째 부분으로, 데이터 추출과 적재, 그리고 기본적인 Json 파일 파싱 작업을 다룹니다.
- Snowflake를 활용하여 데이터를 처리하며, Stage, Task, Storage Integration 등의 객체를 사용합니다.
- 스테이지 테이블을 구성하고, JSON 파일을 파싱하여 시장 정보와 계약 정보를 추출합니다.
- Task를 설정하여 데이터 적재 작업을 자동화하며, 작업 간의 의존성 설정이 가능합니다.
- 추가 작업을 위한 준비를 마치고, 대시보드 생성 등의 작업을 수행할 수 있습니다.
❓ 자주 묻는 질문
-
Snowflake Task 외에도 다른 작업 스케줄링 도구를 사용할 수 있나요?
- 네, Snowflake Task가 아닌 다른 작업 스케줄링 도구(예: Apache Airflow)를 사용할 수도 있습니다. 각 도구의 장단점과 사용환경에 따라 선택하시면 됩니다.
-
Snowflake Task의 장점은 무엇인가요?
- Snowflake Task는 설정이 간편하고 사용하기 쉬우며, 기본적인 작업 자동화에 효과적입니다. 크론(Cron) 스케줄링과 유사한 방식으로 작업 실행 주기를 설정할 수 있습니다.
-
Snowflake의 Stage와 Storage Integration은 어떤 역할을 하나요?
- Stage는 데이터 소스를 참조하여 데이터를 추출하거나 적재할 수 있는 기능입니다. Storage Integration은 Stage의 보안 측면을 담당하여 데이터에 대한 접근 권한을 관리합니다.
-
데이터 파싱 작업에서 중첩된 JSON을 처리하려면 어떻게 해야 하나요?
- Snowflake의
LATERAL FLATTENED
구문을 사용하여 중첩된 JSON을 처리할 수 있습니다. 특정 부분을 파싱해야 하는 경우 이 구문을 활용하면 중첩된 JSON 내부의 원하는 데이터에 접근할 수 있습니다.
-
Snowflake의 작업 의존성 설정은 어떻게 수행하나요?
- Snowflake의 작업은 의존하는 이전 작업에 대한 설정이 가능합니다. 이전 작업 실행여부에 따라 다음 작업이 자동으로 실행됩니다. 각 작업 간의 의존성을 설정하여 원하는 작업 실행 흐름을 구성할 수 있습니다.
🌐 참고 자료