To Nha Notes | Jan. 2, 2022, 12:46 p.m.
If you’re using Snowflake as your data warehouse, you have three options for configuring access to the S3 bucket from your Snowflake instance: a Snowflake storage integration, an AWS IAM role, or an AWS IAM user.
Of the three, the first is recommended because a Snowflake storage integration makes later interaction with the S3 bucket from Snowflake seamless. For the exact configuration steps, it’s best to refer to the latest Snowflake documentation on the topic.
In the final step of the configuration you’ll create an external stage. An external stage is an object that points to an external storage location so Snowflake can access it. The S3 bucket you created earlier will serve as that location.
Before you create the stage, it’s handy to define a FILE FORMAT in Snowflake that you can both refer to for the stage and later use for similar file formats.
CREATE OR REPLACE FILE FORMAT pipe_csv_format
  TYPE = 'csv'
  FIELD_DELIMITER = '|';
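To make the file format concrete, here is a minimal Python sketch of what a pipe-delimited extract file might contain. The sample rows and the `to_pipe_delimited` helper are hypothetical and only illustrate the `|` delimiter that `pipe_csv_format` expects.

```python
import csv
import io

# Hypothetical sample rows; the column layout is an assumption for illustration.
rows = [
    (1, "widget", "2021-12-30"),
    (2, "gadget", "2021-12-31"),
]


def to_pipe_delimited(records):
    """Serialize records as CSV text using '|' as the field delimiter."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter="|", lineterminator="\n")
    writer.writerows(records)
    return buffer.getvalue()


extract_text = to_pipe_delimited(rows)
print(extract_text)
```

Each printed line holds one record, with fields separated by `|` instead of a comma.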
Next, create the stage for the S3 bucket. The syntax will look something like this:
USE SCHEMA my_db.my_schema;
CREATE STAGE my_s3_stage
  storage_integration = s3_int
  url = 's3://pipeline-bucket/'
  file_format = pipe_csv_format;
To connect to Snowflake from Python, add a section with your Snowflake credentials to pipeline.conf:
[snowflake_creds]
username = snowflake_user
password = snowflake_password
account_name = snowflake_acct1.us-east-2.aws
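As a rough sketch, the section above could be read and checked like this before any connection is attempted. The `load_snowflake_creds` helper and the `REQUIRED_KEYS` tuple are hypothetical names, and the config text is inlined here only so the example runs without a pipeline.conf file on disk.

```python
import configparser

# Sample contents mirroring the section above; the values are placeholders.
SAMPLE_CONF = """
[snowflake_creds]
username = snowflake_user
password = snowflake_password
account_name = snowflake_acct1.us-east-2.aws
"""

# Keys the load script expects to find (an assumption based on the section above).
REQUIRED_KEYS = ("username", "password", "account_name")


def load_snowflake_creds(conf_text):
    """Parse config text and return the [snowflake_creds] section as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(conf_text)
    if not parser.has_section("snowflake_creds"):
        raise KeyError("missing [snowflake_creds] section")
    creds = dict(parser["snowflake_creds"])
    missing = [key for key in REQUIRED_KEYS if key not in creds]
    if missing:
        raise KeyError(f"missing keys in [snowflake_creds]: {missing}")
    return creds


creds = load_snowflake_creds(SAMPLE_CONF)
print(creds["account_name"])
```

Failing early on a missing key tends to give a clearer error than a failed login later in the pipeline.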
The mechanism for loading data into Snowflake is the COPY INTO command. COPY INTO loads the contents of a file or multiple files into a table in the Snowflake warehouse. You can read more about the advanced usage and options of the command in the Snowflake documentation.
Snowflake also has a data integration service called Snowpipe that enables loading data from files as soon as they’re available in a Snowflake stage like the one used in the example in this section. You can use Snowpipe to continuously load data rather than scheduling a bulk load via the COPY INTO command.
Now, using the COPY INTO command, you can load the file into a Snowflake table as follows:
COPY INTO destination_table
FROM @my_s3_stage/extract_file.csv;
It’s also possible to load multiple files into the table at once. In some cases, data is extracted into more than one file due to volume or as a result of multiple extraction job runs since the last load. If the files have a consistent naming pattern (and they should!), you can load them all using the pattern parameter:
COPY INTO destination_table
FROM @my_s3_stage
pattern='.*extract.*.csv';
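Before running a load, it can help to check which file names a pattern would pick up. The sketch below uses Python's `re` module as a stand-in for Snowflake's pattern matching, assuming the pattern must match the whole file path; Snowflake's regular expression engine may differ in details. The file names, the `matching_keys` helper, and the stricter pattern with an escaped dot are hypothetical.

```python
import re

# The pattern from the COPY INTO example, and a stricter variant (an assumption)
# that escapes the dot before "csv" so it matches only a literal ".".
LOOSE_PATTERN = r".*extract.*.csv"
STRICT_PATTERN = r".*extract.*\.csv"

# Hypothetical file names that might sit in the stage.
candidate_keys = [
    "extract_file.csv",
    "2021/12/extract_orders_001.csv",
    "order_summary.csv",
    "extract_file_csv",
]


def matching_keys(pattern, keys):
    """Return the keys that the pattern matches in full."""
    compiled = re.compile(pattern)
    return [key for key in keys if compiled.fullmatch(key)]


loose_matches = matching_keys(LOOSE_PATTERN, candidate_keys)
strict_matches = matching_keys(STRICT_PATTERN, candidate_keys)
print(loose_matches)
print(strict_matches)
```

In this sketch the unescaped dot also accepts `extract_file_csv`, because `.` matches any character. How to write the escaped form inside a Snowflake string literal is best checked against the Snowflake documentation.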
Now that you know how the COPY INTO command works, it’s time to write a short Python script that can be scheduled and executed to automate the load in a pipeline.
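First, install the Snowflake connector for Python with pip:
(env) $ pip install snowflake-connector-python
The following script reads the Snowflake credentials from pipeline.conf, connects to Snowflake, and runs the COPY INTO command: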
import configparser

import snowflake.connector

# Read the Snowflake credentials from pipeline.conf
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
username = parser.get("snowflake_creds", "username")
password = parser.get("snowflake_creds", "password")
account_name = parser.get("snowflake_creds", "account_name")

# Open a connection to Snowflake
snow_conn = snowflake.connector.connect(
    user=username,
    password=password,
    account=account_name,
)

# Load every staged file whose name matches the pattern
sql = """COPY INTO destination_table
  FROM @my_s3_stage
  pattern='.*extract.*.csv';"""

cur = snow_conn.cursor()
cur.execute(sql)
cur.close()

# Release the connection once the load has been submitted
snow_conn.close()
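For a scheduled job, it may be worth making sure the cursor is closed even when the load fails. The following is a minimal sketch of that idea, not part of the original script: `run_copy` is a hypothetical helper that accepts any connection object with a `cursor()` method, and the stub classes stand in for a real Snowflake connection so the example runs without credentials. The row returned by the stub is a made-up placeholder, not real COPY INTO output.

```python
def run_copy(conn, sql):
    """Execute a COPY INTO statement and return the rows the cursor reports."""
    cur = conn.cursor()
    try:
        cur.execute(sql)
        return cur.fetchall()
    finally:
        # Runs on success and on error, so the cursor is always released.
        cur.close()


class StubCursor:
    """Stand-in cursor for a dry run; records what was executed."""

    def __init__(self):
        self.executed = []
        self.closed = False

    def execute(self, sql):
        self.executed.append(sql)

    def fetchall(self):
        # Placeholder row for illustration only.
        return [("extract_file.csv", "LOADED")]

    def close(self):
        self.closed = True


class StubConnection:
    """Stand-in connection that hands out StubCursor objects."""

    def __init__(self):
        self.last_cursor = None

    def cursor(self):
        self.last_cursor = StubCursor()
        return self.last_cursor


copy_sql = """COPY INTO destination_table
  FROM @my_s3_stage
  pattern='.*extract.*.csv';"""

stub_conn = StubConnection()
results = run_copy(stub_conn, copy_sql)
print(results)
```

With a real connection, the object returned by `snowflake.connector.connect(...)` would be passed in place of `stub_conn`.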
Reference from https://learning.oreilly.com/library/view/data-pipelines-pocket/9781492087823/ch05.html#load-data-snowflake