-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload_fact.py
29 lines (24 loc) · 997 Bytes
/
load_fact.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class LoadFactOperator(BaseOperator):
    """Operator that runs a caller-supplied SQL statement through a PostgresHook.

    The statement in ``sql_stmt`` is executed as-is against the connection
    named by ``redshift_conn_id``. ``table`` is only used in the progress
    log message; it is not interpolated into the SQL by this class.
    """

    # Display color for this operator (presumably consumed by the Airflow UI).
    ui_color = '#89DA59'

    @apply_defaults
    def __init__(
        self,
        redshift_conn_id="",
        table="",
        sql_stmt="",
        *args,
        **kwargs
    ):
        """Record the operator's configuration.

        Args:
            redshift_conn_id: Connection id handed to ``PostgresHook``.
            table: Name of the fact table, used for logging.
            sql_stmt: SQL text executed by ``execute``.
            *args: Forwarded unchanged to ``BaseOperator.__init__``.
            **kwargs: Forwarded unchanged to ``BaseOperator.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.sql_stmt = sql_stmt
        self.table = table

    def execute(self, context):
        """Run the configured SQL statement on the configured connection.

        Args:
            context: Task context passed in by the caller; not read here.
        """
        self.log.info("Starting LoadFactOperator")
        hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        self.log.info(f"Loading fact table {self.table} in Redshift")

        # The statement is submitted with one newline on each side of the
        # configured SQL text.
        statement = f"\n{self.sql_stmt}\n"
        hook.run(statement)

        self.log.info("LoadFactOperator completed successfully")