// LESSON 1.1 - EXTRACTING DATA - not really the first lesson
import pprint
import requests
endpoint = "http://localhost:5000"
# Fill in the correct API key
api_key = "scientist007"
# Create the web API’s URL
authenticated_endpoint = "{}/{}".format(endpoint, api_key)
# Get the web API’s reply to the endpoint
api_response = requests.get(authenticated_endpoint).json()
pprint.pprint(api_response)
# Create the API’s endpoint for the shops
shops_endpoint = "{}/{}/{}/{}".format(endpoint, api_key, "diaper/api/v1.0", "shops")
shops = requests.get(shops_endpoint).json()
print(shops)
# Create the API’s endpoint for items of the shop starting with a "D"
items_of_specific_shop_URL = "{}/{}/{}/{}/{}".format(endpoint, api_key, "diaper/api/v1.0", "items", "DM")
products_of_shop = requests.get(items_of_specific_shop_URL).json()
pprint.pprint(products_of_shop)
// LESSON 1.2 - INTRODUCTION TO DATA INGESTION WITH SINGER
- data exchange format: json
describing data through its schema
columns = ("id", "name", "age", "has_children")
users = {
    (1, "Adrian", 32, False),
    (2, "Ruanne", 28, False),
    (3, "Hillary", 29, True)
}
json_schema = {
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "age": {"type": "integer", "minimum": 1, "maximum": 130},
        "has_children": {"type": "boolean"}
    },
    "$id": "http://yourdomain.com/schemas/my_user_schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#"
}
writing the schema with Singer
import singer
singer.write_schema(              # emits a SCHEMA message to stdout
    schema=json_schema,           # the JSON schema defined above
    stream_name='DC_employees',
    key_properties=["id"]         # primary key of the stream
)
serializing json
import json
json.dumps(json_schema["properties"]["age"])  # serializes the object to a JSON-formatted string
# write the full schema to a file
with open("foo.json", mode="w") as fh:
    json.dump(obj=json_schema, fp=fh)  # writes the JSON-serialized object to the open file handle
// LESSON 1.3 - RUNNING AN INGESTION PIPELINE WITH SINGER
streaming record messages - besides SCHEMA messages, Singer has RECORD and STATE messages, so that we can create a full ingestion pipeline
columns = ("id", "name", "age", "has_children")
users = {(1, "Adrian", 32, False),
         (2, "Ruanne", 28, False),
         (3, "Hillary", 29, True)}
singer.write_record(stream_name="DC_employees",
                    record=dict(zip(columns, users.pop())))
fixed_dict = {"type": "RECORD", "stream": "DC_employees"}
record_msg = {**fixed_dict, "record": dict(zip(columns, users.pop()))}
print(json.dumps(record_msg))
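the notes above also mention STATE messages; a minimal sketch of emitting one (the bookmark value below is a made-up example, not from the course):
# a STATE message records how far ingestion has progressed, so a rerun can resume from the bookmark
singer.write_state(value={"DC_employees": "2023-07-01T00:00:00"})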
chaining taps and targets - keep the pipeline modular: one module (tap) per schema + its records
# Module: my_tap.py
import singer
# these two calls are linked because they share the same stream_name
singer.write_schema(stream_name="foo", schema=...)
singer.write_records(stream_name="foo", records=...)
# terminal command - pipe the output into a singer target
python my_tap.py | target-csv # pipe the tap's output into the target-csv target, which writes CSV files
python my_tap.py | target-csv --config userconfig.cfg
my-packaged-tap | target-csv --config userconfig.cfg
modular ingestion pipelines
# data ingestion: the same tap can be piped into different targets
my-packaged-tap | target-csv
my-packaged-tap | target-google-sheets
my-packaged-tap | target-postgresql --config conf.json # passing config file to target-postgresql
tap-marketing-api | target-csv --config ingest/data_lake.conf # ingest from a marketing API tap into CSV, configured by ingest/data_lake.conf
// LESSON 1.4 - CONCLUSION CODE FOR DATA INGESTION
import requests
import singer
api_netloc = "localhost:5000"
api_key = "scientist007"
shops_template = f"http://{api_netloc}/{api_key}/diaper/api/v1.0/shops"
items_template = f"http://{api_netloc}/{api_key}/diaper/api/v1.0/items/"
# Complete the JSON schema
schema = {'properties': {
    'brand': {'type': 'string'},
    'model': {'type': 'string'},
    'price': {'type': 'number'},
    'currency': {'type': 'string'},
    'quantity': {'type': 'integer', 'minimum': 1},
    'date': {'type': 'string', 'format': 'date'},
    'countrycode': {'type': 'string', 'pattern': '^[A-Z]{2}$'},
    'store_name': {'type': 'string'}},
    '$schema': 'http://json-schema.org/draft-07/schema#'
}

# Write the schema to stdout.
singer.write_schema(stream_name='products', schema=schema, key_properties=[])

# Return the items scraped from a specific store as a list.
def retrieve_store_items(store_name, items_endpoint=items_template):
    return requests.get(f"{items_endpoint}{store_name}").json()["items"]

def main():
    for shop in requests.get(shops_template).json()["shops"]:
        singer.write_records(stream_name='products',
                             # Add the name of the store to every record.
                             records=({'store_name': shop, **item}
                                      for item in retrieve_store_items(shop)))

if __name__ == "__main__":
    main()
// LESSON 2.1 - BASIC INTRODUCTION TO PYSPARK
spark
- fast and general engine for large-scale data processing
- exists in different languages (Python (PySpark), Java, Scala, R)
- data processing at scale by using horizontal scaling
- can also be used for machine learning
not useful for small data
- because of the overhead PySpark adds; for small data, use pandas
- not useful when you have small operations
starting the spark analytics engine
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
reading a csv file
prices = spark.read.csv("mnt/data_lake/landing/prices.csv")
prices.show()
enforcing a schema - automatically inferring the data types is not a good idea
from pyspark.sql.types import (StructType, StructField, StringType,
                               FloatType, IntegerType, DateType)
schema = StructType([StructField("store", StringType(), nullable=False),
                     StructField("countrycode", StringType(), nullable=False),
                     StructField("brand", StringType(), nullable=False),
                     StructField("price", FloatType(), nullable=False),
                     StructField("currency", StringType(), nullable=True),
                     StructField("quantity", IntegerType(), nullable=True),
                     StructField("date", DateType(), nullable=False)])
prices = spark.read.options(header="true").schema(schema).csv("mnt/data_lake/landing/prices.csv")
print(prices.dtypes)
// LESSON 2.2 - CLEANING DATA
reasons to clean data
- most data sources are not ready for analytics
- incorrect data types
- invalid rows
- incomplete rows
- badly chosen placeholders
automating data cleaning
implicit standards in the company?
- regional datetimes vs UTC
- column naming conventions
low level details of the systems
- representation of unknown/incomplete data
- ranges for numerical values
- meaning of fields
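a minimal sketch of applying such implicit standards (the frame raw_frame, the column names and the timezone are all assumptions, not from the course):
from pyspark.sql.functions import to_utc_timestamp, col
cleaned = (raw_frame
           .withColumnRenamed("OrderDate", "order_date")  # column naming convention: snake_case
           .withColumn("order_date_utc",
                       to_utc_timestamp(col("order_date"), "Europe/Brussels")))  # regional datetime -> UTC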
selecting data types
the DataCamp course has a reference list of the available Spark data types (not copied here); a few common ones are sketched below
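# a partial list of common Spark SQL data types, from pyspark.sql.types:
from pyspark.sql.types import (StringType, IntegerType, LongType, FloatType,
                               DoubleType, BooleanType, DateType, TimestampType,
                               DecimalType, ArrayType, MapType, StructType)
# e.g. a column holding an array of strings, and a price with fixed precision:
tags_type = ArrayType(StringType())
price_type = DecimalType(precision=10, scale=2)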
handle invalid rows - badly formatted source
- spark can handle a badly formatted source by dropping the malformed rows, but that alone is often not enough
prices = (spark
          .read
          .options(header="true", mode="DROPMALFORMED")  # DROPMALFORMED drops the invalid rows
          .csv('landing/prices.csv'))
significance of null - missing values are read in as null
prices = (spark.read.options(header="true")
          .schema(schema)
          .csv('/landing/prices_with_incomplete_rows.csv'))
prices.show()
supplying default values for missing data
prices.fillna(25, subset=['quantity']).show()
badly chosen placeholders
- people often use extreme placeholders for values they do not know (e.g. an end_date of 9999-12-13)
- instead of a bad placeholder, just use a null value
conditionally replace values
from pyspark.sql.functions import col, when
from datetime import date, timedelta
one_year_from_now = date.today().replace(year=date.today().year + 1)
better_frame = employees.withColumn("end_date",
                                    when(col("end_date") > one_year_from_now, None).otherwise(col("end_date")))
better_frame.show()
// LESSON 2.3 - TRANSFORMING DATA WITH SPARK
why transform data?
process
- collect data
- "massage" data: involves cleaning and business logic
- derive insights (e.g. top 10 best rated hotels in your neighborhood)
example:
- collect data from booking.com and hotels.com
- standardize hotel names, normalizing review scores
- join datasets, filter on location and rank results
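a rough sketch of that hotel example in PySpark (the frames booking_raw and hotels_raw and every column name are assumptions, not from the course):
from pyspark.sql.functions import col, lower, trim
# "massage": standardize the hotel names and normalize both review scores to a 0-1 scale
booking = booking_raw.select(lower(trim(col("hotel_name"))).alias("hotel"),
                             col("city"),
                             (col("review_score") / 10).alias("booking_score"))
hotels = hotels_raw.select(lower(trim(col("name"))).alias("hotel"),
                           (col("rating") / 5).alias("hotels_score"))
# derive insights: join the sources, filter on location and rank the results
top10 = (booking
         .join(hotels, ["hotel"])
         .filter(col("city") == "Brussels")
         .orderBy(((col("booking_score") + col("hotels_score")) / 2).desc())
         .limit(10))
top10.show()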
common data transformations
- filtering and ordering rows: keep only the data that is useful for a particular analysis
prices_in_belgium = prices.filter(col('countrycode') == 'BE').orderBy(col('date'))
- selecting and renaming columns
prices.select(
    col("store"),
    col("brand").alias("brandname")
)
- reducing duplicate values
prices.select(
    col("store"),
    col("brand").alias("brandname")
).distinct()
- grouping and aggregation: grouping rows by a particular field and then aggregating some metrics
(prices
 .groupBy(col('brand'))
 .mean('price')
).show()
- joining
ratings_with_prices = ratings.join(prices, ["brand", "model"])
// UNFINISHED - LESSON 2.4 - PACKAGING YOUR APPLICATION
- zip your files
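a minimal sketch of what that usually looks like (the zip name, package folder and entry point below are made up):
# bundle the code so Spark can ship it to the workers
zip --recurse-paths dependencies.zip my_pipeline_package
# submit the job with the zipped dependencies attached
spark-submit --py-files dependencies.zip my_pipeline.py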
// LESSON 3.1 - IMPORTANCE OF TESTS
rationale behind testing
- improves chance of code being correct in the future
- raises confidence (not a guarantee) that code is correct now
- the most up-to-date documentation
test pyramid
- unit test: test for each function
- integration test: tests the interaction with file systems and databases
- ui test: test as user interacts with the application
// LESSON 3.2 - WRITING UNIT TESTS FOR PYSPARK
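the notes for this lesson are empty; below is a minimal sketch of a PySpark unit test with pytest (the transformation lowercase_brand is a made-up example, not from the course):
import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower

@pytest.fixture(scope="session")
def spark():
    # a small local session is enough for unit tests
    return SparkSession.builder.master("local[1]").getOrCreate()

def lowercase_brand(frame):
    # the transformation under test: normalize the brand column to lowercase
    return frame.withColumn("brand", lower(col("brand")))

def test_lowercase_brand(spark):
    frame = spark.createDataFrame([("Diapers-R-Us", 2)], ["brand", "quantity"])
    result = lowercase_brand(frame)
    assert [row.brand for row in result.collect()] == ["diapers-r-us"]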
// LESSON X.1 - CONCLUSION WORKFLOW
ingesting data
tap/single module: steps in creating a tap or single module
extracting data: extract the data (e.g. web scraping, querying a database) and transform it so that it fits our schema
creating a json schema for the data: defining the format the data should have
serializing json: so the JSON data can be transferred between languages (e.g. from Python to Scala)
streaming record messages: emitting the actual data that conforms to the schema
chaining taps and targets: keep things modular, one module per schema + records | a tap is the module that extracts data from a source
modular ingestion pipelines: since extraction now lives in separate modules (taps), grab the tap you need and pipe it into a target
data transformation
determine the size of the data: the tool you use depends on it
small data: use pandas; PySpark is not preferred since it adds a lot of overhead
big data: PySpark is a good fit since it scales horizontally, dividing the work across workers
starting the Spark analytics engine: required before you can use Spark
reading the file (CSV, etc.)
enforcing a schema: creating a schema to be used
cleaning the data
transforming the data