Search
ETL Process

The whole ETL process is conducted through a Python script that connects to both the MongoDB and the PostgreSQL databases, using PyMongo and SQLAlchemy respectively. The resulting data is finally stored in the Warehouse. This script also runs on a schedule, executing every second day at 10:05 AM. This is achieved with the following cron command:

5 10 */2 * * python3 path_to_script/script.py

Click the toggle button to see the code

import numpy as np
from datetime import datetime
import pandas as pd
from pymongo import MongoClient
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table,Column, Integer, String, Date, MetaData
from sqlalchemy import create_engine
from sklearn.pipeline import Pipeline
from sklearn.impute import KNNImputer
import os
# Run from the script's own directory so relative paths (e.g. cities.csv)
# resolve correctly when the script is launched by cron.
os.chdir(os.path.dirname(__file__))

# Wall-clock start of this ETL run; written to the log table at the end.
start = datetime.now()

# Maps the two-letter country codes used in cities.csv to the full country
# names stored in the warehouse's country dimension.
countries_dict = {'it': 'Italy','de':'Germany','fr':'France','cz':'Czech Republic','tr':'Turkey','at':'Austria','lu':'Luxemburg',
                  'be':'Belgium','nl':'Netherlands','gb':'Great Britain','es':'Spain','bg':'Bulgaria','ru':'Russia','pl':'Poland',
                  'ro':'Romania','hu':'Hungary'}

# Fixed: the 'postgres' dialect alias was removed in SQLAlchemy 1.4;
# 'postgresql+psycopg2' is the canonical name and works on every version.
DATABASE_URI = 'postgresql+psycopg2://<name>:<pass>@<IP_address>/<db_name>'
meta = MetaData()  # NOTE(review): currently unused in this script
engine = create_engine(DATABASE_URI)

# Source OLTP store: MongoDB Atlas cluster with the cars/ads/sellers collections.
client = MongoClient('mongodb+srv://<User>:<Password>@dwprojectcluster.lpqbf.mongodb.net/cars_database?retryWrites=true&w=majority')

# --- Extract: pull the three source collections out of MongoDB ---
cars_db = client.cars_database

# Cars: only documents not yet loaded into the warehouse are processed.
df_cars = pd.DataFrame(list(cars_db.cars.find({}))).drop('_id', axis=1)
df_cars = df_cars[df_cars['Loaded_in_DW'].eq(False)]
for _col in ('Make', 'Model'):
    df_cars[_col] = df_cars[_col].replace({np.nan: 'Undefined'})
# One row per car ID.
df_cars = df_cars.drop_duplicates(subset=['ID'])

# Ads: the Mongo-side Price column is discarded here (the price used
# downstream comes from the joined car documents).
df_ads = pd.DataFrame(list(cars_db.ads.find({}))).drop(['_id', 'Price'], axis=1)
# One ad per car.
df_ads = df_ads.drop_duplicates(subset=['CarID'])

# Sellers: lower-case the city names for consistent matching against the
# cities lookup used later.
df_sellers = pd.DataFrame(list(cars_db.sellers.find({}))).drop('_id', axis=1)
df_sellers['City'] = df_sellers['City'].str.lower()
# One row per seller ID.
df_sellers = df_sellers.drop_duplicates(subset=['SellerID'])
print("Stignav")
# Static city -> country-code lookup shipped alongside the script.
df_cities = pd.read_csv('cities.csv')
print("I posle gradovi")

# Keep only cities in the European countries the warehouse tracks.
# Fixed: the original hand-written OR-chain listed 'cz' twice and could
# silently drift out of sync with countries_dict; isin() over the dict's
# keys keeps the two definitions in lockstep. .copy() avoids pandas'
# SettingWithCopyWarning on the in-place drop_duplicates below.
df_cities_eu = df_cities[df_cities['Country'].isin(countries_dict)].copy()

# One row per city name: when a city name appears in several countries the
# first occurrence wins, matching the original behaviour.
df_cities_eu.drop_duplicates(subset=['City'], inplace=True)

# Enrich sellers with country information looked up by city. rsuffix keeps
# the sellers' own 'Country' column and renames the city table's country
# code column to 'Country_other'.
sellers_extended = df_sellers.join(df_cities_eu.set_index('City'), on='City', rsuffix='_other')
# Where the seller record had no country, fill it from the city lookup,
# translating the two-letter code into the full country name.
sellers_extended.loc[sellers_extended['Country'].isnull(), 'Country'] = \
    sellers_extended['Country_other'].map(countries_dict)
# Anything still unresolved gets the team's agreed placeholder.
sellers_extended['Country'] = sellers_extended['Country'].replace({np.nan: 'Undefined'})


# Stage the denormalised load frame: cars joined to their ads, then to the
# (country-enriched) seller records.
join1 = df_cars.merge(df_ads, left_on='ID', right_on='CarID', how='inner')
join2 = join1.merge(sellers_extended, on='SellerID', how='inner')
final = join2[['Make', 'Model', 'SellerID', 'Country', 'Mileage', 'Price']]
# (Removed a dangling `final.isnull().sum()` whose result was discarded.)

# --- Impute missing Mileage/Price via KNN over one-hot encoded features ---

# Rows lacking Make or Model carry too little signal and are excluded.
missing_make_model_mask = final['Make'].isna() | final['Model'].isna()
# These are the removed documents (rows of the dataframe).
# Fixed: the original computed this *after* filtering, and with the same
# keep-mask used for the filter — so it always captured the kept rows,
# never the removed ones.
documents_without_make_and_model = final[missing_make_model_mask]
# remove rows which do not have Make or Model
final = final[~missing_make_model_mask]

# performing imputation on a different dataframe
imputation_dataframe = final.copy()
# SellerID can't contribute to the imputation
imputation_dataframe.drop('SellerID', axis=1, inplace=True)
# one-hot encoding the categorical variables, so KNN can be used for imputing
imputation_dataframe = pd.get_dummies(data=imputation_dataframe,
                                      columns=['Country', 'Make', 'Model'],
                                      dummy_na=False)

# can be done without a pipeline, but in case of more steps in the future,
# the structure remains a pipeline
imputation_pipeline = Pipeline(steps=[('iterative_imputer', KNNImputer(missing_values=np.nan,
                                                                       n_neighbors=5,
                                                                       weights='distance',
                                                                       metric='nan_euclidean',
                                                                       copy=False,
                                                                       add_indicator=False))])
# Rebuild a DataFrame from the imputer's ndarray output.
# Fixed: carry over the source index — fit_transform returns a bare array,
# and without index= the column assignments below would align a fresh
# RangeIndex against final's gapped index, scattering NaNs into the result.
# dtype=int kept from the original (presumably to match integer warehouse
# columns — note it truncates imputed floats).
imputed_dataframe = pd.DataFrame(data=imputation_pipeline.fit_transform(imputation_dataframe),
                                 columns=imputation_dataframe.columns,
                                 index=imputation_dataframe.index,
                                 dtype=int)

# swap the raw Mileage/Price columns for their imputed versions
final.drop(['Mileage', 'Price'], axis=1, inplace=True)
final['Mileage'] = imputed_dataframe['Mileage']
final['Price'] = imputed_dataframe['Price']

# replacing null values with appropriate value defined by the team
# (Make/Model can no longer be NaN here; kept as a belt-and-braces guard)
final['Country'] = final['Country'].replace({np.nan: 'Undefined'})
final['Make'] = final['Make'].replace({np.nan: 'Undefined'})
final['Model'] = final['Model'].replace({np.nan: 'Undefined'})

# --- Dimension maintenance: insert only values not already present ---
model_postgre = pd.read_sql_table('model', con = engine)
marka_postgre = pd.read_sql_table('marka', con = engine)
seller_postgre = pd.read_sql_table('seller', con = engine)
zemja_postgre = pd.read_sql_table('zemja', con = engine)


def _sync_name_dimension(values, dim_df, table_name):
    """Append values missing from a single-name ('ime') dimension table,
    then return a fresh read of the table so surrogate keys are available.

    values     -- iterable of names coming from the source data
    dim_df     -- current contents of the dimension (columns: skey, ime)
    table_name -- target PostgreSQL table
    """
    candidates = pd.DataFrame({'ime': sorted(set(values))})
    # Left-merge against the dimension: rows with a null skey are new.
    probe = candidates.merge(dim_df, on='ime', how='left')
    new_rows = probe[probe['skey'].isnull()].drop('skey', axis=1)
    new_rows.to_sql(table_name, con=engine, if_exists='append', index=False, method='multi')
    return pd.read_sql_table(table_name, con=engine)


marka_postgre = _sync_name_dimension(df_cars['Make'], marka_postgre, 'marka')
model_postgre = _sync_name_dimension(df_cars['Model'], model_postgre, 'model')
# Fixed: the country sync appeared twice verbatim in the original; the
# second pass could never insert anything (the table had just been
# refreshed), so it has been collapsed into a single call. The marka load
# also gains method='multi' for consistency with the other dimensions.
zemja_postgre = _sync_name_dimension(sellers_extended['Country'], zemja_postgre, 'zemja')

# The seller dimension carries two attributes, so it is synced separately.
new_sellers = df_sellers[['SellerID', 'Vendor']].merge(
    seller_postgre, left_on='SellerID', right_on='odb_sellerid', how='left')
new_sellers = new_sellers[new_sellers['skey'].isnull()]
new_sellers['odb_sellerid'] = new_sellers['SellerID']
new_sellers['vendorname'] = new_sellers['Vendor']
new_sellers = new_sellers.drop(['skey', 'SellerID', 'Vendor'], axis=1)
new_sellers.to_sql('seller', con=engine, if_exists='append', index=False, method='multi')
seller_postgre = pd.read_sql_table('seller', con = engine)


# --- Resolve surrogate keys and build the two fact tables ---
# Fixed: the original relied on pandas' default '_x'/'_y' merge suffixes,
# which collide once four dimensions each contribute an 'skey' column
# (duplicate 'skey_x'/'skey_y' labels; newer pandas rejects them). Each
# merge now renames its key explicitly, so the mapping is unambiguous.
merged = (final
          .merge(seller_postgre, left_on='SellerID', right_on='odb_sellerid', how='inner')
          .rename(columns={'skey': 'seller_skey'})
          .merge(zemja_postgre, left_on='Country', right_on='ime', how='inner')
          .rename(columns={'skey': 'zemja_skey', 'ime': 'ime_zemja'})
          .merge(marka_postgre, left_on='Make', right_on='ime', how='inner')
          .rename(columns={'skey': 'marka_skey', 'ime': 'ime_marka'})
          .merge(model_postgre, left_on='Model', right_on='ime', how='inner')
          .rename(columns={'skey': 'model_skey'}))
merged = merged[['seller_skey', 'marka_skey', 'zemja_skey', 'model_skey', 'Price', 'Mileage']]


# Fact 1: aggregates per (make, country, model).
fact1 = (merged.drop('seller_skey', axis=1)
         .groupby(['marka_skey', 'zemja_skey', 'model_skey'])
         .agg(avg_price=('Price', 'mean'),
              car_count=('Price', 'count'),
              avg_mileage=('Mileage', 'mean'))
         .reset_index())

# Fact 2: car counts per (seller, make).
fact2 = (merged.drop(['zemja_skey', 'model_skey'], axis=1)
         .groupby(['seller_skey', 'marka_skey'])
         .agg(car_count=('Price', 'count'))
         .reset_index())


# --- Load: the fact tables are fully rebuilt on every run ---
# Wipe both targets first, then bulk-append this run's aggregates.
engine.execute("TRUNCATE TABLE fact_make_model_country")
engine.execute("TRUNCATE TABLE fact_seller_make")

for frame, table in ((fact1, 'fact_make_model_country'),
                     (fact2, 'fact_seller_make')):
    frame.to_sql(table, con=engine, if_exists='append', index=False, method='multi')


# Record this run's wall-clock span in the ETL log table.
finish = datetime.now()
pd.DataFrame({'Start': [start], 'Finish': [finish]}).to_sql(
    'log_table', con=engine, if_exists='append', index=False, method='multi')