Final Exam Fall 2025: Running
Version 1.0.0
All of the header information is important. Please read it.
Topics, number of exercises: This problem builds on your knowledge of GPS data processing, geospatial analysis, SQL for sports analytics, and computer vision for route classification. It has 11 exercises, numbered 0 to 10. There are 21 available points. However, to earn 100% the threshold is 16 points. (Therefore, once you hit 16 points you can stop. There is no extra credit for exceeding this threshold.)
Exercise ordering: Each exercise builds logically on previous exercises, but you may solve them in any order. That is, if you can't solve an exercise, you can still move on and try the next one. Use this to your advantage, as the exercises are not necessarily ordered in terms of difficulty. Higher point values generally indicate more difficult exercises.
Demo cells: Code cells starting with the comment ### Run Me!!! load results from prior exercises applied to the entire data set and use those to build demo inputs. These must be run for subsequent demos to work properly, but they do not affect the test cells. The data loaded in these cells may be rather large (at least in terms of human readability). You are free to print or otherwise use Python to explore them, but we may not print them in the starter code.
Debugging your code: Right before each exercise test cell, there is a block of text explaining the variables available to you for debugging. You may use these to test your code and can print/display them as needed (be careful when printing large objects; you may want to print the head or chunks of rows at a time).
Exercise point breakdown:
Exercise 0 - : 1 point(s)
Exercise 1 - : 3 point(s)
Exercise 2 - : 2 point(s)
Exercise 3 - : 2 point(s)
Exercise 4 - : 2 point(s)
Exercise 5 - : 2 point(s)
Exercise 6 - : 2 point(s)
Exercise 7 - : 3 point(s)
Exercise 8 - : 1 point(s)
Exercise 9 - : 2 point(s)
Exercise 10 - : 1 point(s)
Final reminders:
### Global imports
import dill
from cse6040_devkit import plugins, utils
from cse6040_devkit.training_wheels import run_with_timeout, suppress_stdout
import tracemalloc
from time import time
import re
import pandas as pd
import pprint
import sqlite3
import numpy as np
from pprint import pprint
utils.add_from_file('handle_gdf', plugins)
For this exam, you are given information about different running metrics. From that data, you will:
conn = sqlite3.connect('resource/asnlib/publicdata/running.db')
explore_data__FREE
Example: we have defined explore_data__FREE as follows:
This is a free exercise!
Please run the test cell below to collect your FREE point!
The output will show the structure of the GPX file and the database tables which we will use for the following exercises.
### Solution - Exercise 0
def explore_data__FREE(name)->list:
    if '.gpx' in name:
        with open(name) as f:
            lines=[line for line in f.readlines()]
        return'\n'.join(lines)
    else:
        return pd.read_sql_query(f'''SELECT * FROM {name} LIMIT 10''', conn)
### Demo function call
name="resource/asnlib/publicdata/shape_run_paris2.gpx"
print(f'===============\n{name}\n===============\n')
print(explore_data__FREE(name)[:1000])
name="df_heartrate"
print(f'\n\n===============\n{name}\n===============\n')
display(explore_data__FREE(name))
name="df_races"
print(f'\n\n===============\n{name}\n===============\n')
display(explore_data__FREE(name))
The test cell below will always pass. Please submit to collect your free points for explore_data__FREE (exercise 0).
### Test Cell - Exercise 0
print('Passed! Please submit.')
parsegpx_string
Your task: define parsegpx_string as follows:
Implement a function, parsegpx_string(xmlstring), which parses a list of latitudes and longitudes from a string. The input string is taken from a .gpx file, which is an extended XML format.
Inputs:
xmlstring: A string extracted from a .gpx file
Return:
latitudes_longitudes: list of dictionaries containing lat and lon key:value pairs for each entry found within the xmlstring
Requirements/steps:
Within each trkpt tag (which stands for track point and is found within a trk (track) and trkseg (track segment)), there will be a lat and lon field.
Extract lat and lon and store them in a list of dictionaries containing lat and lon key:value pairs.
lat and lon strings must only include the characters 0-9, ., +, and -.
### Solution - Exercise 1
def parsegpx_string(xmlstring:str)->list:
    ### BEGIN SOLUTION
    import re
    pattern = re.compile(r'''<trkpt lat="([\+\-0-9\.]+)" lon="([\+\-0-9\.]+)">''')
    return [{'lat':lat,'lon':lon} for lat, lon in pattern.findall(xmlstring)]
    ### END SOLUTION
### Demo function call
demo_ex0_xmlstring = utils.load_object_from_publicdata('demo_ex0_xmlstring.dill')
demo_ex0_output = parsegpx_string(demo_ex0_xmlstring)
pprint(demo_ex0_output)
The demo should display this printed output.
[{'lat': '+48.8722677', 'lon': '-2.3372904999999995'},
{'lat': '48.872473199999995', 'lon': '2.3373655'},
{'lat': '48.8724855', 'lon': '2.3374103000000006'},
{'lat': '48.874067499999995', 'lon': '2.3454081'},
{'lat': '48.8740639', 'lon': '2.3464527'},
{'lat': '48.874060699999994', 'lon': '2.347187'},
{'lat': '48.872178299999995', 'lon': '2.3501147999999996'}]
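To see how the character restriction behaves, here is a minimal sketch on a made-up GPX fragment. The sample string and the names sample_gpx and TRKPT are assumptions for illustration and are not part of the exam data. The third track point contains a letter in lat, so the pattern does not match it.

```python
import re

# Hypothetical GPX fragment for illustration; not taken from the exam data.
sample_gpx = '''<trk><trkseg>
<trkpt lat="48.8722677" lon="2.3372905"><ele>35.0</ele></trkpt>
<trkpt lat="+48.8724732" lon="-2.3373655"></trkpt>
<trkpt lat="48.87x" lon="2.33"></trkpt>
</trkseg></trk>'''

# The character class admits only digits, '.', '+', and '-'.
TRKPT = re.compile(r'<trkpt lat="([+\-0-9.]+)" lon="([+\-0-9.]+)">')

# findall returns one (lat, lon) tuple per matching tag, in document order.
points = [{'lat': lat, 'lon': lon} for lat, lon in TRKPT.findall(sample_gpx)]
print(points)
```

The values stay as strings, so a leading + sign is preserved exactly as it appears in the file.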
The cell below will test your solution for parsegpx_string (exercise 1). The testing variables will be available for debugging under the following names in a dictionary format.
input_vars - Input variables for your solution.
original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
returned_output_vars - Outputs returned by your solution.
true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 1
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=parsegpx_string,
ex_name='parsegpx_string',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to parsegpx_string did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=parsegpx_string,
ex_name='parsegpx_string',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=21,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to parsegpx_string did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
next_destination_frequency
Your task: define next_destination_frequency as follows:
Implement a function which calculates the conditional probability of going from an origin point a to the next point b.
Inputs:
run_records: A dictionary in which each key is a run_id and each value is an ordered list of the points that the runner ran to.
Return:
next_dest_dict: a nested dictionary where the outer key is an origin point a, the inner key is a next point b, and the value is the probability of going from a to b based on starting at a, rounded to 3 decimal places
Simple Example:
{0: ['A', 'B', 'C'], 1: ['A','B'], 2: ['A','C']}
{'A': {'B': 0.667, 'C': 0.333}, 'B': {'C': 1.000} }
Requirements/steps:
### Solution - Exercise 2
def next_destination_frequency(run_records: dict) -> dict:
    ### BEGIN SOLUTION
    from collections import defaultdict, Counter
    next_dest_dict = defaultdict(Counter)
    for run_id, dest_list in run_records.items():
        for i in range(0, len(dest_list)-1):
            origin = dest_list[i]
            dest = dest_list[i+1]
            next_dest_dict[origin][dest] += 1
    for key in next_dest_dict.keys():
        total = sum(next_dest_dict[key].values())
        for inner_key, value in next_dest_dict[key].items():
            next_dest_dict[key][inner_key] = round(value / total,3)
    next_dest_dict = dict(next_dest_dict)
    return {k:{kk:vv for kk,vv in v.items()} for k,v in next_dest_dict.items()} #remove Counter from dict outputs
    ### END SOLUTION
### Demo function call
demo_run_records = utils.load_object_from_publicdata('demo_run_records.dill')
demo_next_dest_dict = next_destination_frequency(demo_run_records)
pprint(demo_next_dest_dict)
The demo should display this printed output.
{'Apollo Theater': {'Bronx Zoo': 0.333,
'Central Park': 0.333,
'Citi Field': 0.333},
'Battery Park': {'Apollo Theater': 0.667, 'Brooklyn Public Library': 0.333},
'Bronx Zoo': {'Battery Park': 0.5, 'Brooklyn Public Library': 0.5},
'Brooklyn Bridge': {'Central Park': 0.5, 'The Sphere': 0.5},
'Brooklyn Public Library': {'Battery Park': 0.333,
'Bronx Zoo': 0.167,
'Citi Field': 0.167,
'The Sphere': 0.333},
'Central Park': {'Brooklyn Public Library': 0.667, 'Citi Field': 0.333},
'Citi Field': {'Brooklyn Bridge': 0.333, 'Brooklyn Public Library': 0.667},
'The Sphere': {'Apollo Theater': 0.333,
'Brooklyn Bridge': 0.333,
'Central Park': 0.333}}
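The Simple Example can be worked through by hand: A is followed by B twice and by C once, and B is followed by C once. The sketch below reproduces that counting with zip over consecutive pairs. The names runs, counts, and probs are illustrative assumptions, and this is one possible approach rather than the required one.

```python
from collections import Counter, defaultdict

# Input from the Simple Example.
runs = {0: ['A', 'B', 'C'], 1: ['A', 'B'], 2: ['A', 'C']}

# Count each (origin, next point) transition across all runs.
counts = defaultdict(Counter)
for points in runs.values():
    for origin, dest in zip(points, points[1:]):
        counts[origin][dest] += 1

# Normalize each origin's counts so they sum to 1, rounding to 3 decimals.
probs = {
    origin: {dest: round(n / sum(c.values()), 3) for dest, n in c.items()}
    for origin, c in counts.items()
}
print(probs)
```

Points that are never an origin, such as C here, do not appear as outer keys.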
The cell below will test your solution for next_destination_frequency (exercise 2). The testing variables will be available for debugging under the following names in a dictionary format.
input_vars - Input variables for your solution.
original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
returned_output_vars - Outputs returned by your solution.
true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 2
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=next_destination_frequency,
ex_name='next_destination_frequency',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to next_destination_frequency did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=next_destination_frequency,
ex_name='next_destination_frequency',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=21,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to next_destination_frequency did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
getdistances
Your task: define getdistances as follows:
Implement a function to calculate the distance between consecutive points of latitudes/longitudes.
Inputs:
d: A Pandas DataFrame which contains lat and lon columns
Return:
distances: a list of distances, rounded to 5 decimal places, between subsequent points of latitudes/longitudes
Requirements/steps:
### Solution - Exercise 3
def getdistances(d: pd.DataFrame) -> list:
    from math import sin, cos, sqrt, acos, radians
    r=3956
    ### BEGIN SOLUTION
    lon=[float(i) for i in d['lon'].to_list()]
    lat=[float(i) for i in d['lat'].to_list()]
    distances=[]
    for i in range(len(lon)-1):
        lat1=radians(lat[i])
        lon1=radians(lon[i])
        lat2=radians(lat[i+1])
        lon2=radians(lon[i+1])
        dlon = lon2 - lon1
        # Spherical law of cosines: cosine of the central angle between the two points
        a = sin(lat1) * sin(lat2) + cos(lat1) * cos(lat2) * cos(dlon)
        # Floating-point rounding can push a slightly outside [-1, 1] for identical points
        a = max(-1.0, min(1.0, a))
        central_angle = acos(a)
        distances.append(round(r * central_angle,5))
    return distances
    ### END SOLUTION
### Demo function call
demo_getdistances_d = utils.load_object_from_publicdata('demo_getdistances_d.dill')
demo_getdistances_output = getdistances(demo_getdistances_d)
pprint(demo_getdistances_output)
The demo should display this printed output.
[0.24721, 0.20106, 0.40918, 0.14515]
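The distance between two points on a sphere can be obtained from the spherical law of cosines: the central angle is the arccosine of sin(lat1)·sin(lat2) + cos(lat1)·cos(lat2)·cos(lon2 − lon1), and the arc length is that angle times the radius. The sketch below assumes r = 3956 is Earth's radius in miles; the helper name arc_distance and the test coordinates are hypothetical. It checks the formula on a case with a known answer: two points one degree of latitude apart on the same meridian are separated by r·π/180.

```python
from math import acos, cos, pi, radians, sin

EARTH_RADIUS_MILES = 3956  # same constant as r in the starter code (assumed to be miles)


def arc_distance(lat1, lon1, lat2, lon2, r=EARTH_RADIUS_MILES):
    """Great-circle distance via the spherical law of cosines (inputs in degrees)."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dlon = radians(lon2 - lon1)
    cos_angle = sin(phi1) * sin(phi2) + cos(phi1) * cos(phi2) * cos(dlon)
    # Clamp to acos's domain in case rounding nudges the value past +/-1.
    cos_angle = max(-1.0, min(1.0, cos_angle))
    return r * acos(cos_angle)


one_degree = arc_distance(0.0, 0.0, 1.0, 0.0)
print(round(one_degree, 3))  # about 69.045, i.e. r * pi / 180
```

The coordinates must be converted to radians before the trigonometric calls; passing degrees directly is a common source of wrong distances.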
The cell below will test your solution for getdistances (exercise 3). The testing variables will be available for debugging under the following names in a dictionary format.
input_vars - Input variables for your solution.
original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
returned_output_vars - Outputs returned by your solution.
true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 3
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=getdistances,
ex_name='getdistances',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to getdistances did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=getdistances,
ex_name='getdistances',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=21,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to getdistances did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
geopandasdata
Your task: define geopandasdata as follows:
Implement a function, geopandasdata(d, crs), which produces a GeoPandas GeoDataFrame object.
Maintain the same ordering as the input DataFrame.
Inputs:
d: A pandas DataFrame with lat and lon as strings
crs: a string representing the Coordinate Reference System (CRS), which tells Python how those coordinates relate to places on Earth
Return:
path_gdf: a GeoPandas GeoDataFrame to be used for mapping
Requirements/steps:
Keep the lat and lon columns from input d.
Add a geometry column which represents a GeometryArray. Hint: you should use points_from_xy.
crs should be set to the crs input variable.
### Solution - Exercise 4
def geopandasdata(d: pd.DataFrame,crs: str='EPSG:4326') -> 'gpd.GeoDataFrame':
    import geopandas as gpd
    d2=d.copy(deep=True)
    ### BEGIN SOLUTION
    def getpathcoords(d: pd.DataFrame) -> list:
        def getlatlon(d):
            lon=[float(i) for i in d['lon'].to_list()]
            lat=[float(i) for i in d['lat'].to_list()]
            return lat,lon
        lat, lon=getlatlon(d)
        return [(lat[i],lon[i]) for i in range(len(lon))]
    path_coords=getpathcoords(d2)
    # points_from_xy expects x (longitude) first, then y (latitude)
    path_gdf=gpd.GeoDataFrame(
        d2,
        geometry=gpd.points_from_xy([c[1] for c in path_coords],[c[0] for c in path_coords]),
        crs=crs
    )
    return path_gdf
    ### END SOLUTION
### Demo function call
demo_getdistances_d = utils.load_object_from_publicdata('demo_getdistances_d.dill')
demo_getpathcoords_output = geopandasdata(demo_getdistances_d)
display(demo_getpathcoords_output)
The demo should display this output.
| | lat | lon | geometry |
|---|---|---|---|
| 0 | 48.8706362 | 2.3473592 | POINT (2.34736 48.87064) |
| 1 | 48.872920199999996 | 2.3431671000000005 | POINT (2.34317 48.87292) |
| 2 | 48.875697599999995 | 2.3418363000000006 | POINT (2.34184 48.87570) |
| 3 | 48.8738757 | 2.3504104000000003 | POINT (2.35041 48.87388) |
| 4 | 48.8740889 | 2.3472305999999996 | POINT (2.34723 48.87409) |
The cell below will test your solution for geopandasdata (exercise 4). The testing variables will be available for debugging under the following names in a dictionary format.
input_vars - Input variables for your solution.
original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
returned_output_vars - Outputs returned by your solution.
true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 4
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=plugins.handle_gdf(geopandasdata),
ex_name='geopandasdata',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=20)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to geopandasdata did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=plugins.handle_gdf(geopandasdata),
ex_name='geopandasdata',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=20,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to geopandasdata did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
### Run Me!!!
demo_result_geopandasdata_TRUE = utils.load_object_from_publicdata('demo_result_geopandasdata_TRUE')
Now that we can extract path coordinates as well as latitudes and longitudes, we can plot the routes on an image using matplotlib. Assume that data has all the latitudes and longitudes from the original GPX file. We'll use these generated png files later to build a machine learning model to classify the type of shape.
import matplotlib.pyplot as plt
ax=geopandasdata(data).plot()
# plt.savefig(f'resource/asnlib/publicdata/pi_shaperun_paris_2.png')
plt.show()

Additionally, we can use an interactive map plotting library such as Folium to plot our gpx on a map.
# https://stackoverflow.com/questions/60578408/is-it-possible-to-draw-paths-in-folium
# https://stackoverflow.com/questions/71831698/creating-a-folium-map-with-markers-with-different-colors
import folium
def plotfolium(path_coords,latlon):
    lat,lon=latlon
    from statistics import mean
    m=folium.Map(location=[mean(lat),mean(lon)],zoom_start=14,tiles="CartoDB positron")
    folium.PolyLine(path_coords,color="red",opacity=.9).add_to(m)
    return m
pathcoords=utils.load_object_from_publicdata('demo_pathcoords.dill')
latlon=utils.load_object_from_publicdata('demo_latlon.dill')
plotfolium(pathcoords,latlon)
convert_distances
Your task: define convert_distances as follows:
Implement a function which converts and standardizes distance data to either miles or kilometers.
Inputs:
df_runs: A Pandas DataFrame containing distance and unit columns
final_unit: a string, either miles or kilometers
Return:
df_runs_copy: a Pandas DataFrame containing distance and unit columns. The distance column needs to be rounded to 3 decimals.
Requirements/steps:
Using miles_to_km, convert and standardize the data in the df_runs DataFrame based on the final_unit.
### Solution - Exercise 5
def convert_distances(df_runs: pd.DataFrame, final_unit: str) -> pd.DataFrame:
    miles_to_km = 1.609
    ### BEGIN SOLUTION
    df_runs_copy = df_runs.copy()
    if final_unit == 'miles':
        df_runs_copy.loc[df_runs_copy['unit'] != 'miles', 'distance'] = df_runs_copy.loc[df_runs_copy['unit'] != 'miles', 'distance'] / miles_to_km
    else:
        df_runs_copy.loc[df_runs_copy['unit'] != 'kilometers', 'distance'] = df_runs_copy.loc[df_runs_copy['unit'] != 'kilometers', 'distance'] * miles_to_km
    df_runs_copy['unit'] = final_unit
    df_runs_copy['distance'] = df_runs_copy['distance'].round(3)
    return df_runs_copy
    ### END SOLUTION
### Demo function call
demo_convert_distances_df_runs = utils.load_object_from_publicdata('demo_convert_distances_df_runs.dill')
demo_convert_distances_output = convert_distances(demo_convert_distances_df_runs, 'miles')
display(demo_convert_distances_output)
The demo should display this output.
| | distance | unit |
|---|---|---|
| 0 | 1.865 | miles |
| 1 | 0.622 | miles |
| 2 | 9.000 | miles |
| 3 | 1.865 | miles |
| 4 | 9.000 | miles |
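The conversion arithmetic itself is small: kilometers divide by 1.609 to become miles, and miles multiply by 1.609 to become kilometers. The sketch below shows that logic on plain Python records rather than a DataFrame, purely to illustrate the arithmetic. The records in runs and the helper to_unit are hypothetical and are not the exam data.

```python
MILES_TO_KM = 1.609  # same factor as miles_to_km in the starter code

# Hypothetical records; the exam data lives in a DataFrame with the same two fields.
runs = [
    {'distance': 3.0, 'unit': 'kilometers'},
    {'distance': 1.0, 'unit': 'kilometers'},
    {'distance': 9.0, 'unit': 'miles'},
]


def to_unit(record, final_unit):
    """Return a new record expressed in final_unit, rounded to 3 decimals."""
    distance = record['distance']
    if record['unit'] != final_unit:
        # kilometers -> miles divides by the factor; miles -> kilometers multiplies.
        if final_unit == 'miles':
            distance = distance / MILES_TO_KM
        else:
            distance = distance * MILES_TO_KM
    return {'distance': round(distance, 3), 'unit': final_unit}


in_miles = [to_unit(r, 'miles') for r in runs]
in_km = [to_unit(r, 'kilometers') for r in runs]
print(in_miles)
print(in_km)
```

Rows already in the target unit are left unchanged apart from rounding, and the input records are not mutated.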
The cell below will test your solution for convert_distances (exercise 5). The testing variables will be available for debugging under the following names in a dictionary format.
input_vars - Input variables for your solution.
original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
returned_output_vars - Outputs returned by your solution.
true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 5
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=convert_distances,
ex_name='convert_distances',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to convert_distances did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=convert_distances,
ex_name='convert_distances',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=21,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to convert_distances did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
### Run Me!!!
demo_result_convert_distances_TRUE = utils.load_object_from_publicdata('demo_result_convert_distances_TRUE')
heartratedrift
Your task: define heartratedrift_query as follows:
Write a SQL query to calculate heart rate drift.
Requirements/steps:
Use the df_heartrate table.
Data is from one-hour runs. For every user, their speed (mph) and heart rate (bpm) are tabulated for each minute of their one-hour run, from minute 0 to minute 59.
| user | minute | speed | heart_rate |
|---|---|---|---|
| 001 | 0 | 4.1 | 136 |
| 001 | 1 | 4.5 | 135 |
| 001 | 2 | 3.9 | 135 |
| 001 | 3 | 3.8 | 136 |
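We want to calculate the heart rate drift % for each runner over the hour period. This is calculated by:
$$ \frac{\frac{\bar{s_1}}{\bar{h_1}} - \frac{\bar{s_2}}{\bar{h_2}}}{\frac{\bar{s_1}}{\bar{h_1}}} $$
where $\bar{s_1}$ and $\bar{h_1}$ are the average speed and average heart rate over the first half of the run (minutes 0 to 29), and $\bar{s_2}$ and $\bar{h_2}$ are the same averages over the second half (minutes 30 to 59).
Your query should return one row per runner with their heart rate drift %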
| user | heart_rate_drift |
|---|---|
| 001 | 0.038 |
| 002 | 0.012 |
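The formula can be checked on a tiny in-memory SQLite table. The sketch below is an illustration under stated assumptions: the four rows are made up, the connection conn_demo and the CTE name halves are hypothetical, and the halves are split at minute 30. With a constant speed of 6.0 mph and heart rate rising from 150 to 156 bpm, the drift is 1 − 150/156, roughly 0.038.

```python
import sqlite3

# In-memory database with made-up rows; column names follow df_heartrate.
conn_demo = sqlite3.connect(':memory:')
conn_demo.execute(
    'CREATE TABLE df_heartrate (user TEXT, minute INTEGER, speed REAL, heart_rate REAL)'
)
conn_demo.executemany(
    'INSERT INTO df_heartrate VALUES (?, ?, ?, ?)',
    [
        ('001', 0, 6.0, 150), ('001', 29, 6.0, 150),   # first half
        ('001', 30, 6.0, 156), ('001', 59, 6.0, 156),  # second half
    ],
)

# Conditional aggregation: CASE without ELSE yields NULL, which AVG ignores.
query = '''
WITH halves AS (
    SELECT user,
           AVG(CASE WHEN minute < 30 THEN speed END)
             / AVG(CASE WHEN minute < 30 THEN heart_rate END) AS first_ratio,
           AVG(CASE WHEN minute >= 30 THEN speed END)
             / AVG(CASE WHEN minute >= 30 THEN heart_rate END) AS second_ratio
    FROM df_heartrate
    GROUP BY user
)
SELECT user, (first_ratio - second_ratio) / first_ratio AS heart_rate_drift
FROM halves
'''
result = conn_demo.execute(query).fetchall()
print(result)
```

A positive drift means the speed-to-heart-rate ratio fell in the second half, that is, the runner needed more heartbeats for the same pace.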
### Solution - Exercise 6
heartratedrift_query = '''YOUR QUERY HERE'''
### BEGIN SOLUTION
heartratedrift_query = '''
WITH user_stats AS (
SELECT
user
, AVG(CASE WHEN minute <= 29 THEN speed ELSE NULL END) AS first_half_speed
, AVG(CASE WHEN minute <= 29 THEN heart_rate ELSE NULL END) AS first_half_heart_rate
, AVG(CASE WHEN minute > 29 THEN speed ELSE NULL END) AS second_half_speed
, AVG(CASE WHEN minute > 29 THEN heart_rate ELSE NULL END) AS second_half_heart_rate
FROM df_heartrate
GROUP BY 1
)
SELECT
user
, ((first_half_speed / first_half_heart_rate) - (second_half_speed / second_half_heart_rate)) / (first_half_speed / first_half_heart_rate) AS heart_rate_drift
FROM user_stats
'''
### END SOLUTION
### Demo function call
demo_result_heartratedrift = pd.read_sql(heartratedrift_query, conn)
demo_result_heartratedrift_output=demo_result_heartratedrift[demo_result_heartratedrift['user']=='1']
display(demo_result_heartratedrift_output)
The demo should display this output.
| | user | heart_rate_drift |
|---|---|---|
| 1 | 1 | -0.007659 |
The cell below will test your solution for heartratedrift (exercise 6). The testing variables will be available for debugging under the following names in a dictionary format.
input_vars - Input variables for your solution.
original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
returned_output_vars - Outputs returned by your solution.
true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 6
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(heartratedrift_query),
ex_name='heartratedrift',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=10)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to heartratedrift did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(heartratedrift_query),
ex_name='heartratedrift',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=10,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to heartratedrift did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
### Run Me!!!
demo_result_heartratedrift_TRUE = utils.load_object_from_publicdata('demo_result_heartratedrift_TRUE')
race_winners
Your task: define race_winners_query as follows:
Write a SQL query to calculate the top 3 finishers of each race and their time delta from the runner in front of them.
Requirements/steps:
Use the df_races table.
For each Race, determine the top 3 Runner finishers and their time Delta from the finisher in front of them.
The first-place finisher will have a Delta of NaN because no one finished ahead of them.
| Race | Runner | Time |
|---|---|---|
| marathon | Zac | 248.7 |
| marathon | Hannah | 241.5 |
| marathon | Bella | 242.9 |
| marathon | Liam | 236.7 |
| marathon | Riley | 224.6 |
| Race | Runner | Delta |
|---|---|---|
| marathon | Riley | NaN |
| marathon | Liam | 12.1 |
| marathon | Hannah | 4.8 |
### Solution - Exercise 7
race_winners_query = '''YOUR QUERY HERE'''
### BEGIN SOLUTION
race_winners_query = '''
WITH ranked_by_race AS (
SELECT
Race
, Runner
, Time
, ROW_NUMBER() OVER(PARTITION BY Race ORDER BY Time) AS rownum
, LAG(Time) OVER(PARTITION BY Race ORDER BY Time) AS prev_runner_time
FROM df_races
)
SELECT
Race
, Runner
, Time - prev_runner_time AS Delta
FROM ranked_by_race
WHERE rownum <= 3
ORDER BY Race, rownum
'''
### END SOLUTION
### Demo function call
demo_result_race_winners = pd.read_sql(race_winners_query, conn)
display(demo_result_race_winners)
The demo should display this output.
| | Race | Runner | Delta |
|---|---|---|---|
| 0 | 400m | Mia | NaN |
| 1 | 400m | Matthew | 0.2 |
| 2 | 400m | Andrew | 0.1 |
| 3 | 5k | Alexander | NaN |
| 4 | 5k | Chloe | 0.5 |
| 5 | 5k | Harper | 5.3 |
| 6 | half marathon | Carter | NaN |
| 7 | half marathon | Zoe | 1.0 |
| 8 | half marathon | Andrew | 1.0 |
| 9 | marathon | Riley | NaN |
| 10 | marathon | Liam | 12.1 |
| 11 | marathon | Hannah | 4.8 |
| 12 | turkey trot | Carter | NaN |
| 13 | turkey trot | Zoe | 2.0 |
| 14 | turkey trot | Andrew | 2.0 |
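As a quick sanity check of the window-function approach, the sketch below rebuilds the five-row marathon example in an in-memory SQLite database and runs an equivalent query. The names `demo_conn`, `sketch_query`, and `top3`, the `ROUND(..., 1)` call, and the explicit `ORDER BY` are assumptions added for illustration and are not part of the exercise. It also assumes the bundled SQLite supports window functions (version 3.25 or later). The plain `sqlite3` module returns a missing Delta as `None`, which pandas displays as NaN.

```python
import sqlite3

# Hypothetical in-memory copy of the marathon example from the exercise text.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE df_races (Race TEXT, Runner TEXT, Time REAL)")
demo_conn.executemany(
    "INSERT INTO df_races VALUES (?, ?, ?)",
    [
        ("marathon", "Zac", 248.7),
        ("marathon", "Hannah", 241.5),
        ("marathon", "Bella", 242.9),
        ("marathon", "Liam", 236.7),
        ("marathon", "Riley", 224.6),
    ],
)

# ROW_NUMBER ranks runners within each race by time; LAG fetches the time
# of the runner immediately ahead (NULL for the first-place finisher).
sketch_query = """
WITH ranked AS (
    SELECT Race, Runner, Time,
           ROW_NUMBER() OVER (PARTITION BY Race ORDER BY Time) AS rownum,
           LAG(Time)    OVER (PARTITION BY Race ORDER BY Time) AS prev_time
    FROM df_races
)
SELECT Race, Runner, ROUND(Time - prev_time, 1) AS Delta
FROM ranked
WHERE rownum <= 3
ORDER BY Race, rownum
"""

top3 = demo_conn.execute(sketch_query).fetchall()
for row in top3:
    print(row)
demo_conn.close()
```

The rounding is only there to keep floating-point noise out of the printed deltas; the exercise itself does not ask for it.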
The cell below will test your solution for race_winners (exercise 7). The testing variables will be available for debugging under the following names in a dictionary format.
- input_vars - Input variables for your solution.
- original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
- returned_output_vars - Outputs returned by your solution.
- true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 7
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(race_winners_query),
ex_name='race_winners',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=10)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to race_winners did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=plugins.sql_executor(race_winners_query),
ex_name='race_winners',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=10,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to race_winners did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
### Run Me!!!
demo_result_race_winners_TRUE = utils.load_object_from_publicdata('demo_result_race_winners_TRUE')
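When a test fails, the debugging dictionaries described for each test cell can be compared key by key. The sketch below is a minimal illustration with made-up data: the dictionaries `demo_returned` and `demo_expected`, the key `"result"`, and the helper `find_mismatches` are all hypothetical stand-ins, not part of the testing framework.

```python
# Hypothetical stand-ins for returned_output_vars and true_output_vars.
demo_returned = {"result": [(0, "pi.png"), (0, "rose.png")]}
demo_expected = {"result": [(0, "pi.png"), (1, "rose.png")]}

def find_mismatches(returned, expected):
    """Return the keys whose returned value differs from the expected value."""
    return [key for key in expected if returned.get(key) != expected[key]]

mismatched_keys = find_mismatches(demo_returned, demo_expected)
for key in mismatched_keys:
    print(f"{key}: returned {demo_returned.get(key)!r}, expected {demo_expected[key]!r}")
```

The plain `!=` comparison suits lists, tuples, and scalars. For DataFrame or array outputs, a dedicated comparison such as `DataFrame.equals` or `numpy.array_equal` is the safer choice.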
label_filenames
Your task: define label_filenames as follows:
Implement a function, label_filenames, which produces a list of tuples.
Inputs:
- filenames: A list of filenames
- firstclass: a string
Return:
- labeled_filenames: a list of tuples, where each tuple is (label, filename)
Requirements/steps:
- If the firstclass string is in the filename, label=0.
- Otherwise (the firstclass string is NOT in the filename), label=1.
- Return the tuples in the same order as the input filenames.
### Solution - Exercise 8
def label_filenames(filenames:list,firstclass:str='pi') -> list:
    ### BEGIN SOLUTION
    return [(0 if firstclass in filename else 1, filename) for filename in filenames]
    ### END SOLUTION
### Demo function call
filenames=['resource/asnlib/publicdata/pi.png','resource/asnlib/publicdata/rose.png','resource/asnlib/publicdata/pinyc.png']
demo_label_filenames_output = label_filenames(filenames,'pi')
pprint(demo_label_filenames_output)
The demo should display this printed output.
[(0, 'resource/asnlib/publicdata/pi.png'),
(1, 'resource/asnlib/publicdata/rose.png'),
(0, 'resource/asnlib/publicdata/pinyc.png')]
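The label rests entirely on Python's substring test, `firstclass in filename`. The sketch below shows how that test behaves on a few short names. The list `examples` is hypothetical, and the last entry is included only to show that a match anywhere in the path counts, including inside a directory name.

```python
# The label depends only on the substring test `firstclass in filename`.
firstclass = "pi"
examples = ["pi.png", "rose.png", "pinyc.png", "pics/rose.png"]  # hypothetical names

# True means the file would receive label 0, False means label 1.
matches = {name: (firstclass in name) for name in examples}
for name, is_match in matches.items():
    print(f"{name!r}: label {0 if is_match else 1}")
```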
The cell below will test your solution for label_filenames (exercise 8). The testing variables will be available for debugging under the following names in a dictionary format.
- input_vars - Input variables for your solution.
- original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
- returned_output_vars - Outputs returned by your solution.
- true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 8
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=label_filenames,
ex_name='label_filenames',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to label_filenames did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=label_filenames,
ex_name='label_filenames',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=21,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to label_filenames did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
create_model_data
Your task: define create_model_data as follows:
Implement a function, create_model_data, which produces a tuple.
Inputs:
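- label_filenames: A list of tuples (label, filename)
- array_size: a tuple representing the preferred image size. Defaults to (4,4).
Return:
- model_data: a tuple (data, ydata), where data is the list of resized image arrays and ydata is the list of their labels, both in input order.
Requirements/steps:
- For each (label, filename) tuple, use the cv2 package to read the image into a numpy.ndarray with img_arr=cv2.imread(filename)[...,::-1]. cv2.imread returns the channels in BGR order, so the [...,::-1] slice reverses them to RGB.
- Resize each image to the array_size image size, which defaults to (4,4), using cv2.resize.
### Solution - Exercise 9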
def create_model_data(label_filenames:list,array_size:tuple=(4,4)) -> tuple:
    import cv2
    # img_arr=cv2.imread(filename)[...,::-1] ### You will use this!
    ### BEGIN SOLUTION
    data=[]   # resized RGB image arrays
    ydata=[]  # labels, in the same order as data
    for label, filename in label_filenames:
        # cv2.imread returns BGR channels; reversing the last axis gives RGB
        img_arr=cv2.imread(filename)[...,::-1]
        resized_arr=cv2.resize(img_arr,array_size)
        data.append(resized_arr)
        ydata.append(label)
    return data,ydata
    ### END SOLUTION
### Demo function call
label_filenames=[(0,'resource/asnlib/publicdata/pi.png'),(1,'resource/asnlib/publicdata/rose.png')]
demo_create_model_data_output = create_model_data(label_filenames)
pprint(demo_create_model_data_output)
The demo should display this printed output.
([array([[[255, 255, 255],
[140, 198, 197],
[140, 198, 197],
[255, 255, 255]],
[[255, 255, 255],
[144, 204, 203],
[144, 204, 203],
[255, 255, 255]],
[[255, 255, 255],
[144, 204, 203],
[144, 204, 203],
[255, 255, 255]],
[[255, 255, 255],
[136, 136, 136],
[144, 204, 203],
[255, 255, 255]]], dtype=uint8),
array([[[255, 255, 255],
[140, 198, 197],
[140, 198, 197],
[255, 255, 255]],
[[157, 157, 157],
[144, 204, 203],
[144, 204, 203],
[255, 255, 255]],
[[255, 255, 255],
[144, 204, 203],
[144, 204, 203],
[255, 255, 255]],
[[255, 255, 255],
[235, 235, 235],
[238, 238, 238],
[255, 255, 255]]], dtype=uint8)],
[0, 1])
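The `[..., ::-1]` slice used when reading each image reverses the last axis, which is the color-channel axis. The sketch below demonstrates this with NumPy alone on a tiny hand-built array, so it runs without OpenCV. The arrays `bgr` and `rgb` are hypothetical examples, not exam data.

```python
import numpy as np

# A hypothetical 1x2 "image" in OpenCV's BGR channel order:
# the first pixel is pure blue, the second is pure red.
bgr = np.array([[[255, 0, 0], [0, 0, 255]]], dtype=np.uint8)

# Reversing the last axis swaps the channel order to RGB
# while leaving the height and width axes untouched.
rgb = bgr[..., ::-1]

print(rgb.shape)
print(rgb.tolist())
```

In RGB order the blue pixel becomes `[0, 0, 255]` and the red pixel becomes `[255, 0, 0]`, which is the order matplotlib expects when displaying images.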
The cell below will test your solution for create_model_data (exercise 9). The testing variables will be available for debugging under the following names in a dictionary format.
- input_vars - Input variables for your solution.
- original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
- returned_output_vars - Outputs returned by your solution.
- true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 9
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=create_model_data,
ex_name='create_model_data',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=20)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to create_model_data did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=create_model_data,
ex_name='create_model_data',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=20,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to create_model_data did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
This model will not classify well, given how few images it sees. Nonetheless, let's train on 7 images and test on 3 to see how effective it is. Based on the resulting predictions, we would likely want to increase the image dimensions and the number of layers in the convolutional neural network. The code is below.
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
#training
label_filenames=[(0,'gpsartproto/pi.png'),(1,'gpsartproto/rose.png'),(0,'gpsartproto/pinyc.png'),(1,'gpsartproto/rosenyc.png'),(0,'gpsartproto/pisf.png'),(1,'gpsartproto/roseboston.png'),(0,'gpsartproto/piboston.png')]
data,ydata=create_model_data(label_filenames,array_size=(32,32)) # match the CNN's input_shape=(32, 32, 3)
dix_train=np.array(data)
diy_train=np.array(ydata).reshape(-1,1)
#testing
label_filenames=[(1,'gpsartproto/rosearlington.png'),(1,'gpsartproto/rosesf.png'),(0,'gpsartproto/pi_shaperun_paris.png')]
data,ydata=create_model_data(label_filenames,array_size=(32,32)) # same size as the training images
dix_test=np.array(data)
diy_test=np.array(ydata).reshape(-1,1)
di_train, di_test = dix_train / 255.0, dix_test / 255.0
##train model
#define CNN for gpsartify
model = models.Sequential([
layers.Conv2D(32, (3, 3), activation='relu', input_shape=(32, 32, 3)),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(64, (3, 3), activation='relu'),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(64, (3, 3), activation='relu'),
layers.Flatten(),
layers.Dense(64, activation='relu'),
layers.Dense(2, activation='softmax') # 2 classes in gpsartify
])
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
## fit the model
history = model.fit(di_train, diy_train, epochs=10, validation_split=0.2)
#make predictions from model
predictions = model.predict(di_test)
class_names = ['pi','rose']
def show_prediction(index):
    plt.figure(figsize=(5,5))
    plt.imshow(di_test[index])
    pred_label = class_names[np.argmax(predictions[index])]
    print(diy_test[index])
    true_label = class_names[diy_test[index][0]]
    plt.title(f"Predicted: {pred_label}\nActual: {true_label}")
    plt.axis('off')
    plt.savefig(f'model_prediction_{index}.png')
    plt.show()
for i in range(3):
    show_prediction(i)
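To see why the input size matters for the network above, the sketch below tracks one spatial dimension through its convolution and pooling layers. The helper `feature_map_sizes` and the list `cnn_layers` are hypothetical names introduced here. The arithmetic assumes the Keras defaults used in the model: 'valid' padding, stride-1 convolutions, and pooling with stride equal to the pool size.

```python
def feature_map_sizes(size, layers):
    """Track one spatial dimension through a stack of conv/pool layers.

    Assumes 'valid' padding, stride-1 convolutions, and non-overlapping
    pooling (stride equal to the pool size).
    """
    sizes = [size]
    for kind, k in layers:
        if kind == "conv":
            size = size - k + 1   # a k x k kernel trims k - 1 pixels
        elif kind == "pool":
            size = size // k      # k x k pooling divides the size, rounding down
        else:
            raise ValueError(f"unknown layer kind: {kind}")
        sizes.append(size)
        if size < 1:
            break  # the input is too small for the remaining layers
    return sizes

# The conv/pool layers of the model above, in order.
cnn_layers = [("conv", 3), ("pool", 2), ("conv", 3), ("pool", 2), ("conv", 3)]

print(feature_map_sizes(32, cnn_layers))
print(feature_map_sizes(4, cnn_layers))
```

A 32-pixel side shrinks to 4 before the Flatten layer. A 4-pixel side, the default array_size of create_model_data, drops below 1 at the second convolution, so images of that size cannot pass through this stack.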

Suppose we had an image and we wanted to create a running route by overlaying that image onto a map.
shift_svg
Your task: define shift_svg as follows:
Implement a function, shift_svg, which produces a numpy.ndarray.
Inputs:
- sampled_points: A list of tuples, each representing an (x, y) coordinate from the SVG image
Return:
- shifted_points: A 2-D Numpy array of shifted coordinates (x', y'), where "shifting" is defined below. If there are m input points, then shifted_points is an m x 2 Numpy array.
Requirements/steps:
- Let x_min be the smallest of the x input coordinates, and let y_min be the smallest of the y input coordinates.
- For each input point sampled_points[i] == (x, y), its shifted version is the point (x - x_min, y - y_min), which should be stored in row shifted_points[i, :] of the final output.
### Solution - Exercise 10
def shift_svg(sampled_points:list) -> np.ndarray:
    ### BEGIN SOLUTION
    # Shift so that the smallest x and the smallest y both land at 0
    sampled_points = np.array(sampled_points)
    sampled_points -= sampled_points.min(axis=0)
    return sampled_points
    ### END SOLUTION
### Demo function call
demo_sampled_points = utils.load_object_from_publicdata('demo_shift_svg_paths.dill')
demo_sampled_points = demo_sampled_points[:10]
demo_shift_svg_output = shift_svg(demo_sampled_points)
pprint(demo_shift_svg_output)
The demo should display this printed output.
array([[ 0. , 104.89984487],
[ 25.54484314, 109.88842341],
[ 37.40263979, 130.14757786],
[ 53.77403485, 152.94725888],
[ 79.74444851, 173.11282804],
[120.39930102, 185.4696469 ],
[176.81714907, 181.11087016],
[172.8895754 , 110.88946172],
[164.59477901, 50.86536302],
[152.86201033, 0. ]])
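The shift is easy to verify by hand on a small input. The sketch below applies the same column-wise minimum subtraction to three made-up points. The names `toy_points` and `toy_shifted` are hypothetical and not part of the exam data.

```python
import numpy as np

# Three hypothetical (x, y) points; x_min is 1.0 and y_min is 2.0.
toy_points = [(3.0, 5.0), (1.0, 9.0), (4.0, 2.0)]

arr = np.array(toy_points)            # shape (3, 2)
toy_shifted = arr - arr.min(axis=0)   # subtract (x_min, y_min) from every row

print(toy_shifted)
```

Each column now has a minimum of 0, and row i of the result still corresponds to input point i.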
The cell below will test your solution for shift_svg (exercise 10). The testing variables will be available for debugging under the following names in a dictionary format.
- input_vars - Input variables for your solution.
- original_input_vars - Copy of input variables from prior to running your solution. Any key:value pair in original_input_vars should also exist in input_vars - otherwise the inputs were modified by your solution.
- returned_output_vars - Outputs returned by your solution.
- true_output_vars - The expected output. This should "match" returned_output_vars based on the question requirements - otherwise, your solution is not returning the correct output.
### Test Cell - Exercise 10
from cse6040_devkit.tester_fw.testers import Tester
from yaml import safe_load
from time import time
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
# Load testing utility
with open('resource/asnlib/publicdata/execute_tests', 'rb') as f:
    executor = dill.load(f)
@run_with_timeout(error_threshold=200.0, warning_threshold=100.0)
@suppress_stdout
def execute_tests(**kwargs):
    return executor(**kwargs)
# Execute test
start_time = time()
passed, test_case_vars, e = execute_tests(func=shift_svg,
ex_name='shift_svg',
key=b'R1r08DBgQQHILDOw___OgsG_1QX-_jJAGLag1EdnTPI=',
n_iter=21)
# Assign test case vars for debugging
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to shift_svg did not pass the test.'
### BEGIN HIDDEN TESTS
start_time = time()
tracemalloc.start()
mem_start, peak_start = tracemalloc.get_traced_memory()
print(f"initial memory usage: {mem_start/1024/1024:.2f} MB")
passed, test_case_vars, e = execute_tests(func=shift_svg,
ex_name='shift_svg',
key=b'1kIeYcEN-UGgzaeKRsm5fCi9viAok6jEOXn6ctZCznw=',
n_iter=21,
hidden=True)
input_vars, original_input_vars, returned_output_vars, true_output_vars = test_case_vars
duration = time() - start_time
print(f"Test duration: {duration:.2f} seconds")
current_memory, peak_memory = tracemalloc.get_traced_memory()
print(f"memory after test: {current_memory/1024/1024:.2f} MB")
print(f"memory peak during test: {peak_memory/1024/1024:.2f} MB")
tracemalloc.stop()
if e: raise e
assert passed, 'The solution to shift_svg did not pass the test.'
### END HIDDEN TESTS
print('Passed! Please submit.')
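As mentioned above, we can create our own running routes by overlaying an image onto a map of our city. The code below accomplishes that. The methodology is as follows:
- Sample points along the paths of the SVG image.
- Scale the points to meters and center the shape on an anchor location.
- Snap every point to the nearest street node, then connect consecutive nodes with shortest paths. We use osmnx and networkx to accomplish this.
- Convert the route back to latitude/longitude and write it to a GPX file.

How does it look? We plotted our run above using the Folium library. You can compare the SVG image below with our GPS-overlaid image.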

#####################################
# extract points from SVG image
# !pip install svgpathtools # likely need to run to get package
from svgpathtools import svg2paths
# Load your SVG file
paths, attributes = svg2paths("gpsartproto/Pi-symbol.svg")
sampled_points = []
for path in paths:
    for t in np.linspace(0, 1, 100): # adjust 100 to change resolution
        pt = path.point(t)
        # SVG y grows downward, so negate it to keep the shape upright
        sampled_points.append((pt.real, -pt.imag))
# Shift the points with shift_svg (Exercise 10); this also converts the list to an m x 2 array
sampled_points = shift_svg(sampled_points)
# plot svg sampled points to confirm
plt.plot(sampled_points[:, 0], sampled_points[:, 1], 'k')
plt.gca().set_aspect('equal')
plt.title("pi Shape from SVG")
plt.savefig('svg_shape_sampled_points.png')
plt.show()
#####################################
# transpose SVG image points onto a map
# !pip install osmnx #likely need to run to get package
import math, pyproj
import numpy as np, shapely.geometry as geom, shapely.affinity as affinity
import shapely.ops as ops
import osmnx as ox, networkx as nx, gpxpy.gpx
CENTER_LL = (48.87075211275222, 2.345818599425802) # lat, lon in Paris
RADIUS = 1200 # meters
SCALE_M = 2.4 # meters per *pixel* unit
# Build and project run network ───────────────────────────
G_latlon = ox.graph_from_point(CENTER_LL, dist=RADIUS, network_type="walk")
G = ox.project_graph(G_latlon)
crs_proj = G.graph["crs"]
# Helper to convert lat/lon into projected meters
to_proj = pyproj.Transformer.from_crs("EPSG:4326", crs_proj, always_xy=True).transform
anchor_x, anchor_y = to_proj(CENTER_LL[1], CENTER_LL[0])
# build svg pi in the same CRS
raw_xy = sampled_points * SCALE_M
shape_m = geom.LineString(raw_xy)
# center the shape on the anchor point
centroid = shape_m.centroid
shape_m = affinity.translate(
shape_m,
xoff=anchor_x - centroid.x,
yoff=anchor_y - centroid.y
)
# Snap every vertex to the nearest street node
def nearest_node(x, y):
    return ox.distance.nearest_nodes(G, x, y)
coords = list(shape_m.coords)
nodes = [nearest_node(x, y) for x, y in coords]
# Shortest-path stitching between consecutive snapped nodes
route_nodes = []
for u, v in zip(nodes[:-1], nodes[1:]):
    if u == v: # identical then skip
        continue
    seg = nx.shortest_path(G, u, v, weight="length")
    route_nodes.extend(seg[:-1]) # drop the last node; the next segment starts with it
route_nodes.append(nodes[-1])
# Back to lat/lon for GPX export
G_ll = ox.project_graph(G, to_latlong=True)
track = [(G_ll.nodes[n]["y"], G_ll.nodes[n]["x"]) for n in route_nodes]
# write GPX file
# !pip install gpxpy # likely need to run to get package
import gpxpy
gpx = gpxpy.gpx.GPX()
trk = gpxpy.gpx.GPXTrack()
gpx.tracks.append(trk)
gpx_seg = gpxpy.gpx.GPXTrackSegment()
trk.segments.append(gpx_seg)
for lat, lon in track:
    gpx_seg.points.append(gpxpy.gpx.GPXTrackPoint(latitude=lat, longitude=lon))
with open("shape_run_paris2.gpx", "w") as f:
    f.write(gpx.to_xml())
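The stitching step above joins the shortest path between each pair of consecutive snapped nodes without repeating the node where two segments meet. The sketch below reproduces that logic on a toy graph using only the standard library. The helpers `bfs_path` and `stitch_route`, the adjacency dict `toy_adj`, and the node names are hypothetical. The breadth-first search is a stand-in for `nx.shortest_path` that counts hops rather than street length.

```python
from collections import deque

def bfs_path(adj, start, goal):
    """Fewest-hops path from start to goal (stand-in for nx.shortest_path)."""
    prev = {start: None}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            break
        for nxt in adj[node]:
            if nxt not in prev:
                prev[nxt] = node
                queue.append(nxt)
    if goal not in prev:
        raise ValueError(f"no path from {start} to {goal}")
    # Walk the predecessor links back from the goal, then reverse.
    path = []
    node = goal
    while node is not None:
        path.append(node)
        node = prev[node]
    return path[::-1]

def stitch_route(adj, snapped):
    """Join paths between consecutive snapped nodes into one route."""
    route = []
    for u, v in zip(snapped[:-1], snapped[1:]):
        if u == v:
            continue  # both points snapped to the same node
        # Drop the last node of each segment; the next segment starts with it.
        route.extend(bfs_path(adj, u, v)[:-1])
    route.append(snapped[-1])
    return route

# Toy street network: a simple chain A - B - C - D.
toy_adj = {"A": ["B"], "B": ["A", "C"], "C": ["B", "D"], "D": ["C"]}
toy_route = stitch_route(toy_adj, ["A", "A", "C", "D"])
print(toy_route)
```

Dropping the final node of every segment and appending the last snapped node once at the end is what keeps junction nodes from appearing twice in the route.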
Congratulations on completing the Final Exam and the semester!