Final Exam, Fall 2022: Time Series Analysis of US Inflation

Version 1.0.1
Change history:
1.0.1 - bugfix ex2 test code.
1.0 - initial release
All of the header information is important. Please read it.
Topics, number of exercises: This problem builds on your knowledge of Pandas, Numpy, basic Python data structures, and implementing mathematical functions. It has 9 exercises, numbered 0 to 8. There are 18 available points. However, to earn 100% the threshold is 13 points. (Therefore, once you hit 13 points, you can stop. There is no extra credit for exceeding this threshold.)
Exercise ordering: Each exercise builds logically on previous exercises, but you may solve them in any order. That is, if you can't solve an exercise, you can still move on and try the next one. Use this to your advantage, as the exercises are not necessarily ordered in terms of difficulty. Higher point values generally indicate more difficult exercises.
Demo cells: Code cells starting with the comment `### define demo inputs` load results from prior exercises applied to the entire data set and use those to build demo inputs. These must be run for subsequent demos to work properly, but they do not affect the test cells. The data loaded in these cells may be rather large (at least in terms of human readability). You are free to print or otherwise use Python to explore them, but we did not print them in the starter code.
Debugging your code: Right before each exercise test cell, there is a block of text explaining the variables available to you for debugging. You may use these to test your code and can print/display them as needed (careful when printing large objects, you may want to print the head or chunks of rows at a time).
Exercise point breakdown:
Final reminders:
Inflation is an increase in overall prices in an economy over time. Deflation is "negative inflation", a decrease in prices over time. A common way to measure inflation is to first calculate the CPI (the price of a representative basket of goods), then compute the difference in CPI over a time interval. In other words, if the CPI is 100 at one point in time, and the CPI is 105 one year later, then we would say that the inflation rate over that year was 5%.
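To make the arithmetic concrete, here is a minimal sketch of that calculation (the CPI values are the made-up ones from the sentence above):

```python
# Hypothetical CPI observations one year apart
cpi_then, cpi_now = 100.0, 105.0
inflation_rate = (cpi_now - cpi_then) / cpi_then
print(f'{inflation_rate:.1%}')  # prints: 5.0%
```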
We have obtained the US CPI for each month going back to the early 20th century from The Organisation for Economic Co-operation and Development.
### Global Imports
###
### AUTOGRADER TEST - DO NOT REMOVE
###
import pandas as pd
import numpy as np
import pickle
from matplotlib import pyplot as plt
To start things off we will load the CPI data into the notebook environment. You do not need to modify the cell below, just execute the test and collect your free point!
This cell will also display the first few rows and last few rows of the CPI data we just loaded.
cpi_all_df = pd.read_csv('resource/asnlib/publicdata/cpi_urban_all.csv')
display(cpi_all_df.head())
display(cpi_all_df.tail())
The cell below will test your solution for Exercise 0. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex0
###
### AUTOGRADER TEST - DO NOT REMOVE
###
assert 'cpi_all_df' in globals()
assert isinstance(cpi_all_df, pd.DataFrame)
print('Passed! Please submit.')
The raw data needs some light cleaning. There are some columns which we do not need for analysis, some of the numerical columns have blanks, and (due to the blanks) some numerical columns are the wrong type. We need to correct these issues before moving forward.
Define the function `cleanup_df(df, drop_cols)`. Input `df` is a DataFrame and `drop_cols` is a list of column names which may or may not appear in `df`.

Your function should return a new DataFrame having the same contents as `df` with the following exceptions:

- Any column named in `drop_cols` should be dropped. Do not raise an error if a name in `drop_cols` does not appear in `df`.
- Any blank value (`' '`) should be replaced with `np.nan`.
- The month columns (`Jan, Feb, Mar, Apr, May, Jun, Jul, Aug, Sep, Oct, Nov, Dec`) should be converted to `float64`.

### Define demo inputs
demo_df_ex1 = cpi_all_df.tail().reset_index(drop=True)
display(demo_df_ex1)
demo_drop_cols_ex1 = ['HALF1', 'HALF2', 'THIS COLUMN DOESN\'T EXIST']
The demo included in the solution cell below should display the following output:
| | Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018 | 247.867 | 248.991 | 249.554 | 250.546 | 251.588 | 251.989 | 252.006 | 252.146 | 252.439 | 252.885 | 252.038 | 251.233 |
| 1 | 2019 | 251.712 | 252.776 | 254.202 | 255.548 | 256.092 | 256.143 | 256.571 | 256.558 | 256.759 | 257.346 | 257.208 | 256.974 |
| 2 | 2020 | 257.971 | 258.678 | 258.115 | 256.389 | 256.394 | 257.797 | 259.101 | 259.918 | 260.28 | 260.388 | 260.229 | 260.474 |
| 3 | 2021 | 261.582 | 263.014 | 264.877 | 267.054 | 269.195 | 271.696 | 273.003 | 273.567 | 274.31 | 276.589 | 277.948 | 278.802 |
| 4 | 2022 | 281.148 | 283.716 | 287.504 | 289.109 | 292.296 | 296.311 | 296.276 | 296.171 | 296.808 | 298.012 | NaN | NaN |
Notice:

- The columns named in `drop_cols` were dropped, and no error was raised for the name that does not appear in `df`.
- The blank values (Nov and Dec of 2022) are now `np.nan` (which displays as 'NaN'). FYI `np.nan` is a `float`.

Notes:

- The test cell will check the `dtypes` attribute of your result. Columns which are months ('Jan', 'Feb', ...) should be `float64`. Any other remaining columns should have the same `dtype` as the original column in the input.

### Exercise 1 solution
def cleanup_df(df, drop_cols):
    # Drop the requested columns; errors='ignore' skips names not present in df
    clean_df = df.drop(columns=drop_cols, errors='ignore')\
        .replace(' ', np.nan)  # blank cells become NaN
    # Cast the twelve month columns to float64
    months = 'Jan, Feb, Mar, Apr, May, Jun, Jul, Aug, Sep, Oct, Nov, Dec'.split(', ')
    month_dict = {m: 'float64' for m in months}
    return clean_df.astype(month_dict)
### demo function call
demo_output_ex1 = cleanup_df(demo_df_ex1, demo_drop_cols_ex1)
display(demo_output_ex1)
demo_df_ex1.dtypes
The cell below will test your solution for Exercise 1. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex1
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_1',
'func': cleanup_df, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'df':{
'dtype':'pd.DataFrame', # data type of param.
'check_modified':True,
},
'drop_cols':{
'dtype':'list', # data type of param.
'check_modified':True,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'pd.DataFrame',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-10)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
To complete our time series analysis we need to reshape the data into a proper time series. By using earlier functions we are able to pare down the data into this form:
| | Year | Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 2020 | 257.971 | 258.678 | 258.115 | 256.389 | 256.394 | 257.797 | 259.101 | 259.918 | 260.28 | 260.388 | 260.229 | 260.474 |
| 3 | 2021 | 261.582 | 263.014 | 264.877 | 267.054 | 269.195 | 271.696 | 273.003 | 273.567 | 274.31 | 276.589 | 277.948 | 278.802 |
| 4 | 2022 | 281.148 | 283.716 | 287.504 | 289.109 | 292.296 | 296.311 | 296.276 | 296.171 | 296.808 | 298.012 | NaN | NaN |
We want to further transform it into a single dimension in chronological order. (i.e. all the data points for 2020 followed by all the data points for 2021 followed by all the data points for 2022.)
Note: In the example above there are no records for November and December of 2022 (because they have not concluded at the writing of this exam).
Define the function `to_ts(df)`. The input `df` can be assumed to have the following characteristics:

- It has the columns `'Year' 'Jan' 'Feb' 'Mar' 'Apr' 'May' 'Jun' 'Jul' 'Aug' 'Sep' 'Oct' 'Nov' 'Dec'`, in that particular order.
- All columns other than `'Year'` have a dtype of `float64`.
- Any missing values are `np.nan`.
- It is sorted by the `'Year'` column in ascending order.

Your function should return a new array or `None` by implementing this logic:

- Flatten the twelve month columns into a single 1-D array in chronological order (all of the first year's months, then all of the second year's, and so on).
- If any missing value occurs before a non-missing observation (i.e. the gap is in the middle of the series rather than at its end), return `None`.
- Otherwise, return the array truncated after the last non-missing observation.

### Define demo inputs
demo_df_ex2 = \
pd.DataFrame([[2021,261.582,263.014, 264.877, 267.054, 269.195, 271.696, 273.003, 273.567, 274.31, 276.589, 277.948, 278.802],
[2022, 281.148, 283.716, 287.504, 289.109, 292.296, 296.311, 296.276, 296.171, 96.808, 298.012, np.nan, np.nan]],
columns=['Year', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
demo_invalid_df_ex2 = \
pd.DataFrame([[2021,261.582,263.014, np.nan, 267.054, 269.195, 271.696, 273.003, 273.567, 274.31, 276.589, 277.948, 278.802],
[2022, 281.148, 283.716, 287.504, 289.109, 292.296, 296.311, 296.276, 296.171, 96.808, 298.012, np.nan, np.nan]],
columns=['Year', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
The demo included in the solution cell below should display the following output:
Demo output
[261.582 263.014 264.877 267.054 269.195 271.696 273.003 273.567 274.31
276.589 277.948 278.802 281.148 283.716 287.504 289.109 292.296 296.311
296.276 296.171 96.808 298.012]
Demo handling invalid input
None
The demo runs your solution first on a `df` input with missing values only at the end (an array is expected as output). Then it runs on a `df` input with a missing value in the middle (`None` is expected as output).
### Exercise 2 solution
def to_ts(df):
    assert (['Year'] + 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()) == list(df.columns)
    # Drop 'Year' and flatten row by row -> one chronological 1-D array
    ts = df.drop(columns='Year').values
    ts = ts.reshape((-1,))
    # Indices of missing values, and the index of the last observed value
    missing = np.argwhere(np.isnan(ts)).reshape((-1,))
    max_not_missing = np.argwhere(~np.isnan(ts)).max()
    # Any gap before the last observation makes the input invalid
    if (max_not_missing > missing).any():
        return None
    # Trim trailing missing values
    return ts[:(max_not_missing + 1)]
### demo function call
demo_output_ex2 = to_ts(demo_df_ex2)
demo_invalid_ex2 = to_ts(demo_invalid_df_ex2)
print('Demo output')
print(demo_output_ex2)
print()
print('Demo handling invalid input')
print(demo_invalid_ex2)
The cell below will test your solution for Exercise 2. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex2
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_2',
'func': to_ts, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'df':{
'dtype':'pd.DataFrame', # data type of param.
'check_modified':True,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'np.ndarray',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-10)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(200):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
Eventually, we are going to plot some of the time series data, so we will need a date axis to provide context for users. We can extract this from our source DataFrame.
Define the function `date_series(df, n)`. The input `df` will have these columns `['Year', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']` in that order. Also, the `'Year'` column will be sorted in ascending order. We are unconcerned with the values or types in any other columns. The input `n` will be a positive integer smaller than `12*df.shape[0]`.

Your function should return a Pandas Series with dtype `datetime64` containing the timestamp for midnight on the first day of the first `n` months represented in `df`. The `pd.to_datetime()` function is useful in converting the dates.
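Here is a minimal sketch of how `pd.to_datetime()` behaves on a Series of date strings (the dates here are made up):

```python
pd.to_datetime(pd.Series(['1961-1-01', '1961-2-01']))
# 0   1961-01-01
# 1   1961-02-01
# dtype: datetime64[ns]
```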
### Define demo inputs
demo_df_ex3 = pd.DataFrame(columns=['Year', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
demo_df_ex3['Year'] = [1961, 1962, 1963]
demo_n_ex3 = 14
display(demo_df_ex3)
The demo included in the solution cell below should display the following output:
0 1961-01-01
1 1961-02-01
2 1961-03-01
3 1961-04-01
4 1961-05-01
5 1961-06-01
6 1961-07-01
7 1961-08-01
8 1961-09-01
9 1961-10-01
10 1961-11-01
11 1961-12-01
12 1962-01-01
13 1962-02-01
dtype: datetime64[ns]
Notice that the items are `datetime64` and not strings.
### Exercise 3 solution
def date_series(df, n):
date_list = []
for y in df['Year']:
for m in range(1,13):
date_str = f'{y}-{m}-01'
date_list.append(date_str)
date_series = pd.Series(date_list[:n])
return pd.to_datetime(date_series)
### demo function call
demo_output_ex3 = date_series(demo_df_ex3, demo_n_ex3)
demo_output_ex3
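As an aside, here is an alternative sketch of the same idea using `pd.date_range`. It assumes the `'Year'` values in `df` are consecutive (true for the CPI data); `date_series_alt` is a hypothetical name, not part of the exercise:

```python
def date_series_alt(df, n):
    # freq='MS' generates month-start timestamps for n consecutive months
    start = f"{int(df['Year'].iloc[0])}-01-01"
    return pd.Series(pd.date_range(start=start, periods=n, freq='MS'))
```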
The cell below will test your solution for Exercise 3. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex3
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_3',
'func': date_series, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'df':{
'dtype':'pd.DataFrame', # data type of param.
'check_modified':True,
},
'n':{
'dtype':'int', # data type of param.
'check_modified':False,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'pd.Series',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-6)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
We have the CPI data re-organized into a time series. We are concerned with inflation, which is the multiplicative change in CPI over some time interval. We will need to transform the data a final time to get an inflation time series.
Define the function `multiplicative_change(ts, lag)`. The input `ts` is a 1-D array of floats representing monthly observations of the CPI. The input `lag` is an integer indicating the time interval, in months, over which we want to measure inflation.

Your function should implement the following formula to calculate $\hat{x}$ and return the result as a 1-D array. In the mathematical notation $x$ is `ts`, and $\ell$ is `lag`:
$$\hat{x_i} = \frac{x_i - x_{i-\ell}}{x_{i-\ell}}$$
Note that by this definition the first $\ell$ (or `lag`) entries in $\hat{x}$ are undefined. The output will start with the first defined value.
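For example, if $x = [100, 150, 180]$ and $\ell = 1$, then $\hat{x} = \left[ \frac{150-100}{100}, \frac{180-150}{150} \right] = [0.5, 0.2]$.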
### Define demo inputs
demo_ts_ex4 = np.array([100., 150., 180., 216., 324.])
### Exercise 4 solution
def multiplicative_change(ts, lag):
return (ts[lag:] - ts[:-lag])/ts[:-lag]
### demo function call
demo_output_ex4_lag_1 = multiplicative_change(demo_ts_ex4, 1)
demo_output_ex4_lag_2 = multiplicative_change(demo_ts_ex4, 2)
print('lag of 1')
print(demo_output_ex4_lag_1)
print()
print('lag of 2')
print(demo_output_ex4_lag_2)
The cell below will test your solution for Exercise 4. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex4
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_4',
'func': multiplicative_change, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'ts':{
'dtype':'np.ndarray', # data type of param.
'check_modified':True,
},
'lag':{
'dtype':'int', # data type of param.
'check_modified':True,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-6)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
The following two exercises will focus on implementing two time-series analysis techniques: simple and double exponential smoothing. The high-level idea for simple smoothing is that we make an initial guess, compare it with the observation, and use that information to improve our guess for the following observation. For double smoothing, we do this on two levels - adjusting successive guesses for the observations themselves as well as for the difference between observations, in an attempt to capture any trends in our model.
This is the formula for our application of simple exponential smoothing. In the math notation $x_t$ is `ts[t]`, and $\hat{x}_t$ is our prediction for $x_t$:

Initial conditions:

$$s_0 = x_0, \qquad \hat{x}_0 = \text{NaN}, \qquad \hat{x}_1 = s_0$$

For $t > 0$:

$$s_t = \alpha x_t + (1-\alpha)\,s_{t-1}, \qquad \hat{x}_{t+1} = s_t$$

When $\alpha$ is closer to 1 the model is more sensitive to recent observations. When $\alpha$ is closer to 0 the model is more sensitive to past observations.
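As a worked check of the recurrence (using the first two values of the demo input below, $x = [100, 105, \ldots]$, with $\alpha = 0.5$): $s_0 = 100$ and $s_1 = 0.5 \cdot 105 + 0.5 \cdot 100 = 102.5$, so $\hat{x}_2 = 102.5$, which matches the third entry of the $\alpha = 0.5$ demo output.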
Define the function `simple_exp_smoothing(ts, alpha)`. The input `ts` will be a 1-D numerical array (the vector $x$ from the formula above), and the input `alpha` (the scalar $\alpha$ from the formula above) will be a floating point number between 0 and 1.

Your function should implement the formula above and return the vector $\hat{x}$ as a 1-D array.

- The first entry of the output (the undefined $\hat{x}_0$) should be `np.nan`.
- The output should have one more entry than `ts`.

### Define demo inputs
demo_ts_ex5 = np.array([100., 105., 120., 110., 115.])
The demo included in the solution cell below should display the following output:
[ nan 100. 105. 120. 110. 115.]
[ nan 100. 100. 100. 100. 100.]
[ nan 100. 102.5 111.25 110.625 112.8125]
The demo below will run your solution 3 times, with `alpha` values of `1`, `0`, and `0.5`.
### Exercise 5 solution
def simple_exp_smoothing(ts, alpha):
# t = 0
x_hat = np.array([np.nan]*(ts.shape[0] + 1))
s = np.array([ts[0]]*ts.shape[0])
# t > 0
for t in range(1, ts.shape[0]):
s[t] = alpha*ts[t] + (1-alpha)*s[t-1]
x_hat[1:] = s
return x_hat
### demo function call
print(simple_exp_smoothing(demo_ts_ex5, 1))
print(simple_exp_smoothing(demo_ts_ex5, 0))
print(simple_exp_smoothing(demo_ts_ex5, 0.5))
The cell below will test your solution for Exercise 5. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex5
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_5',
'func': simple_exp_smoothing, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'ts':{
'dtype':'np.ndarray', # data type of param.
'check_modified':True,
},
'alpha':{
'dtype':'float', # data type of param.
'check_modified':True,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'np.ndarray',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-6)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
Now we will implement double exponential smoothing. For our implementation the formula is as follows:

Initial conditions:

$$s_0 = x_0, \qquad b_0 = 0, \qquad \hat{x}_0 = \text{NaN}, \qquad \hat{x}_1 = s_0 + b_0$$

For $t > 0$:

$$s_t = \alpha x_t + (1-\alpha)(s_{t-1} + b_{t-1})$$

$$b_t = \beta\,(s_t - s_{t-1}) + (1-\beta)\,b_{t-1}$$

$$\hat{x}_{t+1} = s_t + b_t$$
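As a worked check (again using $x = [100, 105, \ldots]$ with $\alpha = \beta = 0.5$): $s_1 = 0.5 \cdot 105 + 0.5 \cdot (100 + 0) = 102.5$ and $b_1 = 0.5 \cdot (102.5 - 100) + 0.5 \cdot 0 = 1.25$, so $\hat{x}_2 = 102.5 + 1.25 = 103.75$, which matches the third entry of the third demo run below.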
Define the function `double_exp_smoothing(ts, alpha, beta)`. The input `ts` will be a 1-D numerical array (the vector $x$ from the formula above), and the inputs `alpha` and `beta` (the scalars $\alpha$ and $\beta$ from the formula above) will be floating point numbers between 0 and 1.

Your function should implement the formula above and return the vector $\hat{x}$ as a 1-D array.

- The first entry of the output (the undefined $\hat{x}_0$) should be `np.nan`.
- The output should have one more entry than `ts`.

### Define demo inputs
demo_ts_ex6 = np.array([100., 105., 120., 110., 115.])
The demo included in the solution cell below should display the following output:
[nan 100. 102.5 111.25 110.625 112.8125]
[nan 100. 105. 122.5 120. 118.75]
[nan 100. 103.75 117.1875 117.109375 119.04296875]
[nan 100. 101.875 109.296875 112.45117188 116.38549805]
The demo below performs 4 runs of your solution, each with a different combination of `alpha` and `beta` parameters.
### Exercise 6 solution
def double_exp_smoothing(ts, alpha, beta):
# t = 0
x_hat = np.array([np.nan]*(ts.shape[0] + 1))
s = np.array([ts[0]]*ts.shape[0])
b = np.array([0.]*ts.shape[0])
# t > 0
for t in range(1, ts.shape[0]):
s[t] = alpha*ts[t] + (1-alpha)*(s[t-1]+b[t-1])
b[t] = beta*(s[t]-s[t-1]) + (1-beta)*b[t-1]
x_hat[1:] = s + b
return x_hat
print(double_exp_smoothing(demo_ts_ex6, alpha=0.5, beta=0))
print(double_exp_smoothing(demo_ts_ex6, alpha=0.5, beta=1))
print(double_exp_smoothing(demo_ts_ex6, alpha=0.5, beta=0.5))
print(double_exp_smoothing(demo_ts_ex6, alpha=0.25, beta=0.5))
The cell below will test your solution for Exercise 6. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex6
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_6',
'func': double_exp_smoothing, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'ts':{
'dtype':'np.ndarray', # data type of param.
'check_modified':True,
},
'alpha':{
'dtype':'float', # data type of param.
'check_modified':True,
},
'beta':{
'dtype':'float', # data type of param.
'check_modified':True,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'np.ndarray',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-6)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
Calculus won't work for finding the optimal parameters for either flavor of exponential smoothing implemented above. A "brute-force" alternative is to generate a list of suitable candidates for each parameter and test our functions on all possible combinations. This is called a grid search. It's not flashy, but for a lot of modeling techniques it's one of the only suitable choices.
Define the function `build_grid(params)`. The input `params` will be a dictionary mapping strings to arrays (or array-like data structures such as `list`s). Consider each string a parameter name and each value in the corresponding array to be a candidate value for that parameter.

Your function should return a list of dictionaries which satisfies the following:

- Each dictionary maps every parameter name to one of its candidate values.
- There is one dictionary for each possible combination of candidate values.
- The list is sorted by the parameter values, ordering first by the alphabetically-first parameter name, then the next, and so on.
For example, `build_grid({'b': [1, 2], 'z': [3, 5, 6], 'a': [100, 10]})` should return:
[{'b': 1, 'z': 3, 'a': 10},
{'b': 1, 'z': 5, 'a': 10},
{'b': 1, 'z': 6, 'a': 10},
{'b': 2, 'z': 3, 'a': 10},
{'b': 2, 'z': 5, 'a': 10},
{'b': 2, 'z': 6, 'a': 10},
{'b': 1, 'z': 3, 'a': 100},
{'b': 1, 'z': 5, 'a': 100},
{'b': 1, 'z': 6, 'a': 100},
{'b': 2, 'z': 3, 'a': 100},
{'b': 2, 'z': 5, 'a': 100},
{'b': 2, 'z': 6, 'a': 100}]
Notice the following:

- There is one dictionary for each of the 12 (`2*3*2`) possible combinations.
- The results are sorted first by the `'a'` value, then by the `'b'` value, and finally by the `'z'` value.

Keep in mind that the function needs to work for an arbitrary dictionary (the number of keys, the keys themselves, and the values can be anything which has the structure given earlier in the prompt).
Note: You may find the functions `itertools.product` or `numpy.meshgrid` helpful in solving this problem.
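For instance, here is a quick sketch of `itertools.product` on made-up inputs; it yields tuples covering every combination:

```python
from itertools import product

list(product([1, 2], ['x', 'y']))
# [(1, 'x'), (1, 'y'), (2, 'x'), (2, 'y')]
```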
### Define demo inputs
demo_params_ex7 = {'b': [1, 2], 'z': [3, 5, 6], 'a': [100, 10]}
### Exercise 7 solution
### Using np.meshgrid
def build_grid(params):
keys = sorted(params.keys())
grid = np.meshgrid(*params.values())
grid = [ar.reshape(np.prod(ar.shape),1) for ar in grid]
grid_arr = np.concatenate(grid, axis=1)
param_dicts = [{k: row[i] for i, k in enumerate(params.keys())} for row in grid_arr]
return sorted(param_dicts, key=lambda d: tuple(d[k] for k in keys))
### Alternative solution using itertools.product (this definition overrides the one above)
def build_grid(params):
from itertools import product
keys = list(params.keys())
prod = list(product(*params.values()))
grid = [dict(zip(keys, t)) for t in prod]
def key_func(d):
ordered_keys = sorted(d.keys())
return tuple(d[k] for k in ordered_keys)
return sorted(grid, key=key_func)
### demo function call
build_grid(demo_params_ex7)
The cell below will test your solution for Exercise 7. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex7
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_7',
'func': build_grid, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'params':{
'dtype':'dict', # data type of param.
'check_modified':False,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-6)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
In order to implement a grid search we need some metric for evaluating how well each model fits the data. We have provided `mse` to calculate the mean squared error for the predictions coming from a model. It takes two 1-D array arguments, `obs` (observed values) and `preds` (predicted values). It returns the mean of the squared differences between observations and predictions. A lower output from `mse` indicates that the model is a better fit.
def mse(obs, preds):
r = obs-preds
return np.power(r,2).mean()
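A quick usage sketch with made-up numbers: the residuals below are $[0, -1]$, so the result is $(0^2 + 1^2)/2 = 0.5$.

```python
mse(np.array([1., 2.]), np.array([1., 3.]))  # 0.5
```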
Now that we have built our parameter grid, it's time to test it.
Define the function `grid_search(func, ts, grid, n_back)`. The inputs are as follows.

- `func`: an arbitrary function similar to the ones which we defined in Exercises 5 and 6. It takes a 1-D time-series array and some additional parameters as inputs. It returns predictions for the time series based on some logic of which we are unaware. Consider `func(...)[i]` to be the prediction that corresponds with `ts[i]`; `func(...)[-1]` is the prediction for the next value of `ts`, which has not been observed yet.
- `ts`: a 1-D numerical array which is a suitable input for `func`. Think of this as time-series data to which we want to fit some model defined by `func`.
- `grid`: a list of dictionaries. Each dictionary maps the remaining parameters for `func` to values. You can assume that the keys in these dictionaries match the remaining named parameters for `func`.
- `n_back`: a positive integer smaller than `ts.shape[0]`.

Your function should iteratively search the parameter sets given in `grid` to determine the set which results in the best model (the one with the lowest `mse`) given `func` and `ts`. Since exponential smoothing takes time to ramp up, you should only consider the last `n_back` observations in `mse` calculations. If there are multiple parameter sets which result in the same `mse`, the parameter set encountered first in `grid` should be chosen.

You can follow this whiteboard-level algorithm to implement your search.

- Initialize variables to track the best parameter set and the lowest `mse` returned so far in the search.
- For each `dict` (we will call the dict `params`) in `grid`:
  - `func(ts, **params)` will calculate the predictions.
  - Slice the observations and predictions so that only the last `n_back` observations are in the slices.
  - Calculate the `mse` of the slices.
  - Update the tracking variables if the `mse` has improved.
- Once all the `dict`s in `grid` have been checked, return the best set of parameters.

### Define demo inputs
def demo_func_ex8(ts, a, b):
return np.concatenate(( np.array([np.nan]),
((5*a)/(7*b))*ts[1:],
np.array([50.])))
demo_ts_ex8 = np.array([51.1, 61.7, 34.92, 7.97, 84.03, 29.65, 85.86, 95.4, 82., 36.61])
demo_grid_ex8 = [ {'a': 2, 'b': 5},
{'a': 2, 'b': 11},
{'a': 3, 'b': 5},
{'a': 3, 'b': 11},
{'a': 21, 'b': 15},
{'a': 7, 'b': 11},
{'a': 14, 'b': 10}]
demo_n_back_ex8 = 9
The demo included in the solution cell below should display the following output:
{'a': 21, 'b': 15}
Notice:

- The demo `func` returns "predictions" $\hat{x}_t = \frac{5a}{7b}x_t$ whenever $0 < t \le 9$.
- Given the `n_back` parameter we are only looking at the last 9 observations (the same interval as in the above point).
- The predictions match the observations exactly whenever $\frac{5a}{7b} = 1$, i.e. whenever $5a = 7b$. Both `{'a': 21, 'b': 15}` and `{'a': 14, 'b': 10}` match this form, so we choose `{'a': 21, 'b': 15}`, which appeared first in `grid`.

### Exercise 8 solution
def grid_search(func, ts, grid, n_back):
    min_mse, best_params = np.inf, None
    for params in grid:
        preds = func(ts, **params)
        # Compare the last n_back observations against their predictions;
        # preds[-1] predicts the not-yet-observed next value, so exclude it.
        cur_mse = mse(ts[-n_back:], preds[-(n_back+1):-1])
        # Strict '<' keeps the earliest parameter set in the event of a tie
        if cur_mse < min_mse:
            min_mse, best_params = cur_mse, params
    return best_params
### demo function call
grid_search(func=demo_func_ex8, ts=demo_ts_ex8, grid=demo_grid_ex8, n_back=9)
The cell below will test your solution for Exercise 8. The testing variables will be available for debugging under the following names in a dictionary format.
- `input_vars` - Input variables for your solution.
- `original_input_vars` - Copy of input variables from prior to running your solution. These should be the same as `input_vars` - otherwise the inputs were modified by your solution.
- `returned_output_vars` - Outputs returned by your solution.
- `true_output_vars` - The expected output. This should "match" `returned_output_vars` based on the question requirements - otherwise, your solution is not returning the correct output.

### test_cell_ex8
###
### AUTOGRADER TEST - DO NOT REMOVE
###
from tester_fw.testers import Tester
conf = {
'case_file':'tc_8',
'func': grid_search, # replace this with the function defined above
'inputs':{ # input config dict. keys are parameter names
'func':{
'dtype':'function', # data type of param.
'check_modified':False,
},
'ts':{
'dtype':'np.ndarray', # data type of param.
'check_modified':True,
},
'grid':{
'dtype':'dict', # data type of param.
'check_modified':True,
},
'n_back':{
'dtype':'int', # data type of param.
'check_modified':False,
}
},
'outputs':{
'output_0':{
'index':0,
'dtype':'dict',
'check_dtype': True,
'check_col_dtypes': True, # Ignored if dtype is not df
'check_col_order': True, # Ignored if dtype is not df
'check_row_order': True, # Ignored if dtype is not df
'check_column_type': True, # Ignored if dtype is not df
'float_tolerance': 10 ** (-6)
}
}
}
tester = Tester(conf, key=b'z0BNF11iKYQicR63590bVXZGa19YGvJcmzrbP6R7oAY=', path='resource/asnlib/publicdata/')
for _ in range(70):
try:
tester.run_test()
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
except:
(input_vars, original_input_vars, returned_output_vars, true_output_vars) = tester.get_test_vars()
raise
###
### AUTOGRADER TEST - DO NOT REMOVE
###
print('Passed! Please submit.')
Fin. If you have made it this far, congratulations on completing the exam. Don't forget to submit!
We have used the functions developed above to fit simple and double exponential smoothing models. If you have correctly completed all of the exercises then you will get the same results by running this code:
### Load data
cpi_all_df = pd.read_csv('resource/asnlib/publicdata/cpi_urban_all.csv')
### Clean data
cpi_clean_df = cleanup_df(cpi_all_df, ['HALF1', 'HALF2'])
### Reshape to time-series array
cpi_ts = to_ts(cpi_clean_df)
### Calculate inflation (change lag of one month)
monthly_inflation = multiplicative_change(cpi_ts, 1)
### Determine corresponding dates for plotting
dates = date_series(cpi_clean_df, cpi_ts.shape[0])
### Calculate params for simple smoothing
simple_params = grid_search(
func=simple_exp_smoothing,
ts=monthly_inflation,
grid=build_grid({'alpha': np.linspace(0, 1, 21)}),
n_back=360)
### Calculate params for double smoothing
double_params = grid_search(
func=double_exp_smoothing,
ts=monthly_inflation,
grid=build_grid({'alpha': np.linspace(0, 1, 21), 'beta': np.linspace(0, 1, 21)}),
n_back=360)
### Make predictions
simple_preds = simple_exp_smoothing(monthly_inflation, **simple_params)
double_preds = double_exp_smoothing(monthly_inflation, **double_params)
We will load our pre-computed results in case you did not complete all of the exercises.
Let's start by plotting the month-over-month inflation.
with open('resource/asnlib/publicdata/monthly_inflation.pkl', 'rb') as f:
monthly_inflation = pickle.load(f)
with open('resource/asnlib/publicdata/dates.pkl', 'rb') as f:
dates = pickle.load(f)
plt.rcParams["figure.figsize"] = (12, 9)
plt.plot(dates[1:], monthly_inflation)
We see a lot of volatility over the first few decades. Let's look at the last 30 years...
plt.plot(dates[-360:], monthly_inflation[-360:])
This looks like a more stationary time-series. There's one outlier in 2008. Aside from that it looks like the month-to-month inflation rate is bouncing around a more-or-less constant value.
Now, let's see what parameters we got from the grid search...
with open('resource/asnlib/publicdata/simple_params.pkl', 'rb') as f:
simple_params = pickle.load(f)
with open('resource/asnlib/publicdata/double_params.pkl', 'rb') as f:
double_params = pickle.load(f)
print(f'Best simple params: {simple_params}')
print(f'Best double params: {double_params}')
There are two interesting take-aways from this choice of parameters. The first is from the `alpha` parameter. The value, relatively close to 1, means that the best model takes 75% of its prediction from the most recent observation and places relatively little importance on older observations. In layman's terms, the model does not think that the inflation rate is likely to change from whatever it was in the previous month.
The second take-away is that the `beta` parameter is 0. This means that the best model does not account for any trend. In other words, if the inflation rate goes up in one month it does not imply that it will go up (or down) again in the following month. Note: The double exponential smoothing model with `beta=0` is equivalent to the simple exponential smoothing model!
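Here is a minimal check of that claim using the functions defined in Exercises 5 and 6 (the input values are made up):

```python
# With beta=0 the trend term b stays at its initial value of 0, so the
# double-smoothing forecast s_t + b_t reduces to the simple-smoothing s_t.
demo_ts = np.array([100., 105., 120., 110., 115.])
assert np.allclose(simple_exp_smoothing(demo_ts, 0.5),
                   double_exp_smoothing(demo_ts, 0.5, 0.0),
                   equal_nan=True)
```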
Now, let's take a look at the predicted vs. observed inflation rates. We will look at a shorter time interval so it's easy to compare...
with open('resource/asnlib/publicdata/simple_preds.pkl', 'rb') as f:
simple_preds = pickle.load(f)
simple_mse = mse(monthly_inflation[-360:], simple_preds[-361:-1])
print(simple_mse)
plt.plot(dates[-120:], monthly_inflation[-120:], label='Observed', marker='o')
plt.plot(dates[-120:], simple_preds[-121:-1], label='Predicted', marker='x')
plt.legend()
plt.figure()
plt.plot(dates[-360:], monthly_inflation[-360:] - simple_preds[-361:-1], label='Residuals')
plt.legend()
The predictions are not terrible. There seems to be a sort of "seasonal rhythm" to the observations. We can account for seasonality by using a more complex model. We have fit and generated predictions using the Holt-Winters method (so-called triple exponential smoothing). This method accounts for signal, trend, and cyclical seasonal effects. Let's take a look!
with open('resource/asnlib/publicdata/triple_preds.pkl', 'rb') as f:
triple_preds = pickle.load(f)
triple_mse = mse(monthly_inflation[-360:], triple_preds[-361:-1])
print(triple_mse)
plt.plot(dates[-120:], monthly_inflation[-120:], label='Observed', marker='o')
plt.plot(dates[-120:], triple_preds[-121:-1], label='Predicted', marker='x')
plt.legend()
plt.figure()
plt.plot(dates[-360:], monthly_inflation[-360:] - triple_preds[-361:-1], label='Residuals')
plt.legend()
The Holt-Winters model did not offer much improvement.
Here are the take-aways from this analysis: