-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfinal_project.py
135 lines (119 loc) · 4.31 KB
/
final_project.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from datetime import datetime, timedelta
import pendulum
import os
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from final_project_operators.stage_redshift import StageToRedshiftOperator
from final_project_operators.load_fact import LoadFactOperator
from final_project_operators.load_dimension import LoadDimensionOperator
from final_project_operators.data_quality import DataQualityOperator
from helpers.final_project_sql_statements import SqlQueries
# Connection IDs and S3 locations shared by every task in this DAG.
redshift_conn_id = "redshift"
aws_credentials_id = "aws_credentials"
s3_bucket = "udacity-dend"
song_s3_key = "song_data"
events_s3_key = "log_data"

# Default arguments applied to every task in the DAG.
# NOTE: start_date must be *static*. The previous value, pendulum.now(),
# is re-evaluated on every DAG-file parse, which breaks Airflow's interval
# scheduling (the scheduler never sees a stable anchor date).
# 'catchup' was removed from this dict: it is a DAG-level parameter
# (already set in the @dag decorator), not a task-level default argument,
# so it was silently ignored here.
default_args = {
    'owner': 'udacity',
    'start_date': pendulum.datetime(2023, 1, 1, tz="UTC"),
    'depends_on_past': False,       # each run is independent of prior runs
    'retries': 3,                   # retry transient Redshift/S3 failures
    'retry_delay': timedelta(minutes=5),
    'email_on_retry': False,
}
# Create DAG
@dag(
    default_args=default_args,
    description='Load and transform data in Redshift with Airflow',
    schedule_interval='0 * * * *',
    catchup=False,
    max_active_runs=1
)
def final_project():
    """Hourly ELT pipeline for the sparkify dataset.

    Stages raw JSON event/song data from S3 into Redshift, loads the
    songplays fact table, populates the four dimension tables in parallel,
    then runs NULL-key data-quality checks before finishing.
    """
    begin = EmptyOperator(task_id='Begin_execution')
    end = EmptyOperator(task_id='Stop_execution')

    # --- Staging: COPY raw JSON from S3 into Redshift staging tables ---
    stage_events = StageToRedshiftOperator(
        task_id='staging_events',
        redshift_conn_id=redshift_conn_id,
        aws_credentials_id=aws_credentials_id,
        table="staging_events",
        s3_bucket=s3_bucket,
        s3_key=events_s3_key,
        # log data needs an explicit JSONPaths file to map columns
        s3_format="FORMAT AS JSON 's3://udacity-dend/log_json_path.json'",
    )
    stage_songs = StageToRedshiftOperator(
        task_id='staging_songs',
        redshift_conn_id=redshift_conn_id,
        aws_credentials_id=aws_credentials_id,
        table="staging_songs",
        s3_bucket=s3_bucket,
        s3_key=song_s3_key,
        # song data field names match the table, so auto mapping suffices
        s3_format="JSON 'auto'",
    )

    # --- Fact table, derived from both staging tables ---
    load_songplays = LoadFactOperator(
        task_id='load_songplays_fact_table',
        redshift_conn_id=redshift_conn_id,
        table="songplays",
        sql_stmt=SqlQueries.songplay_table_insert,
    )

    # --- Dimension tables: (task_id, target table, insert statement) ---
    dim_specs = [
        ('load_user_dim_table', 'users', SqlQueries.user_table_insert),
        ('load_song_dim_table', 'songs', SqlQueries.song_table_insert),
        ('load_artist_dim_table', 'artists', SqlQueries.artist_table_insert),
        ('load_time_dim_table', 'time', SqlQueries.time_table_insert),
    ]
    dim_tasks = [
        LoadDimensionOperator(
            task_id=task_id,
            redshift_conn_id=redshift_conn_id,
            table=table,
            sql_stmt=sql_stmt,
        )
        for task_id, table, sql_stmt in dim_specs
    ]

    # --- Data quality: every dimension's key column must be NULL-free ---
    quality_checks = DataQualityOperator(
        task_id='data_quality_checks',
        redshift_conn_id=redshift_conn_id,
        test_cases=[
            {'check_sql': "SELECT COUNT(*) FROM users WHERE user_id IS NULL",
             'expected_result': 0},
            {'check_sql': "SELECT COUNT(*) FROM songs WHERE song_id IS NULL",
             'expected_result': 0},
            {'check_sql': "SELECT COUNT(*) FROM artists WHERE artist_id IS NULL",
             'expected_result': 0},
            {'check_sql': "SELECT COUNT(*) FROM time WHERE start_time IS NULL",
             'expected_result': 0},
        ],
    )

    # Wiring: both staging tasks fan out from begin and fan in to the fact
    # load; dimensions run in parallel, then quality checks gate the end.
    begin >> [stage_events, stage_songs] >> load_songplays
    load_songplays >> dim_tasks >> quality_checks >> end


final_project_dag = final_project()