ETL Process
The whole ETL process is conducted through a Python script that connects to both the MongoDB and the PostgreSQL databases, using pymongo and SQLAlchemy respectively. The resulting data is finally stored in the warehouse. This script also runs on a recurring schedule, executing every second day at 10:05 AM. This is achieved with the following cron command:
5 10 */2 * * python3 path_to_script/script.py
Click the toggle button to see the code
import numpy as np
from datetime import datetime
import pandas as pd
from pymongo import MongoClient
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table,Column, Integer, String, Date, MetaData
from sqlalchemy import create_engine
from sklearn.pipeline import Pipeline
from sklearn.impute import KNNImputer
import os
# Run relative to the script's own directory so relative paths (e.g. cities.csv)
# resolve correctly under cron. abspath() guards against __file__ being a bare
# filename (e.g. `python3 script.py`), where dirname('') would make chdir raise.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Wall-clock start of this ETL run; written to log_table at the end.
start = datetime.now()
# Country codes (as they appear in cities.csv) -> display names stored
# in the warehouse's country (zemja) dimension.
countries_dict = {'it': 'Italy', 'de': 'Germany', 'fr': 'France', 'cz': 'Czech Republic',
                  'tr': 'Turkey', 'at': 'Austria', 'lu': 'Luxemburg', 'be': 'Belgium',
                  'nl': 'Netherlands', 'gb': 'Great Britain', 'es': 'Spain', 'bg': 'Bulgaria',
                  'ru': 'Russia', 'pl': 'Poland', 'ro': 'Romania', 'hu': 'Hungary'}
# FIX: the bare 'postgres' dialect alias was removed in SQLAlchemy 1.4;
# 'postgresql+psycopg2' is the supported scheme.
DATABASE_URI = 'postgresql+psycopg2://<name>:<pass>@<IP_address>/<db_name>'
meta = MetaData()
engine = create_engine(DATABASE_URI)
# Source operational data lives in a MongoDB Atlas cluster (cars_database).
client = MongoClient('mongodb+srv://<User>:<Password>@dwprojectcluster.lpqbf.mongodb.net/cars_database?retryWrites=true&w=majority')
# --- Extract the three source collections from MongoDB into DataFrames ---
cars_db = client.cars_database

# Cars: keep only documents not yet loaded into the warehouse.
df_cars = pd.DataFrame(list(cars_db.cars.find({})))
df_cars.drop('_id', axis=1, inplace=True)
df_cars = df_cars[df_cars['Loaded_in_DW'].eq(False)]
# Explicit placeholder for missing make/model values.
for attribute in ('Make', 'Model'):
    df_cars[attribute] = df_cars[attribute].replace({np.nan: 'Undefined'})

# Ads: drop the ad-level Price — the Price used downstream comes from the
# cars collection after the join.
df_ads = pd.DataFrame(list(cars_db.ads.find({})))
df_ads.drop(['_id', 'Price'], axis=1, inplace=True)

# Sellers: normalise the city name for the later join against cities.csv.
df_sellers = pd.DataFrame(list(cars_db.sellers.find({})))
df_sellers.drop('_id', axis=1, inplace=True)
df_sellers['City'] = df_sellers['City'].str.lower()

# De-duplicate each entity on its natural key.
df_ads.drop_duplicates(subset=['CarID'], inplace=True)
df_cars.drop_duplicates(subset=['ID'], inplace=True)
df_sellers.drop_duplicates(subset=['SellerID'], inplace=True)
print("Stignav")
df_cities = pd.read_csv('cities.csv')
print("I posle gradovi")
# Keep only cities in the countries of interest. isin() over the dict keys
# replaces the original 17-term boolean chain, which listed 'cz' twice and
# could silently drift out of sync with countries_dict. .copy() makes the
# in-place drop_duplicates below safe (no chained-assignment warning).
df_cities_eu = df_cities[df_cities['Country'].isin(countries_dict.keys())].copy()
# One row per city name so the sellers join stays one-to-one.
df_cities_eu.drop_duplicates(subset=['City'], inplace=True)
# Enrich sellers with the country of their city; the cities.csv 'Country'
# column (overlapping with the sellers' own) arrives with the '_other' suffix.
sellers_extended = df_sellers.join(df_cities_eu.set_index('City'), on='City', rsuffix='_other')
# Fill missing seller countries from the city lookup, mapping the code to
# the display name used in the warehouse.
sellers_extended.loc[sellers_extended['Country'].isnull(), 'Country'] = sellers_extended['Country_other'].map(countries_dict)
# Sellers whose country is still unknown get an explicit placeholder.
sellers_extended['Country'] = sellers_extended['Country'].replace({np.nan: 'Undefined'})
# Combine cars with their ads (car ID -> ad) and then with enriched sellers.
join1 = df_cars.merge(df_ads, left_on = 'ID', right_on = 'CarID', how = 'inner')
join2 = join1.merge(sellers_extended, left_on = 'SellerID', right_on = 'SellerID', how = 'inner')
# Only the columns the warehouse dimensions/facts need.
final = join2[['Make','Model','SellerID','Country','Mileage','Price']]
# Rows missing Make or Model carry too little information to keep.
missing_make_or_model = final['Make'].isna() | final['Model'].isna()
# BUGFIX: capture the removed rows BEFORE filtering, using the non-negated
# mask. The original computed this after the filter with the same kept-rows
# mask, so it silently equalled `final` instead of the removed documents.
documents_without_make_and_model = final[missing_make_or_model]
final = final[~missing_make_or_model]
# Imputation is performed on a scratch copy of the frame.
imputation_dataframe = final.copy()
# SellerID is an identifier and cannot contribute to the imputation.
imputation_dataframe.drop('SellerID', axis = 1, inplace = True)
# One-hot encode the categorical variables so KNN distances are defined.
imputation_dataframe = pd.get_dummies(data = imputation_dataframe,
columns = ['Country', 'Make', 'Model'],
dummy_na = False)
# A Pipeline with a single step; kept as a pipeline so further preprocessing
# steps can be appended later without restructuring.
imputation_pipeline = Pipeline(steps = [('iterative_imputer', KNNImputer(missing_values = np.nan,
n_neighbors = 5,
weights = 'distance',
metric = 'nan_euclidean',
copy = False,
add_indicator = False))])
# fit_transform returns a plain ndarray, so this frame gets a fresh 0..n-1
# RangeIndex. dtype=int truncates the imputed floats — presumably the team's
# intended rounding for Mileage/Price (TODO confirm).
imputed_dataframe = pd.DataFrame(data = imputation_pipeline.fit_transform(imputation_dataframe),
columns = imputation_dataframe.columns,
dtype = int)
# BUGFIX: `final` keeps its original filtered (non-contiguous) index while
# `imputed_dataframe` has a RangeIndex, so plain column assignment would
# align on index and scatter NaNs. Assign positionally via to_numpy().
final.drop(['Mileage', 'Price'], axis = 1, inplace = True)
final['Mileage'] = imputed_dataframe['Mileage'].to_numpy()
final['Price'] = imputed_dataframe['Price'].to_numpy()
# Replace remaining nulls with the placeholder agreed on by the team.
final['Country'] = final['Country'].replace({np.nan: 'Undefined'})
final['Make'] = final['Make'].replace({np.nan: 'Undefined'})
final['Model'] = final['Model'].replace({np.nan: 'Undefined'})
def _sync_dimension(values, table, name_col='ime'):
    """Insert into dimension `table` any of `values` not already present
    (matched on `name_col`) and return the refreshed table, surrogate keys
    ('skey') included. Uses the module-level SQLAlchemy `engine`."""
    existing = pd.read_sql_table(table, con=engine)
    candidates = pd.DataFrame({'value': list(set(values))})
    lj = candidates.merge(existing, left_on='value', right_on=name_col, how='left')
    # Rows with no surrogate key are new to the dimension.
    new_rows = lj[lj['skey'].isnull()].copy()
    new_rows[name_col] = new_rows['value']
    new_rows = new_rows.drop(['skey', 'value'], axis=1)
    new_rows.to_sql(table, con=engine, if_exists='append', index=False, method='multi')
    return pd.read_sql_table(table, con=engine)

# Upsert the three name-keyed dimensions. (The original repeated this same
# read/left-join/insert/re-read pattern inline for each table, loaded
# 'zemja' twice back-to-back — a copy-paste duplicate removed here — and
# inconsistently omitted method='multi' for 'marka'; all loads are uniform now.)
marka_postgre = _sync_dimension(df_cars['Make'], 'marka')
model_postgre = _sync_dimension(df_cars['Model'], 'model')
zemja_postgre = _sync_dimension(sellers_extended['Country'], 'zemja')

# The seller dimension carries two attributes, so it is handled explicitly.
seller_postgre = pd.read_sql_table('seller', con=engine)
left_join = df_sellers[['SellerID', 'Vendor']].merge(seller_postgre,
                                                     left_on='SellerID',
                                                     right_on='odb_sellerid',
                                                     how='left')
left_join = left_join[left_join['skey'].isnull()].copy()
left_join['odb_sellerid'] = left_join['SellerID']
left_join['vendorname'] = left_join['Vendor']
left_join = left_join.drop(['skey', 'SellerID', 'Vendor'], axis=1)
left_join.to_sql('seller', con=engine, if_exists='append', index=False, method='multi')
seller_postgre = pd.read_sql_table('seller', con=engine)
# Resolve each dimension's surrogate key with an explicit rename per merge.
# The original chained four merges and relied on pandas' _x/_y suffixing,
# which produced DUPLICATE 'skey_x'/'skey_y' column labels (seller+marka and
# zemja+model respectively) and only yielded the intended six columns by
# accident of column order — fragile across pandas versions.
merged = (final
          .merge(seller_postgre[['skey', 'odb_sellerid']].rename(columns={'skey': 'seller_skey'}),
                 left_on='SellerID', right_on='odb_sellerid', how='inner')
          .merge(zemja_postgre[['skey', 'ime']].rename(columns={'skey': 'zemja_skey'}),
                 left_on='Country', right_on='ime', how='inner')
          .merge(marka_postgre[['skey', 'ime']].rename(columns={'skey': 'marka_skey'}),
                 left_on='Make', right_on='ime', how='inner')
          .merge(model_postgre[['skey', 'ime']].rename(columns={'skey': 'model_skey'}),
                 left_on='Model', right_on='ime', how='inner')
          )[['seller_skey', 'marka_skey', 'zemja_skey', 'model_skey', 'Price', 'Mileage']]
# fact_make_model_country: price/mileage aggregates per make-model-country.
fact1 = merged.drop('seller_skey', axis=1)
# fact_seller_make: car counts per seller-make pair.
fact2 = merged.drop(['zemja_skey', 'model_skey'], axis=1)
fact1 = fact1.groupby(['marka_skey', 'zemja_skey', 'model_skey']).agg(
    avg_price=('Price', 'mean'),
    car_count=('Price', 'count'),
    avg_mileage=('Mileage', 'mean')
).reset_index()
fact2 = fact2.groupby(['seller_skey', 'marka_skey']).agg(
    car_count=('Price', 'count'),
).reset_index()
# Full refresh of both fact tables: truncate first, then bulk-append.
engine.execute("TRUNCATE TABLE fact_make_model_country")
engine.execute("TRUNCATE TABLE fact_seller_make")
for table_name, frame in (('fact_make_model_country', fact1),
                          ('fact_seller_make', fact2)):
    frame.to_sql(table_name, con=engine, if_exists='append', index=False, method='multi')
# Record this run's start/finish timestamps in the warehouse log table.
finish = datetime.now()
df_log = pd.DataFrame(data={'Start': [start], 'Finish': [finish]})
df_log.to_sql('log_table', con=engine, if_exists='append', index=False, method='multi')